diff --git a/backend/src/app/http/websocket.clj b/backend/src/app/http/websocket.clj index f39a89891..39f961afb 100644 --- a/backend/src/app/http/websocket.clj +++ b/backend/src/app/http/websocket.clj @@ -9,28 +9,103 @@ (:require [app.common.exceptions :as ex] [app.common.logging :as l] + [app.common.pprint :as pp] [app.common.spec :as us] [app.db :as db] [app.metrics :as mtx] + [app.util.time :as dt] [app.util.websocket :as ws] [clojure.core.async :as a] [clojure.spec.alpha :as s] [integrant.core :as ig] [yetti.websocket :as yws])) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; WEBSOCKET HOOKS +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(def state (atom {})) + +(defn- on-connect + [{:keys [metrics]} wsp] + (let [created-at (dt/now)] + (swap! state assoc (::ws/id @wsp) wsp) + (mtx/run! metrics {:id :websocket-active-connections :inc 1}) + (fn [] + (swap! state dissoc (::ws/id @wsp)) + (mtx/run! metrics {:id :websocket-active-connections :dec 1}) + (mtx/run! metrics {:id :websocket-session-timing + :val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)})))) + +(defn- on-rcv-message + [{:keys [metrics]} _ message] + (mtx/run! metrics {:id :websocket-messages-total :labels ["recv"] :inc 1}) + message) + +(defn- on-snd-message + [{:keys [metrics]} _ message] + (mtx/run! metrics {:id :websocket-messages-total :labels ["send"] :inc 1}) + message) + +;; REPL HELPERS + +(defn repl-get-connections-for-file + [file-id] + (->> (vals @state) + (filter #(= file-id (-> % deref ::file-subscription :file-id))) + (map deref) + (map ::ws/id))) + +(defn repl-get-connections-for-team + [team-id] + (->> (vals @state) + (filter #(= team-id (-> % deref ::team-subscription :team-id))) + (map deref) + (map ::ws/id))) + +(defn repl-close-connection + [id] + (when-let [wsp (get @state id)] + (a/>!! (::ws/close-ch @wsp) [8899 "closed from server"]) + (a/close! (::ws/close-ch @wsp)))) + +(defn repl-get-connection-info + [id] + (when-let [wsp (get @state id)] + {:id id + :created-at (dt/instant id) + :profile-id (::profile-id @wsp) + :session-id (::session-id @wsp) + :user-agent (::ws/user-agent @wsp) + :ip-addr (::ws/remote-addr @wsp) + :last-activity-at (::ws/last-activity-at @wsp) + :http-session-id (::ws/http-session-id @wsp) + :subscribed-file (-> wsp deref ::file-subscription :file-id) + :subscribed-team (-> wsp deref ::team-subscription :team-id)})) + +(defn repl-print-connection-info + [id] + (some-> id repl-get-connection-info pp/pprint)) + +(defn repl-print-connection-info-for-file + [file-id] + (some->> (repl-get-connections-for-file file-id) + (map repl-get-connection-info) + (pp/pprint))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; WEBSOCKET HANDLER ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defmulti handle-message - (fn [_ message] + (fn [_ _ message] (:type message))) (defmethod handle-message :connect - [wsp _] - (l/trace :fn "handle-message" :event :connect) + [cfg wsp _] - (let [msgbus-fn (:msgbus @wsp) + (let [msgbus-fn (:msgbus cfg) + conn-id (::ws/id @wsp) profile-id (::profile-id @wsp) session-id (::session-id @wsp) output-ch (::ws/output-ch @wsp) @@ -38,94 +113,122 @@ xform (remove #(= (:session-id %) session-id)) channel (a/chan (a/dropping-buffer 16) xform)] - (swap! wsp assoc ::profile-subs-channel channel) + (l/trace :fn "handle-message" :event :connect :conn-id conn-id) + + ;; Subscribe to the profile channel and forward all messages to + ;; websocket output channel (send them to the client). + (swap! wsp assoc ::profile-subscription channel) (a/pipe channel output-ch false) (msgbus-fn :cmd :sub :topic profile-id :chan channel))) (defmethod handle-message :disconnect - [wsp _] - (l/trace :fn "handle-message" :event :disconnect) - (a/go - (let [msgbus-fn (:msgbus @wsp) - profile-id (::profile-id @wsp) - session-id (::session-id @wsp) - profile-ch (::profile-subs-channel @wsp) - subs (::subscriptions @wsp)] + [cfg wsp _] + (let [msgbus-fn (:msgbus cfg) + conn-id (::ws/id @wsp) + profile-id (::profile-id @wsp) + session-id (::session-id @wsp) + profile-ch (::profile-subscription @wsp) + fsub (::file-subscription @wsp) + tsub (::team-subscription @wsp) + message {:type :disconnect + :subs-id profile-id + :profile-id profile-id + :session-id session-id}] + + (l/trace :fn "handle-message" + :event :disconnect + :conn-id conn-id) + + (a/go ;; Close the main profile subscription (a/close! profile-ch) (a/! output-ch message) - (recur))) + (l/trace :fn "handle-message" + :event :subscribe-team + :team-id team-id + :conn-id conn-id) + + (a/pipe channel output-ch false) + + (let [state {:team-id team-id :channel channel :topic team-id}] + (swap! wsp assoc ::team-subscription state)) + + (a/go + ;; Close previous subscription if exists + (when-let [channel (:channel prev-subs)] + (a/close! channel) + (a/! output-ch message) + (recur)))) (a/go ;; Subscribe to file topic @@ -134,6 +237,7 @@ ;; Notifify the rest of participants of the new connection. (let [message {:type :join-file :file-id file-id + :subs-id file-id :session-id session-id :profile-id profile-id}] (a/ message - (dissoc :subs-id) - (assoc :profile-id profile-id) - (assoc :session-id session-id))] - + [cfg wsp {:keys [file-id] :as message}] + (let [msgbus-fn (:msgbus cfg) + profile-id (::profile-id @wsp) + session-id (::session-id @wsp) + subs (::file-subscription @wsp) + message (-> message + (assoc :subs-id file-id) + (assoc :profile-id profile-id) + (assoc :session-id session-id))] + (a/go + ;; Only allow receive pointer updates when active subscription + (when subs (a/ cfg - (assoc ::profile-id profile-id) - (assoc ::session-id session-id))] - - (l/trace :hint "http request to websocket" :profile-id profile-id :session-id session-id) + (let [{:keys [session-id]} (us/conform ::handler-params params)] (cond (not profile-id) (raise (ex/error :type :authentication @@ -218,6 +327,15 @@ :hint "this endpoint only accepts websocket connections")) :else - (->> (ws/handler handle-message cfg) - (yws/upgrade req) - (respond)))))) + (do + (l/trace :hint "websocket request" :profile-id profile-id :session-id session-id) + + (->> (ws/handler + ::ws/on-rcv-message (partial on-rcv-message cfg) + ::ws/on-snd-message (partial on-snd-message cfg) + ::ws/on-connect (partial on-connect cfg) + ::ws/handler (partial handle-message cfg) + ::profile-id profile-id + ::session-id session-id) + (yws/upgrade req) + (respond))))))) diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index 4bae83abd..e14bf9e12 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -160,7 +160,6 @@ "Function responsible to attach local subscription to the state. Intended to be used in agent." [state cfg topics chan done-ch] - (l/trace :hint "subscribe-to-topics" :topics topics ::l/async false) (aa/with-closing done-ch (let [state (update state :chans assoc chan topics)] (reduce (fn [state topic] @@ -184,15 +183,15 @@ useful when client disconnects or in-bulk unsubscribe operations. Intended to be executed in agent." [state cfg channels done-ch] - (l/trace :hint "unsubscribe-channels" :chans (count channels) ::l/async false) (aa/with-closing done-ch (reduce #(unsubscribe-single-channel %1 cfg %2) state channels))) + (defn- subscribe [{:keys [::state executor] :as cfg} {:keys [topic topics chan]}] (let [done-ch (a/chan) topics (into [] (map prefix-topic) (if topic [topic] topics))] - (l/trace :hint "subscribe" :topics topics) + (l/debug :hint "subscribe" :topics topics) (send-via executor state subscribe-to-topics cfg topics chan done-ch) done-ch)) diff --git a/backend/src/app/util/websocket.clj b/backend/src/app/util/websocket.clj index e4f8a12fe..4909049fe 100644 --- a/backend/src/app/util/websocket.clj +++ b/backend/src/app/util/websocket.clj @@ -10,9 +10,10 @@ [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.transit :as t] - [app.metrics :as mtx] + [app.loggers.audit :refer [parse-client-ip]] [app.util.time :as dt] [clojure.core.async :as a] + [yetti.request :as yr] [yetti.util :as yu] [yetti.websocket :as yws]) (:import @@ -25,8 +26,10 @@ (declare process-output) (declare ws-ping!) (declare ws-send!) +(declare filter-options) (def noop (constantly nil)) +(def identity-3 (fn [_ _ o] o)) (defn handler "A WebSocket upgrade handler factory. Returns a handler that can be @@ -39,94 +42,123 @@ It also accepts some options that allows you parametrize the protocol behavior. The options map will be used as-as for the initial data of the `ws` data structure" - ([handle-message] (handler handle-message {})) - ([handle-message {:keys [::input-buff-size - ::output-buff-size - ::idle-timeout - metrics] - :or {input-buff-size 64 - output-buff-size 64 - idle-timeout 30000} - :as options}] - (fn [{:keys [::yws/channel] :as request}] - (let [input-ch (a/chan input-buff-size) - output-ch (a/chan output-buff-size) - pong-ch (a/chan (a/sliding-buffer 6)) - close-ch (a/chan) + [& {:keys [::on-rcv-message + ::on-snd-message + ::on-connect + ::input-buff-size + ::output-buff-size + ::handler + ::idle-timeout] + :or {input-buff-size 64 + output-buff-size 64 + idle-timeout 30000 + on-connect noop + on-snd-message identity-3 + on-rcv-message identity-3} + :as options}] - options (atom - (-> options - (assoc ::input-ch input-ch) - (assoc ::output-ch output-ch) - (assoc ::close-ch close-ch) - (assoc ::channel channel) - (dissoc ::metrics))) + (assert (fn? on-rcv-message) "'on-rcv-message' should be a function") + (assert (fn? on-snd-message) "'on-snd-message' should be a function") + (assert (fn? on-connect) "'on-connect' should be a function") - terminated (atom false) - created-at (dt/now) + (fn [{:keys [::yws/channel session-id] :as request}] + (let [input-ch (a/chan input-buff-size) + output-ch (a/chan output-buff-size) + pong-ch (a/chan (a/sliding-buffer 6)) + close-ch (a/chan) + stop-ch (a/chan) - on-open - (fn [channel] - (mtx/run! metrics {:id :websocket-active-connections :inc 1}) - (yws/idle-timeout! channel (dt/duration idle-timeout))) + ip-addr (parse-client-ip request) + uagent (yr/get-header request "user-agent") + id (inst-ms (dt/now)) - on-terminate - (fn [& _args] - (when (compare-and-set! terminated false true) - (mtx/run! metrics {:id :websocket-active-connections :dec 1}) - (mtx/run! metrics {:id :websocket-session-timing :val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)}) + options (-> (filter-options options) + (merge {::id id + ::input-ch input-ch + ::output-ch output-ch + ::close-ch close-ch + ::stop-ch stop-ch + ::channel channel + ::remote-addr ip-addr + ::http-session-id session-id + ::user-agent uagent}) + (atom)) - (a/close! close-ch) - (a/close! pong-ch) - (a/close! output-ch) - (a/close! input-ch))) + ;; call the on-connect hook and memoize the on-terminate instance + on-terminate (on-connect options) - on-error - (fn [_ error] - (on-terminate) - ;; TODO: properly log timeout exceptions - (when-not (or (instance? java.nio.channels.ClosedChannelException error) - (instance? java.net.SocketException error)) - (l/error :hint (ex-message error) :cause error))) + on-ws-open + (fn [channel] + (l/trace :fn "on-ws-open" :conn-id id) + (yws/idle-timeout! channel (dt/duration idle-timeout))) - on-message - (fn [_ message] - (mtx/run! metrics {:id :websocket-messages-total :labels ["recv"] :inc 1}) - (try - (let [message (t/decode-str message)] - (a/offer! input-ch message)) - (catch Throwable e - (l/warn :hint "error on decoding incoming message from websocket" - :wsmsg (pr-str message) - :cause e) - (on-terminate)))) + on-ws-terminate + (fn [_ code reason] + (l/trace :fn "on-ws-terminate" :conn-id id :code code :reason reason) + (a/close! close-ch)) - on-pong - (fn [_ buffers] - (a/>!! pong-ch (yu/copy-many buffers)))] + on-ws-error + (fn [_ error] + (a/close! close-ch) + (when-not (or (instance? java.nio.channels.ClosedChannelException error) + (instance? java.net.SocketException error)) + (l/error :hint (ex-message error) :cause error))) - ;; launch heartbeat process - (-> @options - (assoc ::pong-ch pong-ch) - (assoc ::on-close on-terminate) - (process-heartbeat)) + on-ws-message + (fn [_ message] + (try + (let [message (on-rcv-message options message) + message (t/decode-str message)] + (a/offer! input-ch message) + (swap! options assoc ::last-activity-at (dt/now))) + (catch Throwable e + (l/warn :hint "error on decoding incoming message from websocket" + :wsmsg (pr-str message) + :cause e) + (a/>! close-ch [8801 "decode error"]) + (a/close! close-ch)))) - ;; Forward all messages from output-ch to the websocket - ;; connection - (a/go-loop [] - (when-let [val (a/!! pong-ch (yu/copy-many buffers)))] - ;; React on messages received from the client - (process-input options handle-message) + ;; Launch heartbeat process + (-> @options + (assoc ::pong-ch pong-ch) + (process-heartbeat)) - {:on-open on-open - :on-error on-error - :on-close on-terminate - :on-text on-message - :on-pong on-pong})))) + ;; Wait a close signal + (a/go + (let [[code reason] (a/! output-ch {:type :error :error (ex-data val)}) @@ -193,19 +225,21 @@ (a/= (count issued) max-missed-heartbeats) - (on-close channel -1 "heartbeat-timeout") + (do + (a/>! close-ch [8802 "heart-beat timeout"]) + (a/close! close-ch)) (recur (inc i))))))) (a/go-loop [] @@ -213,3 +247,11 @@ (swap! beats disj (decode-beat buffer)) (recur))))) +(defn- filter-options + "Remove from options all namespace qualified keys that matches the + current namespace." + [options] + (into {} + (remove (fn [[key]] + (= (namespace key) "app.util.websocket"))) + options)) diff --git a/frontend/src/app/main/data/users.cljs b/frontend/src/app/main/data/users.cljs index 7ea945437..eee3b49cc 100644 --- a/frontend/src/app/main/data/users.cljs +++ b/frontend/src/app/main/data/users.cljs @@ -173,8 +173,7 @@ (when (is-authenticated? profile) (->> (rx/of (profile-fetched profile) (fetch-teams) - (get-redirect-event) - (ws/initialize)) + (get-redirect-event)) (rx/observe-on :async))))))) (s/def ::invitation-token ::us/not-empty-string) diff --git a/frontend/src/app/main/data/websocket.cljs b/frontend/src/app/main/data/websocket.cljs index 1fbb26770..3612770d4 100644 --- a/frontend/src/app/main/data/websocket.cljs +++ b/frontend/src/app/main/data/websocket.cljs @@ -7,14 +7,19 @@ (ns app.main.data.websocket (:require [app.common.data.macros :as dm] + [app.common.logging :as l] [app.common.uri :as u] [app.config :as cf] [app.util.websocket :as ws] [beicon.core :as rx] [potok.core :as ptk])) +(l/set-level! :error) + (dm/export ws/send!) +(defonce ws-conn (volatile! nil)) + (defn- prepare-uri [params] (let [base (-> (u/join cf/public-uri "ws/notifications") @@ -30,35 +35,34 @@ [message] (ptk/reify ::send-message ptk/EffectEvent - (effect [_ state _] - (let [ws-conn (:ws-conn state)] - (ws/send! ws-conn message))))) + (effect [_ _ _] + (some-> @ws-conn (ws/send! message))))) (defn initialize [] (ptk/reify ::initialize - ptk/UpdateEvent - (update [_ state] - (let [sid (:session-id state) - uri (prepare-uri {:session-id sid})] - (assoc state :ws-conn (ws/create uri)))) - ptk/WatchEvent (watch [_ state stream] - (let [ws-conn (:ws-conn state) - stoper (rx/merge - (rx/filter (ptk/type? ::finalize) stream) - (rx/filter (ptk/type? ::initialize) stream))] + (l/trace :hint "event:initialize" :fn "watch") + (let [sid (:session-id state) + uri (prepare-uri {:session-id sid}) + ws (ws/create uri)] - (->> (rx/merge - (->> (ws/get-rcv-stream ws-conn) - (rx/filter ws/message-event?) - (rx/map :payload) - (rx/map #(ptk/data-event ::message %))) - (->> (ws/get-rcv-stream ws-conn) - (rx/filter ws/opened-event?) - (rx/map (fn [_] (ptk/data-event ::opened {}))))) - (rx/take-until stoper)))))) + (vreset! ws-conn ws) + + (let [stoper (rx/merge + (rx/filter (ptk/type? ::finalize) stream) + (rx/filter (ptk/type? ::initialize) stream))] + + (->> (rx/merge + (->> (ws/get-rcv-stream ws) + (rx/filter ws/message-event?) + (rx/map :payload) + (rx/map #(ptk/data-event ::message %))) + (->> (ws/get-rcv-stream ws) + (rx/filter ws/opened-event?) + (rx/map (fn [_] (ptk/data-event ::opened {}))))) + (rx/take-until stoper))))))) ;; --- Finalize Websocket @@ -66,5 +70,6 @@ [] (ptk/reify ::finalize ptk/EffectEvent - (effect [_ state _] - (some-> (:ws-conn state) ws/close!)))) + (effect [_ _ _] + (l/trace :hint "event:finalize" :fn "effect") + (some-> @ws-conn ws/close!)))) diff --git a/frontend/src/app/main/data/workspace/notifications.cljs b/frontend/src/app/main/data/workspace/notifications.cljs index 6d1527b7c..a9388898c 100644 --- a/frontend/src/app/main/data/workspace/notifications.cljs +++ b/frontend/src/app/main/data/workspace/notifications.cljs @@ -9,7 +9,6 @@ [app.common.data :as d] [app.common.pages.changes-spec :as pcs] [app.common.spec :as us] - [app.common.uuid :as uuid] [app.main.data.websocket :as dws] [app.main.data.workspace.changes :as dch] [app.main.data.workspace.libraries :as dwl] @@ -34,51 +33,53 @@ (ptk/reify ::initialize ptk/WatchEvent (watch [_ state stream] - (let [subs-id (uuid/next) - stoper (rx/filter (ptk/type? ::finalize) stream) + (let [stoper (rx/filter (ptk/type? ::finalize) stream) + profile-id (:profile-id state) - initmsg [{:type :subscribe-file - :subs-id subs-id - :file-id file-id} - {:type :subscribe-team - :team-id team-id}] + initmsg [{:type :subscribe-file + :file-id file-id} + {:type :subscribe-team + :team-id team-id}] - endmsg {:type :unsubscribe-file - :subs-id subs-id} + endmsg {:type :unsubscribe-file + :file-id file-id} - stream (->> (rx/merge - ;; Send the subscription message - (->> (rx/from initmsg) - (rx/map dws/send)) + stream (->> (rx/merge + ;; Send the subscription message + (->> (rx/from initmsg) + (rx/map dws/send)) - ;; Subscribe to notifications of the subscription - (->> stream - (rx/filter (ptk/type? ::dws/message)) - (rx/map deref) ;; :library-change events occur in a different file, but need to be processed anyway - (rx/filter #(or (= subs-id (:subs-id %)) (= (:type %) :library-change))) - (rx/map process-message)) + ;; Subscribe to notifications of the subscription + (->> stream + (rx/filter (ptk/type? ::dws/message)) + (rx/map deref) + (rx/filter (fn [{:keys [subs-id] :as msg}] + (or (= subs-id team-id) + (= subs-id profile-id) + (= subs-id file-id)))) + (rx/map process-message)) - ;; On reconnect, send again the subscription messages - (->> stream - (rx/filter (ptk/type? ::dws/opened)) - (rx/mapcat #(->> (rx/from initmsg) - (rx/map dws/send)))) + ;; On reconnect, send again the subscription messages + (->> stream + (rx/filter (ptk/type? ::dws/opened)) + (rx/mapcat #(->> (rx/from initmsg) + (rx/map dws/send)))) - ;; Emit presence event for current user; - ;; this is because websocket server don't - ;; emits this for the same user. - (rx/of (handle-presence {:type :connect - :session-id (:session-id state) - :profile-id (:profile-id state)})) + ;; Emit presence event for current user; + ;; this is because websocket server don't + ;; emits this for the same user. + (rx/of (handle-presence {:type :connect + :session-id (:session-id state) + :profile-id (:profile-id state)})) - ;; Emit to all other connected users the current pointer - ;; position changes. - (->> stream - (rx/filter ms/pointer-event?) - (rx/sample 50) - (rx/map #(handle-pointer-send subs-id file-id (:pt %))))) + ;; Emit to all other connected users the current pointer + ;; position changes. + (->> stream + (rx/filter ms/pointer-event?) + (rx/sample 50) + (rx/map #(handle-pointer-send file-id (:pt %))))) - (rx/take-until stoper))] + (rx/take-until stoper))] (rx/concat stream (rx/of (dws/send endmsg))))))) @@ -95,13 +96,12 @@ nil)) (defn- handle-pointer-send - [subs-id file-id point] + [file-id point] (ptk/reify ::handle-pointer-send ptk/WatchEvent (watch [_ state _] (let [page-id (:current-page-id state) message {:type :pointer-update - :subs-id subs-id :file-id file-id :page-id page-id :position point}] diff --git a/frontend/src/app/main/data/workspace/persistence.cljs b/frontend/src/app/main/data/workspace/persistence.cljs index 7b81dc8cd..445f5001c 100644 --- a/frontend/src/app/main/data/workspace/persistence.cljs +++ b/frontend/src/app/main/data/workspace/persistence.cljs @@ -163,7 +163,9 @@ (rx/map #(shapes-changes-persisted file-id %))))))) (rx/catch (fn [cause] (rx/concat - (rx/of (rt/assign-exception cause)) + (if (= :authentication (:type cause)) + (rx/empty) + (rx/of (rt/assign-exception cause))) (rx/throw cause)))))))))) diff --git a/frontend/src/app/main/errors.cljs b/frontend/src/app/main/errors.cljs index b55d1561f..7a8edbbbe 100644 --- a/frontend/src/app/main/errors.cljs +++ b/frontend/src/app/main/errors.cljs @@ -106,7 +106,6 @@ (js/console.groupEnd msg))) - ;; Error on parsing an SVG ;; TODO: looks unused and deprecated (defmethod ptk/handle-error :svg-parser