Improve websocket notifications metrics.

This commit is contained in:
Andrey Antukh 2021-01-25 20:15:28 +01:00 committed by Hirunatan
parent f8b349814c
commit 8236d84dfa

View file

@ -41,19 +41,32 @@
(defmethod ig/init-key ::handler
[_ {:keys [session metrics] :as cfg}]
(let [wrap-session (:middleware session)
mtx-active-conn (mtx/create
{:name "http_ws_notifications_active_connections"
:registry (:registry metrics)
:type :gauge
:help "Active websocket connections on notifications service."})
mtx-msg-counter (mtx/create
{:name "http_ws_notifications_message_counter"
:registry (:registry metrics)
:type :counter
:help "Counter of total messages processed on websocket conenction on notifications service."})
cfg (assoc cfg
:mtx-active-conn mtx-active-conn
:mtx-msg-counter mtx-msg-counter)]
mtx-active-connections
(mtx/create
{:name "websocket_notifications_active_connections"
:registry (:registry metrics)
:type :gauge
:help "Active websocket connections on notifications service."})
mtx-message-recv
(mtx/create
{:name "websocket_notifications_message_recv_timing"
:registry (:registry metrics)
:type :summary
:help "Message receive summary timing (ms)."})
mtx-message-send
(mtx/create
{:name "websocket_notifications_message_send_timing"
:registry (:registry metrics)
:type :summary
:help "Message receive summary timing (ms)."})
cfg (assoc cfg
:mtx-active-connections mtx-active-connections
:mtx-message-recv mtx-message-recv
:mtx-message-send mtx-message-send)]
(-> #(handler cfg %)
(wrap-session)
(wrap-keyword-params)
@ -100,18 +113,10 @@
;; WebSocket Http Handler
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(declare on-connect)
(declare handle-connect)
(defrecord WebSocket [conn in out sub])
;; (defonce metrics-active-connections
;; (mtx/gauge {:id "notificatons__active_connections"
;; :help "Active connections to the notifications service."}))
;; (defonce metrics-message-counter
;; (mtx/counter {:id "notificatons__messages_counter"
;; :help "A total number of messages handled by the notifications service."}))
(defn- ws-send
[conn data]
(try
@ -122,51 +127,55 @@
false)))
(defn websocket
[{:keys [file-id team-id redis mtx-active-conn mtx-msg-counter] :as cfg}]
[{:keys [file-id team-id redis] :as cfg}]
(let [in (a/chan 32)
out (a/chan 32)]
{:on-connect
(fn [conn]
(mtx-active-conn :inc)
(let [sub (rd/subscribe redis {:xform (map t/decode-str)
:topics [file-id team-id]})
ws (WebSocket. conn in out sub nil cfg)]
out (a/chan 32)
mtx-active-connections (:mtx-active-connections cfg)
mtx-message-send (:mtx-message-send cfg)
mtx-message-recv (:mtx-message-recv cfg)
;; message forwarding loop
(a/go-loop []
(let [val (a/<! out)]
(when-not (nil? val)
(when (ws-send conn (t/encode-str val))
(recur)))))
ws-send (mtx/wrap-summary ws-send mtx-message-send)]
(a/go
(a/<! (on-connect ws))
(a/close! sub))))
(letfn [(on-connect [conn]
(mtx-active-connections :inc)
(let [sub (rd/subscribe redis {:xform (map t/decode-str)
:topics [file-id team-id]})
ws (WebSocket. conn in out sub nil cfg)]
:on-error
(fn [_conn _e]
(a/close! out)
(a/close! in))
;; message forwarding loop
(a/go-loop []
(let [val (a/<! out)]
(when-not (nil? val)
(when (ws-send conn (t/encode-str val))
(recur)))))
:on-close
(fn [_conn _status _reason]
(mtx-active-conn :dec)
(a/close! out)
(a/close! in))
(a/go
(a/<! (handle-connect ws))
(a/close! sub))))
:on-text
(fn [_ws message]
(mtx-msg-counter :inc)
(let [message (t/decode-str message)]
(a/>!! in message)))
(on-error [_conn _e]
(a/close! out)
(a/close! in))
:on-bytes
(constantly nil)}))
(on-close [_conn _status _reason]
(mtx-active-connections :dec)
(a/close! out)
(a/close! in))
(on-message [_ws message]
(let [message (t/decode-str message)]
(a/>!! in message)))]
{:on-connect on-connect
:on-error on-error
:on-close on-close
:on-text (mtx/wrap-summary on-message mtx-message-recv)
:on-bytes (constantly nil)})))
(declare handle-message)
(declare start-loop!)
(defn- on-connect
(defn- handle-connect
[{:keys [conn] :as ws}]
(a/go
(try