diff --git a/backend/src/app/notifications.clj b/backend/src/app/notifications.clj index 4b9a72ce1..61f198814 100644 --- a/backend/src/app/notifications.clj +++ b/backend/src/app/notifications.clj @@ -14,6 +14,7 @@ [app.db :as db] [app.metrics :as mtx] [app.util.async :as aa] + [app.util.time :as dt] [app.util.transit :as t] [clojure.core.async :as a] [clojure.spec.alpha :as s] @@ -46,29 +47,32 @@ mtx-active-connections (mtx/create - {:name "websocket_notifications_active_connections" + {:name "websocket_active_connections" :registry (:registry metrics) :type :gauge - :help "Active websocket connections on notifications service."}) + :help "Active websocket connections."}) - mtx-message-recv + mtx-messages (mtx/create - {:name "websocket_notifications_message_recv_timing" + {:name "websocket_message_count" :registry (:registry metrics) - :type :summary - :help "Message receive summary timing (ms)."}) + :labels ["op"] + :type :counter + :help "Counter of processed messages."}) - mtx-message-send + mtx-sessions (mtx/create - {:name "websocket_notifications_message_send_timing" + {:name "websocket_session_timing" :registry (:registry metrics) - :type :summary - :help "Message receive summary timing (ms)."}) + :quantiles [] + :help "Websocket session timing (seconds)." + :type :summary}) cfg (assoc cfg :mtx-active-connections mtx-active-connections - :mtx-message-recv mtx-message-recv - :mtx-message-send mtx-message-send)] + :mtx-messages mtx-messages + :mtx-sessions mtx-sessions + )] (-> #(handler cfg %) (wrap-session) (wrap-keyword-params) @@ -130,16 +134,17 @@ (defn websocket [{:keys [file-id team-id msgbus] :as cfg}] - (let [in (a/chan 32) - out (a/chan 32) - mtx-active-connections (:mtx-active-connections cfg) - mtx-message-send (:mtx-message-send cfg) - mtx-message-recv (:mtx-message-recv cfg) + (let [in (a/chan 32) + out (a/chan 32) + mtx-aconn (:mtx-active-connections cfg) + mtx-messages (:mtx-messages cfg) + mtx-sessions (:mtx-sessions cfg) + created-at (dt/now) - ws-send (mtx/wrap-summary ws-send mtx-message-send)] + ws-send (mtx/wrap-counter ws-send mtx-messages ["send"])] (letfn [(on-connect [conn] - (mtx-active-connections :inc) + (mtx-aconn :inc) (let [sub (a/chan) ws (WebSocket. conn in out sub nil cfg)] @@ -159,11 +164,14 @@ (a/close! sub)))) (on-error [_conn _e] + (mtx-aconn :dec) + (mtx-sessions :observe (/ (inst-ms (dt/duration-between created-at (dt/now))) 1000.0)) (a/close! out) (a/close! in)) (on-close [_conn _status _reason] - (mtx-active-connections :dec) + (mtx-aconn :dec) + (mtx-sessions :observe (/ (inst-ms (dt/duration-between created-at (dt/now))) 1000.0)) (a/close! out) (a/close! in)) @@ -174,7 +182,7 @@ {:on-connect on-connect :on-error on-error :on-close on-close - :on-text (mtx/wrap-summary on-message mtx-message-recv) + :on-text (mtx/wrap-counter on-message mtx-messages ["recv"]) :on-bytes (constantly nil)}))) (declare handle-message)