Improve websockets impl

Make it more extensible and move all the websocket unrelated stuff
to the new hooks API. Also adds observability from repl.
This commit is contained in:
Andrey Antukh 2022-06-27 14:49:08 +02:00
parent 935639411c
commit cbc5811290
8 changed files with 429 additions and 265 deletions

View file

@ -9,28 +9,103 @@
(:require (:require
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.pprint :as pp]
[app.common.spec :as us] [app.common.spec :as us]
[app.db :as db] [app.db :as db]
[app.metrics :as mtx] [app.metrics :as mtx]
[app.util.time :as dt]
[app.util.websocket :as ws] [app.util.websocket :as ws]
[clojure.core.async :as a] [clojure.core.async :as a]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig] [integrant.core :as ig]
[yetti.websocket :as yws])) [yetti.websocket :as yws]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; WEBSOCKET HOOKS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def state (atom {}))
(defn- on-connect
[{:keys [metrics]} wsp]
(let [created-at (dt/now)]
(swap! state assoc (::ws/id @wsp) wsp)
(mtx/run! metrics {:id :websocket-active-connections :inc 1})
(fn []
(swap! state dissoc (::ws/id @wsp))
(mtx/run! metrics {:id :websocket-active-connections :dec 1})
(mtx/run! metrics {:id :websocket-session-timing
:val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)}))))
(defn- on-rcv-message
[{:keys [metrics]} _ message]
(mtx/run! metrics {:id :websocket-messages-total :labels ["recv"] :inc 1})
message)
(defn- on-snd-message
[{:keys [metrics]} _ message]
(mtx/run! metrics {:id :websocket-messages-total :labels ["send"] :inc 1})
message)
;; REPL HELPERS
(defn repl-get-connections-for-file
[file-id]
(->> (vals @state)
(filter #(= file-id (-> % deref ::file-subscription :file-id)))
(map deref)
(map ::ws/id)))
(defn repl-get-connections-for-team
[team-id]
(->> (vals @state)
(filter #(= team-id (-> % deref ::team-subscription :team-id)))
(map deref)
(map ::ws/id)))
(defn repl-close-connection
[id]
(when-let [wsp (get @state id)]
(a/>!! (::ws/close-ch @wsp) [8899 "closed from server"])
(a/close! (::ws/close-ch @wsp))))
(defn repl-get-connection-info
[id]
(when-let [wsp (get @state id)]
{:id id
:created-at (dt/instant id)
:profile-id (::profile-id @wsp)
:session-id (::session-id @wsp)
:user-agent (::ws/user-agent @wsp)
:ip-addr (::ws/remote-addr @wsp)
:last-activity-at (::ws/last-activity-at @wsp)
:http-session-id (::ws/http-session-id @wsp)
:subscribed-file (-> wsp deref ::file-subscription :file-id)
:subscribed-team (-> wsp deref ::team-subscription :team-id)}))
(defn repl-print-connection-info
[id]
(some-> id repl-get-connection-info pp/pprint))
(defn repl-print-connection-info-for-file
[file-id]
(some->> (repl-get-connections-for-file file-id)
(map repl-get-connection-info)
(pp/pprint)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; WEBSOCKET HANDLER ;; WEBSOCKET HANDLER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defmulti handle-message (defmulti handle-message
(fn [_ message] (fn [_ _ message]
(:type message))) (:type message)))
(defmethod handle-message :connect (defmethod handle-message :connect
[wsp _] [cfg wsp _]
(l/trace :fn "handle-message" :event :connect)
(let [msgbus-fn (:msgbus @wsp) (let [msgbus-fn (:msgbus cfg)
conn-id (::ws/id @wsp)
profile-id (::profile-id @wsp) profile-id (::profile-id @wsp)
session-id (::session-id @wsp) session-id (::session-id @wsp)
output-ch (::ws/output-ch @wsp) output-ch (::ws/output-ch @wsp)
@ -38,94 +113,122 @@
xform (remove #(= (:session-id %) session-id)) xform (remove #(= (:session-id %) session-id))
channel (a/chan (a/dropping-buffer 16) xform)] channel (a/chan (a/dropping-buffer 16) xform)]
(swap! wsp assoc ::profile-subs-channel channel) (l/trace :fn "handle-message" :event :connect :conn-id conn-id)
;; Subscribe to the profile channel and forward all messages to
;; websocket output channel (send them to the client).
(swap! wsp assoc ::profile-subscription channel)
(a/pipe channel output-ch false) (a/pipe channel output-ch false)
(msgbus-fn :cmd :sub :topic profile-id :chan channel))) (msgbus-fn :cmd :sub :topic profile-id :chan channel)))
(defmethod handle-message :disconnect (defmethod handle-message :disconnect
[wsp _] [cfg wsp _]
(l/trace :fn "handle-message" :event :disconnect) (let [msgbus-fn (:msgbus cfg)
(a/go conn-id (::ws/id @wsp)
(let [msgbus-fn (:msgbus @wsp) profile-id (::profile-id @wsp)
profile-id (::profile-id @wsp) session-id (::session-id @wsp)
session-id (::session-id @wsp) profile-ch (::profile-subscription @wsp)
profile-ch (::profile-subs-channel @wsp) fsub (::file-subscription @wsp)
subs (::subscriptions @wsp)] tsub (::team-subscription @wsp)
message {:type :disconnect
:subs-id profile-id
:profile-id profile-id
:session-id session-id}]
(l/trace :fn "handle-message"
:event :disconnect
:conn-id conn-id)
(a/go
;; Close the main profile subscription ;; Close the main profile subscription
(a/close! profile-ch) (a/close! profile-ch)
(a/<! (msgbus-fn :cmd :purge :chans [profile-ch])) (a/<! (msgbus-fn :cmd :purge :chans [profile-ch]))
;; Close all other active subscrption on this websocket context. ;; Close tram subscription if exists
(doseq [{:keys [channel topic]} (map second subs)] (when-let [channel (:channel tsub)]
(a/close! channel) (a/close! channel)
(a/<! (msgbus-fn :cmd :pub :topic topic (a/<! (msgbus-fn :cmd :purge :chans [channel])))
:message {:type :disconnect
:profile-id profile-id (when-let [{:keys [topic channel]} fsub]
:session-id session-id})) (a/close! channel)
(a/<! (msgbus-fn :cmd :purge :chans [channel])))))) (a/<! (msgbus-fn :cmd :purge :chans [channel]))
(a/<! (msgbus-fn :cmd :pub :topic topic :message message))))))
(defmethod handle-message :subscribe-team (defmethod handle-message :subscribe-team
[wsp {:keys [team-id] :as params}] [cfg wsp {:keys [team-id] :as params}]
(l/trace :fn "handle-message" :event :subscribe-team :team-id team-id) (let [msgbus-fn (:msgbus cfg)
conn-id (::ws/id @wsp)
(let [msgbus-fn (:msgbus @wsp)
session-id (::session-id @wsp) session-id (::session-id @wsp)
output-ch (::ws/output-ch @wsp) output-ch (::ws/output-ch @wsp)
subs (get-in @wsp [::subscriptions team-id]) prev-subs (get @wsp ::team-subscription)
xform (comp xform (comp
(remove #(= (:session-id %) session-id)) (remove #(= (:session-id %) session-id))
(map #(assoc % :subs-id team-id)))] (map #(assoc % :subs-id team-id)))
(a/go
(when (not= (:team-id subs) team-id)
;; if it exists we just need to close that
(when-let [channel (:channel subs)]
(a/close! channel)
(a/<! (msgbus-fn :cmd :purge :chans [channel])))
(let [channel (a/chan (a/dropping-buffer 64) xform)]
;; Message forwarding
(a/pipe channel output-ch false)
(let [state {:team-id team-id :channel channel :topic team-id}]
(swap! wsp update ::subscriptions assoc team-id state))
(a/<! (msgbus-fn :cmd :sub :topic team-id :chan channel)))))))
(defmethod handle-message :subscribe-file
[wsp {:keys [subs-id file-id] :as params}]
(l/trace :fn "handle-message" :event :subscribe-file :subs-id subs-id :file-id file-id)
(let [msgbus-fn (:msgbus @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
output-ch (::ws/output-ch @wsp)
xform (comp
(remove #(= (:session-id %) session-id))
(map #(assoc % :subs-id subs-id)))
channel (a/chan (a/dropping-buffer 64) xform)] channel (a/chan (a/dropping-buffer 64) xform)]
;; Message forwarding (l/trace :fn "handle-message"
(a/go-loop [] :event :subscribe-team
(when-let [{:keys [type] :as message} (a/<! channel)] :team-id team-id
(when (or (= :join-file type) :conn-id conn-id)
(= :leave-file type)
(= :disconnect type)) (a/pipe channel output-ch false)
(let [message {:type :presence
:file-id file-id (let [state {:team-id team-id :channel channel :topic team-id}]
:session-id session-id (swap! wsp assoc ::team-subscription state))
:profile-id profile-id}]
(a/<! (msgbus-fn :cmd :pub (a/go
:topic file-id ;; Close previous subscription if exists
:message message)))) (when-let [channel (:channel prev-subs)]
(a/>! output-ch message) (a/close! channel)
(recur))) (a/<! (msgbus-fn :cmd :purge :chans [channel]))))
(a/go
(a/<! (msgbus-fn :cmd :sub :topic team-id :chan channel)))))
(defmethod handle-message :subscribe-file
[cfg wsp {:keys [file-id] :as params}]
(let [msgbus-fn (:msgbus cfg)
conn-id (::ws/id @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
output-ch (::ws/output-ch @wsp)
prev-subs (::file-subscription @wsp)
xform (comp (remove #(= (:session-id %) session-id))
(map #(assoc % :subs-id file-id)))
channel (a/chan (a/dropping-buffer 64) xform)]
(l/trace :fn "handle-message"
:event :subscribe-file
:file-id file-id
:conn-id conn-id)
(let [state {:file-id file-id :channel channel :topic file-id}] (let [state {:file-id file-id :channel channel :topic file-id}]
(swap! wsp update ::subscriptions assoc subs-id state)) (swap! wsp assoc ::file-subscription state))
(a/go
;; Close previous subscription if exists
(when-let [channel (:channel prev-subs)]
(a/close! channel)
(a/<! (msgbus-fn :cmd :purge :chans [channel]))))
;; Message forwarding
(a/go
(loop []
(when-let [{:keys [type] :as message} (a/<! channel)]
(when (or (= :join-file type)
(= :leave-file type)
(= :disconnect type))
(let [message {:type :presence
:file-id file-id
:session-id session-id
:profile-id profile-id}]
(a/<! (msgbus-fn :cmd :pub
:topic file-id
:message message))))
(a/>! output-ch message)
(recur))))
(a/go (a/go
;; Subscribe to file topic ;; Subscribe to file topic
@ -134,6 +237,7 @@
;; Notifify the rest of participants of the new connection. ;; Notifify the rest of participants of the new connection.
(let [message {:type :join-file (let [message {:type :join-file
:file-id file-id :file-id file-id
:subs-id file-id
:session-id session-id :session-id session-id
:profile-id profile-id}] :profile-id profile-id}]
(a/<! (msgbus-fn :cmd :pub (a/<! (msgbus-fn :cmd :pub
@ -141,49 +245,59 @@
:message message)))))) :message message))))))
(defmethod handle-message :unsubscribe-file (defmethod handle-message :unsubscribe-file
[wsp {:keys [subs-id] :as params}] [cfg wsp {:keys [file-id] :as params}]
(l/trace :fn "handle-message" :event :unsubscribe-file :subs-id subs-id) (let [msgbus-fn (:msgbus cfg)
(let [msgbus-fn (:msgbus @wsp) conn-id (::ws/id @wsp)
session-id (::session-id @wsp) session-id (::session-id @wsp)
profile-id (::profile-id @wsp)] profile-id (::profile-id @wsp)
subs (::file-subscription @wsp)
message {:type :leave-file
:file-id file-id
:session-id session-id
:profile-id profile-id}]
(l/trace :fn "handle-message"
:event :unsubscribe-file
:file-id file-id
:conn-id conn-id)
(a/go (a/go
(when-let [{:keys [file-id channel]} (get-in @wsp [::subscriptions subs-id])] (when (= (:file-id subs) file-id)
(let [message {:type :leave-file (let [channel (:channel subs)]
:file-id file-id
:session-id session-id
:profile-id profile-id}]
(a/close! channel) (a/close! channel)
(a/<! (msgbus-fn :cmd :pub :topic file-id :message message)) (a/<! (msgbus-fn :cmd :purge :chans [channel]))
(a/<! (msgbus-fn :cmd :purge :chans [channel]))))))) (a/<! (msgbus-fn :cmd :pub :topic file-id :message message)))))))
(defmethod handle-message :keepalive (defmethod handle-message :keepalive
[_ _] [_ _ _]
(l/trace :fn "handle-message" :event :keepalive) (l/trace :fn "handle-message" :event :keepalive)
(a/go :nothing)) (a/go :nothing))
(defmethod handle-message :pointer-update (defmethod handle-message :pointer-update
[wsp {:keys [subs-id] :as message}] [cfg wsp {:keys [file-id] :as message}]
(a/go (let [msgbus-fn (:msgbus cfg)
;; Only allow receive pointer updates when active subscription profile-id (::profile-id @wsp)
(when-let [{:keys [topic]} (get-in @wsp [::subscriptions subs-id])] session-id (::session-id @wsp)
(let [msgbus-fn (:msgbus @wsp) subs (::file-subscription @wsp)
profile-id (::profile-id @wsp) message (-> message
session-id (::session-id @wsp) (assoc :subs-id file-id)
message (-> message (assoc :profile-id profile-id)
(dissoc :subs-id) (assoc :session-id session-id))]
(assoc :profile-id profile-id) (a/go
(assoc :session-id session-id))] ;; Only allow receive pointer updates when active subscription
(when subs
(a/<! (msgbus-fn :cmd :pub (a/<! (msgbus-fn :cmd :pub
:topic topic :topic file-id
:message message)))))) :message message))))))
(defmethod handle-message :default (defmethod handle-message :default
[_ message] [_ wsp message]
(a/go (let [conn-id (::ws/id @wsp)]
(l/log :level :warn (l/warn :hint "received unexpected message"
:msg "received unexpected message" :message message
:message message))) :conn-id conn-id)
(a/go :none)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HTTP HANDLER ;; HTTP HANDLER
@ -201,12 +315,7 @@
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ cfg] [_ cfg]
(fn [{:keys [profile-id params] :as req} respond raise] (fn [{:keys [profile-id params] :as req} respond raise]
(let [{:keys [session-id]} (us/conform ::handler-params params) (let [{:keys [session-id]} (us/conform ::handler-params params)]
cfg (-> cfg
(assoc ::profile-id profile-id)
(assoc ::session-id session-id))]
(l/trace :hint "http request to websocket" :profile-id profile-id :session-id session-id)
(cond (cond
(not profile-id) (not profile-id)
(raise (ex/error :type :authentication (raise (ex/error :type :authentication
@ -218,6 +327,15 @@
:hint "this endpoint only accepts websocket connections")) :hint "this endpoint only accepts websocket connections"))
:else :else
(->> (ws/handler handle-message cfg) (do
(yws/upgrade req) (l/trace :hint "websocket request" :profile-id profile-id :session-id session-id)
(respond))))))
(->> (ws/handler
::ws/on-rcv-message (partial on-rcv-message cfg)
::ws/on-snd-message (partial on-snd-message cfg)
::ws/on-connect (partial on-connect cfg)
::ws/handler (partial handle-message cfg)
::profile-id profile-id
::session-id session-id)
(yws/upgrade req)
(respond)))))))

View file

@ -160,7 +160,6 @@
"Function responsible to attach local subscription to the "Function responsible to attach local subscription to the
state. Intended to be used in agent." state. Intended to be used in agent."
[state cfg topics chan done-ch] [state cfg topics chan done-ch]
(l/trace :hint "subscribe-to-topics" :topics topics ::l/async false)
(aa/with-closing done-ch (aa/with-closing done-ch
(let [state (update state :chans assoc chan topics)] (let [state (update state :chans assoc chan topics)]
(reduce (fn [state topic] (reduce (fn [state topic]
@ -184,15 +183,15 @@
useful when client disconnects or in-bulk unsubscribe useful when client disconnects or in-bulk unsubscribe
operations. Intended to be executed in agent." operations. Intended to be executed in agent."
[state cfg channels done-ch] [state cfg channels done-ch]
(l/trace :hint "unsubscribe-channels" :chans (count channels) ::l/async false)
(aa/with-closing done-ch (aa/with-closing done-ch
(reduce #(unsubscribe-single-channel %1 cfg %2) state channels))) (reduce #(unsubscribe-single-channel %1 cfg %2) state channels)))
(defn- subscribe (defn- subscribe
[{:keys [::state executor] :as cfg} {:keys [topic topics chan]}] [{:keys [::state executor] :as cfg} {:keys [topic topics chan]}]
(let [done-ch (a/chan) (let [done-ch (a/chan)
topics (into [] (map prefix-topic) (if topic [topic] topics))] topics (into [] (map prefix-topic) (if topic [topic] topics))]
(l/trace :hint "subscribe" :topics topics) (l/debug :hint "subscribe" :topics topics)
(send-via executor state subscribe-to-topics cfg topics chan done-ch) (send-via executor state subscribe-to-topics cfg topics chan done-ch)
done-ch)) done-ch))

View file

@ -10,9 +10,10 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.transit :as t] [app.common.transit :as t]
[app.metrics :as mtx] [app.loggers.audit :refer [parse-client-ip]]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.core.async :as a] [clojure.core.async :as a]
[yetti.request :as yr]
[yetti.util :as yu] [yetti.util :as yu]
[yetti.websocket :as yws]) [yetti.websocket :as yws])
(:import (:import
@ -25,8 +26,10 @@
(declare process-output) (declare process-output)
(declare ws-ping!) (declare ws-ping!)
(declare ws-send!) (declare ws-send!)
(declare filter-options)
(def noop (constantly nil)) (def noop (constantly nil))
(def identity-3 (fn [_ _ o] o))
(defn handler (defn handler
"A WebSocket upgrade handler factory. Returns a handler that can be "A WebSocket upgrade handler factory. Returns a handler that can be
@ -39,94 +42,123 @@
It also accepts some options that allows you parametrize the It also accepts some options that allows you parametrize the
protocol behavior. The options map will be used as-as for the protocol behavior. The options map will be used as-as for the
initial data of the `ws` data structure" initial data of the `ws` data structure"
([handle-message] (handler handle-message {})) [& {:keys [::on-rcv-message
([handle-message {:keys [::input-buff-size ::on-snd-message
::output-buff-size ::on-connect
::idle-timeout ::input-buff-size
metrics] ::output-buff-size
:or {input-buff-size 64 ::handler
output-buff-size 64 ::idle-timeout]
idle-timeout 30000} :or {input-buff-size 64
:as options}] output-buff-size 64
(fn [{:keys [::yws/channel] :as request}] idle-timeout 30000
(let [input-ch (a/chan input-buff-size) on-connect noop
output-ch (a/chan output-buff-size) on-snd-message identity-3
pong-ch (a/chan (a/sliding-buffer 6)) on-rcv-message identity-3}
close-ch (a/chan) :as options}]
options (atom (assert (fn? on-rcv-message) "'on-rcv-message' should be a function")
(-> options (assert (fn? on-snd-message) "'on-snd-message' should be a function")
(assoc ::input-ch input-ch) (assert (fn? on-connect) "'on-connect' should be a function")
(assoc ::output-ch output-ch)
(assoc ::close-ch close-ch)
(assoc ::channel channel)
(dissoc ::metrics)))
terminated (atom false) (fn [{:keys [::yws/channel session-id] :as request}]
created-at (dt/now) (let [input-ch (a/chan input-buff-size)
output-ch (a/chan output-buff-size)
pong-ch (a/chan (a/sliding-buffer 6))
close-ch (a/chan)
stop-ch (a/chan)
on-open ip-addr (parse-client-ip request)
(fn [channel] uagent (yr/get-header request "user-agent")
(mtx/run! metrics {:id :websocket-active-connections :inc 1}) id (inst-ms (dt/now))
(yws/idle-timeout! channel (dt/duration idle-timeout)))
on-terminate options (-> (filter-options options)
(fn [& _args] (merge {::id id
(when (compare-and-set! terminated false true) ::input-ch input-ch
(mtx/run! metrics {:id :websocket-active-connections :dec 1}) ::output-ch output-ch
(mtx/run! metrics {:id :websocket-session-timing :val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)}) ::close-ch close-ch
::stop-ch stop-ch
::channel channel
::remote-addr ip-addr
::http-session-id session-id
::user-agent uagent})
(atom))
(a/close! close-ch) ;; call the on-connect hook and memoize the on-terminate instance
(a/close! pong-ch) on-terminate (on-connect options)
(a/close! output-ch)
(a/close! input-ch)))
on-error on-ws-open
(fn [_ error] (fn [channel]
(on-terminate) (l/trace :fn "on-ws-open" :conn-id id)
;; TODO: properly log timeout exceptions (yws/idle-timeout! channel (dt/duration idle-timeout)))
(when-not (or (instance? java.nio.channels.ClosedChannelException error)
(instance? java.net.SocketException error))
(l/error :hint (ex-message error) :cause error)))
on-message on-ws-terminate
(fn [_ message] (fn [_ code reason]
(mtx/run! metrics {:id :websocket-messages-total :labels ["recv"] :inc 1}) (l/trace :fn "on-ws-terminate" :conn-id id :code code :reason reason)
(try (a/close! close-ch))
(let [message (t/decode-str message)]
(a/offer! input-ch message))
(catch Throwable e
(l/warn :hint "error on decoding incoming message from websocket"
:wsmsg (pr-str message)
:cause e)
(on-terminate))))
on-pong on-ws-error
(fn [_ buffers] (fn [_ error]
(a/>!! pong-ch (yu/copy-many buffers)))] (a/close! close-ch)
(when-not (or (instance? java.nio.channels.ClosedChannelException error)
(instance? java.net.SocketException error))
(l/error :hint (ex-message error) :cause error)))
;; launch heartbeat process on-ws-message
(-> @options (fn [_ message]
(assoc ::pong-ch pong-ch) (try
(assoc ::on-close on-terminate) (let [message (on-rcv-message options message)
(process-heartbeat)) message (t/decode-str message)]
(a/offer! input-ch message)
(swap! options assoc ::last-activity-at (dt/now)))
(catch Throwable e
(l/warn :hint "error on decoding incoming message from websocket"
:wsmsg (pr-str message)
:cause e)
(a/>! close-ch [8801 "decode error"])
(a/close! close-ch))))
;; Forward all messages from output-ch to the websocket on-ws-pong
;; connection (fn [_ buffers]
(a/go-loop [] (a/>!! pong-ch (yu/copy-many buffers)))]
(when-let [val (a/<! output-ch)]
(mtx/run! metrics {:id :websocket-messages-total :labels ["send"] :inc 1})
(a/<! (ws-send! channel (t/encode-str val)))
(recur)))
;; React on messages received from the client ;; Launch heartbeat process
(process-input options handle-message) (-> @options
(assoc ::pong-ch pong-ch)
(process-heartbeat))
{:on-open on-open ;; Wait a close signal
:on-error on-error (a/go
:on-close on-terminate (let [[code reason] (a/<! close-ch)]
:on-text on-message (a/close! stop-ch)
:on-pong on-pong})))) (a/close! pong-ch)
(a/close! output-ch)
(a/close! input-ch)
(when (and code reason)
(l/trace :hint "close channel condition" :code code :reason reason)
(yws/close! channel code reason))
(when (fn? on-terminate)
(on-terminate))))
;; Forward all messages from output-ch to the websocket
;; connection
(a/go-loop []
(when-let [val (a/<! output-ch)]
(let [val (on-snd-message options val)]
(a/<! (ws-send! channel (t/encode-str val)))
(recur))))
;; React on messages received from the client
(process-input options handler)
{:on-open on-ws-open
:on-error on-ws-error
:on-close on-ws-terminate
:on-text on-ws-message
:on-pong on-ws-pong})))
(defn- ws-send! (defn- ws-send!
[channel s] [channel s]
@ -172,14 +204,14 @@
(defn- process-input (defn- process-input
[wsp handler] [wsp handler]
(let [{:keys [::input-ch ::output-ch ::close-ch]} @wsp (let [{:keys [::input-ch ::output-ch ::stop-ch]} @wsp
handler (wrap-handler handler)] handler (wrap-handler handler)]
(a/go (a/go
(a/<! (handler wsp {:type :connect})) (a/<! (handler wsp {:type :connect}))
(a/<! (a/go-loop [] (a/<! (a/go-loop []
(when-let [message (a/<! input-ch)] (when-let [message (a/<! input-ch)]
(let [[val port] (a/alts! [(handler wsp message) close-ch])] (let [[val port] (a/alts! [stop-ch (handler wsp message)] :priority true)]
(when-not (= port close-ch) (when-not (= port stop-ch)
(cond (cond
(ex/ex-info? val) (ex/ex-info? val)
(a/>! output-ch {:type :error :error (ex-data val)}) (a/>! output-ch {:type :error :error (ex-data val)})
@ -193,19 +225,21 @@
(a/<! (handler wsp {:type :disconnect}))))) (a/<! (handler wsp {:type :disconnect})))))
(defn- process-heartbeat (defn- process-heartbeat
[{:keys [::channel ::close-ch ::on-close ::pong-ch [{:keys [::channel ::stop-ch ::close-ch ::pong-ch
::heartbeat-interval ::max-missed-heartbeats] ::heartbeat-interval ::max-missed-heartbeats]
:or {heartbeat-interval 2000 :or {heartbeat-interval 2000
max-missed-heartbeats 4}}] max-missed-heartbeats 4}}]
(let [beats (atom #{})] (let [beats (atom #{})]
(a/go-loop [i 0] (a/go-loop [i 0]
(let [[_ port] (a/alts! [close-ch (a/timeout heartbeat-interval)])] (let [[_ port] (a/alts! [stop-ch (a/timeout heartbeat-interval)] :priority true)]
(when (and (yws/connected? channel) (when (and (yws/connected? channel)
(not= port close-ch)) (not= port stop-ch))
(a/<! (ws-ping! channel (encode-beat i))) (a/<! (ws-ping! channel (encode-beat i)))
(let [issued (swap! beats conj (long i))] (let [issued (swap! beats conj (long i))]
(if (>= (count issued) max-missed-heartbeats) (if (>= (count issued) max-missed-heartbeats)
(on-close channel -1 "heartbeat-timeout") (do
(a/>! close-ch [8802 "heart-beat timeout"])
(a/close! close-ch))
(recur (inc i))))))) (recur (inc i)))))))
(a/go-loop [] (a/go-loop []
@ -213,3 +247,11 @@
(swap! beats disj (decode-beat buffer)) (swap! beats disj (decode-beat buffer))
(recur))))) (recur)))))
(defn- filter-options
"Remove from options all namespace qualified keys that matches the
current namespace."
[options]
(into {}
(remove (fn [[key]]
(= (namespace key) "app.util.websocket")))
options))

View file

@ -173,8 +173,7 @@
(when (is-authenticated? profile) (when (is-authenticated? profile)
(->> (rx/of (profile-fetched profile) (->> (rx/of (profile-fetched profile)
(fetch-teams) (fetch-teams)
(get-redirect-event) (get-redirect-event))
(ws/initialize))
(rx/observe-on :async))))))) (rx/observe-on :async)))))))
(s/def ::invitation-token ::us/not-empty-string) (s/def ::invitation-token ::us/not-empty-string)

View file

@ -7,14 +7,19 @@
(ns app.main.data.websocket (ns app.main.data.websocket
(:require (:require
[app.common.data.macros :as dm] [app.common.data.macros :as dm]
[app.common.logging :as l]
[app.common.uri :as u] [app.common.uri :as u]
[app.config :as cf] [app.config :as cf]
[app.util.websocket :as ws] [app.util.websocket :as ws]
[beicon.core :as rx] [beicon.core :as rx]
[potok.core :as ptk])) [potok.core :as ptk]))
(l/set-level! :error)
(dm/export ws/send!) (dm/export ws/send!)
(defonce ws-conn (volatile! nil))
(defn- prepare-uri (defn- prepare-uri
[params] [params]
(let [base (-> (u/join cf/public-uri "ws/notifications") (let [base (-> (u/join cf/public-uri "ws/notifications")
@ -30,35 +35,34 @@
[message] [message]
(ptk/reify ::send-message (ptk/reify ::send-message
ptk/EffectEvent ptk/EffectEvent
(effect [_ state _] (effect [_ _ _]
(let [ws-conn (:ws-conn state)] (some-> @ws-conn (ws/send! message)))))
(ws/send! ws-conn message)))))
(defn initialize (defn initialize
[] []
(ptk/reify ::initialize (ptk/reify ::initialize
ptk/UpdateEvent
(update [_ state]
(let [sid (:session-id state)
uri (prepare-uri {:session-id sid})]
(assoc state :ws-conn (ws/create uri))))
ptk/WatchEvent ptk/WatchEvent
(watch [_ state stream] (watch [_ state stream]
(let [ws-conn (:ws-conn state) (l/trace :hint "event:initialize" :fn "watch")
stoper (rx/merge (let [sid (:session-id state)
(rx/filter (ptk/type? ::finalize) stream) uri (prepare-uri {:session-id sid})
(rx/filter (ptk/type? ::initialize) stream))] ws (ws/create uri)]
(->> (rx/merge (vreset! ws-conn ws)
(->> (ws/get-rcv-stream ws-conn)
(rx/filter ws/message-event?) (let [stoper (rx/merge
(rx/map :payload) (rx/filter (ptk/type? ::finalize) stream)
(rx/map #(ptk/data-event ::message %))) (rx/filter (ptk/type? ::initialize) stream))]
(->> (ws/get-rcv-stream ws-conn)
(rx/filter ws/opened-event?) (->> (rx/merge
(rx/map (fn [_] (ptk/data-event ::opened {}))))) (->> (ws/get-rcv-stream ws)
(rx/take-until stoper)))))) (rx/filter ws/message-event?)
(rx/map :payload)
(rx/map #(ptk/data-event ::message %)))
(->> (ws/get-rcv-stream ws)
(rx/filter ws/opened-event?)
(rx/map (fn [_] (ptk/data-event ::opened {})))))
(rx/take-until stoper)))))))
;; --- Finalize Websocket ;; --- Finalize Websocket
@ -66,5 +70,6 @@
[] []
(ptk/reify ::finalize (ptk/reify ::finalize
ptk/EffectEvent ptk/EffectEvent
(effect [_ state _] (effect [_ _ _]
(some-> (:ws-conn state) ws/close!)))) (l/trace :hint "event:finalize" :fn "effect")
(some-> @ws-conn ws/close!))))

View file

@ -9,7 +9,6 @@
[app.common.data :as d] [app.common.data :as d]
[app.common.pages.changes-spec :as pcs] [app.common.pages.changes-spec :as pcs]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uuid :as uuid]
[app.main.data.websocket :as dws] [app.main.data.websocket :as dws]
[app.main.data.workspace.changes :as dch] [app.main.data.workspace.changes :as dch]
[app.main.data.workspace.libraries :as dwl] [app.main.data.workspace.libraries :as dwl]
@ -34,51 +33,53 @@
(ptk/reify ::initialize (ptk/reify ::initialize
ptk/WatchEvent ptk/WatchEvent
(watch [_ state stream] (watch [_ state stream]
(let [subs-id (uuid/next) (let [stoper (rx/filter (ptk/type? ::finalize) stream)
stoper (rx/filter (ptk/type? ::finalize) stream) profile-id (:profile-id state)
initmsg [{:type :subscribe-file initmsg [{:type :subscribe-file
:subs-id subs-id :file-id file-id}
:file-id file-id} {:type :subscribe-team
{:type :subscribe-team :team-id team-id}]
:team-id team-id}]
endmsg {:type :unsubscribe-file endmsg {:type :unsubscribe-file
:subs-id subs-id} :file-id file-id}
stream (->> (rx/merge stream (->> (rx/merge
;; Send the subscription message ;; Send the subscription message
(->> (rx/from initmsg) (->> (rx/from initmsg)
(rx/map dws/send)) (rx/map dws/send))
;; Subscribe to notifications of the subscription ;; Subscribe to notifications of the subscription
(->> stream (->> stream
(rx/filter (ptk/type? ::dws/message)) (rx/filter (ptk/type? ::dws/message))
(rx/map deref) ;; :library-change events occur in a different file, but need to be processed anyway (rx/map deref)
(rx/filter #(or (= subs-id (:subs-id %)) (= (:type %) :library-change))) (rx/filter (fn [{:keys [subs-id] :as msg}]
(rx/map process-message)) (or (= subs-id team-id)
(= subs-id profile-id)
(= subs-id file-id))))
(rx/map process-message))
;; On reconnect, send again the subscription messages ;; On reconnect, send again the subscription messages
(->> stream (->> stream
(rx/filter (ptk/type? ::dws/opened)) (rx/filter (ptk/type? ::dws/opened))
(rx/mapcat #(->> (rx/from initmsg) (rx/mapcat #(->> (rx/from initmsg)
(rx/map dws/send)))) (rx/map dws/send))))
;; Emit presence event for current user; ;; Emit presence event for current user;
;; this is because websocket server don't ;; this is because websocket server don't
;; emits this for the same user. ;; emits this for the same user.
(rx/of (handle-presence {:type :connect (rx/of (handle-presence {:type :connect
:session-id (:session-id state) :session-id (:session-id state)
:profile-id (:profile-id state)})) :profile-id (:profile-id state)}))
;; Emit to all other connected users the current pointer ;; Emit to all other connected users the current pointer
;; position changes. ;; position changes.
(->> stream (->> stream
(rx/filter ms/pointer-event?) (rx/filter ms/pointer-event?)
(rx/sample 50) (rx/sample 50)
(rx/map #(handle-pointer-send subs-id file-id (:pt %))))) (rx/map #(handle-pointer-send file-id (:pt %)))))
(rx/take-until stoper))] (rx/take-until stoper))]
(rx/concat stream (rx/of (dws/send endmsg))))))) (rx/concat stream (rx/of (dws/send endmsg)))))))
@ -95,13 +96,12 @@
nil)) nil))
(defn- handle-pointer-send (defn- handle-pointer-send
[subs-id file-id point] [file-id point]
(ptk/reify ::handle-pointer-send (ptk/reify ::handle-pointer-send
ptk/WatchEvent ptk/WatchEvent
(watch [_ state _] (watch [_ state _]
(let [page-id (:current-page-id state) (let [page-id (:current-page-id state)
message {:type :pointer-update message {:type :pointer-update
:subs-id subs-id
:file-id file-id :file-id file-id
:page-id page-id :page-id page-id
:position point}] :position point}]

View file

@ -163,7 +163,9 @@
(rx/map #(shapes-changes-persisted file-id %))))))) (rx/map #(shapes-changes-persisted file-id %)))))))
(rx/catch (fn [cause] (rx/catch (fn [cause]
(rx/concat (rx/concat
(rx/of (rt/assign-exception cause)) (if (= :authentication (:type cause))
(rx/empty)
(rx/of (rt/assign-exception cause)))
(rx/throw cause)))))))))) (rx/throw cause))))))))))

View file

@ -106,7 +106,6 @@
(js/console.groupEnd msg))) (js/console.groupEnd msg)))
;; Error on parsing an SVG ;; Error on parsing an SVG
;; TODO: looks unused and deprecated ;; TODO: looks unused and deprecated
(defmethod ptk/handle-error :svg-parser (defmethod ptk/handle-error :svg-parser