From 8236d84dfa777c3dc8cd779c90eafb6b989cb3d3 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 25 Jan 2021 20:15:28 +0100 Subject: [PATCH] :sparkles: Improve websocket notifications metrics. --- backend/src/app/notifications.clj | 121 ++++++++++++++++-------------- 1 file changed, 65 insertions(+), 56 deletions(-) diff --git a/backend/src/app/notifications.clj b/backend/src/app/notifications.clj index 8fda876963..7a38bb8a18 100644 --- a/backend/src/app/notifications.clj +++ b/backend/src/app/notifications.clj @@ -41,19 +41,32 @@ (defmethod ig/init-key ::handler [_ {:keys [session metrics] :as cfg}] (let [wrap-session (:middleware session) - mtx-active-conn (mtx/create - {:name "http_ws_notifications_active_connections" - :registry (:registry metrics) - :type :gauge - :help "Active websocket connections on notifications service."}) - mtx-msg-counter (mtx/create - {:name "http_ws_notifications_message_counter" - :registry (:registry metrics) - :type :counter - :help "Counter of total messages processed on websocket conenction on notifications service."}) - cfg (assoc cfg - :mtx-active-conn mtx-active-conn - :mtx-msg-counter mtx-msg-counter)] + + mtx-active-connections + (mtx/create + {:name "websocket_notifications_active_connections" + :registry (:registry metrics) + :type :gauge + :help "Active websocket connections on notifications service."}) + + mtx-message-recv + (mtx/create + {:name "websocket_notifications_message_recv_timing" + :registry (:registry metrics) + :type :summary + :help "Message receive summary timing (ms)."}) + + mtx-message-send + (mtx/create + {:name "websocket_notifications_message_send_timing" + :registry (:registry metrics) + :type :summary + :help "Message receive summary timing (ms)."}) + + cfg (assoc cfg + :mtx-active-connections mtx-active-connections + :mtx-message-recv mtx-message-recv + :mtx-message-send mtx-message-send)] (-> #(handler cfg %) (wrap-session) (wrap-keyword-params) @@ -100,18 +113,10 @@ ;; WebSocket Http Handler ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(declare on-connect) +(declare handle-connect) (defrecord WebSocket [conn in out sub]) -;; (defonce metrics-active-connections -;; (mtx/gauge {:id "notificatons__active_connections" -;; :help "Active connections to the notifications service."})) - -;; (defonce metrics-message-counter -;; (mtx/counter {:id "notificatons__messages_counter" -;; :help "A total number of messages handled by the notifications service."})) - (defn- ws-send [conn data] (try @@ -122,51 +127,55 @@ false))) (defn websocket - [{:keys [file-id team-id redis mtx-active-conn mtx-msg-counter] :as cfg}] + [{:keys [file-id team-id redis] :as cfg}] (let [in (a/chan 32) - out (a/chan 32)] - {:on-connect - (fn [conn] - (mtx-active-conn :inc) - (let [sub (rd/subscribe redis {:xform (map t/decode-str) - :topics [file-id team-id]}) - ws (WebSocket. conn in out sub nil cfg)] + out (a/chan 32) + mtx-active-connections (:mtx-active-connections cfg) + mtx-message-send (:mtx-message-send cfg) + mtx-message-recv (:mtx-message-recv cfg) - ;; message forwarding loop - (a/go-loop [] - (let [val (a/!! in message))) + (on-error [_conn _e] + (a/close! out) + (a/close! in)) - :on-bytes - (constantly nil)})) + (on-close [_conn _status _reason] + (mtx-active-connections :dec) + (a/close! out) + (a/close! in)) + + (on-message [_ws message] + (let [message (t/decode-str message)] + (a/>!! in message)))] + + {:on-connect on-connect + :on-error on-error + :on-close on-close + :on-text (mtx/wrap-summary on-message mtx-message-recv) + :on-bytes (constantly nil)}))) (declare handle-message) (declare start-loop!) -(defn- on-connect +(defn- handle-connect [{:keys [conn] :as ws}] (a/go (try