Improve msgbus subscription handling.

This commit is contained in:
Andrey Antukh 2021-02-23 13:17:01 +01:00
parent 8fd37dbad5
commit 7e1ee087d3
4 changed files with 194 additions and 124 deletions

View file

@ -32,9 +32,9 @@
<AppenderRef ref="main" level="debug" /> <AppenderRef ref="main" level="debug" />
</Logger> </Logger>
<Logger name="app" level="debug" additivity="false"> <Logger name="app" level="trace" additivity="false">
<AppenderRef ref="main" level="debug" /> <AppenderRef ref="main" level="trace" />
<AppenderRef ref="zmq" level="debug" /> <AppenderRef ref="zmq" level="trace" />
</Logger> </Logger>
<Logger name="user" level="debug" additivity="false"> <Logger name="user" level="debug" additivity="false">

View file

@ -62,9 +62,14 @@
snd-conn (.connect ^RedisClient rclient ^RedisCodec codec) snd-conn (.connect ^RedisClient rclient ^RedisCodec codec)
rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec) rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)
pub-buff (a/chan (a/dropping-buffer buffer-size)) ;; Channel used for receive publications from the application.
rcv-buff (a/chan (a/dropping-buffer buffer-size)) pub-chan (a/chan (a/dropping-buffer buffer-size))
sub-buff (a/chan 1)
;; Channel used for receive data from redis
rcv-chan (a/chan (a/dropping-buffer buffer-size))
;; Channel used for receive subscription requests.
sub-chan (a/chan)
cch (a/chan 1)] cch (a/chan 1)]
(.setTimeout ^StatefulRedisConnection snd-conn ^Duration (dt/duration {:seconds 10})) (.setTimeout ^StatefulRedisConnection snd-conn ^Duration (dt/duration {:seconds 10}))
@ -73,10 +78,10 @@
(log/debugf "initializing msgbus (uri: '%s')" (str uri)) (log/debugf "initializing msgbus (uri: '%s')" (str uri))
;; Start the sending (publishing) loop ;; Start the sending (publishing) loop
(impl-publish-loop snd-conn pub-buff cch) (impl-publish-loop snd-conn pub-chan cch)
;; Start the receiving (subscribing) loop ;; Start the receiving (subscribing) loop
(impl-subscribe-loop rcv-conn rcv-buff sub-buff cch) (impl-subscribe-loop rcv-conn rcv-chan sub-chan cch)
(with-meta (with-meta
(fn run (fn run
@ -84,14 +89,14 @@
([command params] ([command params]
(a/go (a/go
(case command (case command
:pub (a/>! pub-buff params) :pub (a/>! pub-chan params)
:sub (a/>! sub-buff params))))) :sub (a/>! sub-chan params)))))
{::snd-conn snd-conn {::snd-conn snd-conn
::rcv-conn rcv-conn ::rcv-conn rcv-conn
::cch cch ::cch cch
::pub-buff pub-buff ::pub-chan pub-chan
::rcv-buff rcv-buff}))) ::rcv-chan rcv-chan})))
(defmethod ig/halt-key! ::msgbus (defmethod ig/halt-key! ::msgbus
[_ f] [_ f]
@ -99,14 +104,14 @@
(.close ^StatefulRedisConnection (::snd-conn mdata)) (.close ^StatefulRedisConnection (::snd-conn mdata))
(.close ^StatefulRedisPubSubConnection (::rcv-conn mdata)) (.close ^StatefulRedisPubSubConnection (::rcv-conn mdata))
(a/close! (::cch mdata)) (a/close! (::cch mdata))
(a/close! (::pub-buff mdata)) (a/close! (::pub-chan mdata))
(a/close! (::rcv-buff mdata)))) (a/close! (::rcv-chan mdata))))
(defn- impl-publish-loop (defn- impl-publish-loop
[conn pub-buff cch] [conn pub-chan cch]
(let [rac (.async ^StatefulRedisConnection conn)] (let [rac (.async ^StatefulRedisConnection conn)]
(a/go-loop [] (a/go-loop []
(let [[val _] (a/alts! [cch pub-buff] :priority true)] (let [[val _] (a/alts! [cch pub-chan] :priority true)]
(when (some? val) (when (some? val)
(let [result (a/<! (impl-redis-pub rac val))] (let [result (a/<! (impl-redis-pub rac val))]
(when (ex/exception? result) (when (ex/exception? result)
@ -114,7 +119,7 @@
(recur)))))) (recur))))))
(defn- impl-subscribe-loop (defn- impl-subscribe-loop
[conn rcv-buff sub-buff cch] [conn rcv-chan sub-chan cch]
;; Add a unique listener to connection ;; Add a unique listener to connection
(.addListener conn (reify RedisPubSubListener (.addListener conn (reify RedisPubSubListener
(message [it pattern topic message]) (message [it pattern topic message])
@ -122,48 +127,96 @@
;; There are no back pressure, so we use a slidding ;; There are no back pressure, so we use a slidding
;; buffer for cases when the pubsub broker sends ;; buffer for cases when the pubsub broker sends
;; more messages that we can process. ;; more messages that we can process.
(a/>!! rcv-buff {:topic topic :message (blob/decode message)})) (let [val {:topic topic :message (blob/decode message)}]
(when-not (a/offer! rcv-chan val)
(log/warn "dropping message on subscription loop"))))
(psubscribed [it pattern count]) (psubscribed [it pattern count])
(punsubscribed [it pattern count]) (punsubscribed [it pattern count])
(subscribed [it topic count]) (subscribed [it topic count])
(unsubscribed [it topic count]))) (unsubscribed [it topic count])))
(a/go-loop [chans {}] (let [chans (agent {} :error-handler #(log/error % "unexpected error on agent"))
(let [[val port] (a/alts! [sub-buff cch rcv-buff] :priority true)]
(cond
;; Stop condition; just do nothing
(= port cch)
nil
;; If we receive a message on sub-buff this means that a new subscribe-to-single-topic
;; subscription is requested by the notifications module. (fn [nsubs topic chan]
(= port sub-buff) (let [nsubs (if (nil? nsubs) #{chan} (conj nsubs chan))]
(let [topic (:topic val) (when (= 1 (count nsubs))
output (:chan val) (let [result (a/<!! (impl-redis-sub conn topic))]
chans (update chans topic (fnil conj #{}) output)] (log/tracef "opening subscription to %s" topic)
(when (= 1 (count (get chans topic)))
(let [result (a/<! (impl-redis-sub conn topic))]
(when (ex/exception? result) (when (ex/exception? result)
(log/errorf result "unexpected exception on subscribing to '%s'" topic)))) (log/errorf result "unexpected exception on subscribing to '%s'" topic))))
(recur chans)) nsubs))
subscribe-to-topics
(fn [state topics chan]
(let [topics (into #{} (map str) topics)
state (update state :chans assoc chan topics)]
(reduce (fn [state topic]
(update-in state [:topics topic] subscribe-to-single-topic topic chan))
state
topics)))
unsubscribe-from-single-topic
(fn [nsubs topic chan]
;; (log/tracef "unsubscribe-from-single-topic %s | %s | %s" nsubs topic chan)
(let [nsubs (disj nsubs chan)]
(when (empty? nsubs)
(let [result (a/<!! (impl-redis-unsub conn topic))]
(log/tracef "closing subscription to %s" topic)
(when (ex/exception? result)
(log/errorf result "unexpected exception on unsubscribing from '%s'" topic))))
nsubs))
unsubscribe-channels
(fn [state pending]
;; (log/tracef "unsubscribe-channels %s" (pr-str pending))
(reduce (fn [state ch]
(let [topics (get-in state [:chans ch])
state (update state :chans dissoc ch)]
;; (log/tracef "unsubscribe-channels topics=%s" topics)
(reduce (fn [state topic]
(update-in state [:topics topic] unsubscribe-from-single-topic topic ch))
state
topics)))
state
pending))]
;; Asynchronous subscription loop; terminates when sub-chan is
;; closed.
(a/go-loop []
(when-let [{:keys [topics chan]} (a/<! sub-chan)]
(send-off chans subscribe-to-topics topics chan)
(recur)))
(a/go-loop []
(let [[val port] (a/alts! [cch rcv-chan])]
(cond
;; Stop condition; close all underlying subscriptions and
;; exit. The close operation is performed asynchronously.
(= port cch)
(send-off chans (fn [state]
(log/tracef "close")
(->> (vals state)
(mapcat identity)
(filter some?)
(run! a/close!))))
;; This means we receive data from redis and we need to ;; This means we receive data from redis and we need to
;; forward it to the underlying subscriptions. ;; forward it to the underlying subscriptions.
(= port rcv-buff) (= port rcv-chan)
(let [topic (:topic val) (let [topic (:topic val) ; topic is already string
pending (loop [chans (seq (get chans topic)) pending (loop [chans (seq (get-in @chans [:topics topic]))
pending #{}] pending #{}]
(if-let [ch (first chans)] (if-let [ch (first chans)]
(if (a/>! ch (:message val)) (if (a/>! ch (:message val))
(recur (rest chans) pending) (recur (rest chans) pending)
(recur (rest chans) (conj pending ch))) (recur (rest chans) (conj pending ch)))
pending)) pending))]
chans (update chans topic #(reduce disj % pending))] ;; (log/tracef "received message => pending: %s" (pr-str pending))
(when (empty? (get chans topic)) (some->> (seq pending)
(let [result (a/<! (impl-redis-unsub conn topic))] (send-off chans unsubscribe-channels))
(when (ex/exception? result)
(log/errorf result "unexpected exception on unsubscribing from '%s'" topic)))) (recur)))))))
(recur chans))))))
(defn- impl-redis-pub (defn- impl-redis-pub
[rac {:keys [topic message]}] [rac {:keys [topic message]}]

View file

@ -115,14 +115,11 @@
[conn id] [conn id]
(db/exec-one! conn [sql:retrieve-file id])) (db/exec-one! conn [sql:retrieve-file id]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; WebSocket Http Handler ;; --- WEBSOCKET INIT
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(declare handle-connect) (declare handle-connect)
(defrecord WebSocket [conn in out sub])
(defn- ws-send (defn- ws-send
[conn data] [conn data]
(try (try
@ -134,8 +131,8 @@
(defn websocket (defn websocket
[{:keys [file-id team-id msgbus executor] :as cfg}] [{:keys [file-id team-id msgbus executor] :as cfg}]
(let [in (a/chan (a/dropping-buffer 64)) (let [rcv-ch (a/chan 32)
out (a/chan (a/dropping-buffer 64)) out-ch (a/chan 32)
mtx-aconn (:mtx-active-connections cfg) mtx-aconn (:mtx-active-connections cfg)
mtx-messages (:mtx-messages cfg) mtx-messages (:mtx-messages cfg)
mtx-sessions (:mtx-sessions cfg) mtx-sessions (:mtx-sessions cfg)
@ -143,46 +140,51 @@
ws-send (mtx/wrap-counter ws-send mtx-messages ["send"])] ws-send (mtx/wrap-counter ws-send mtx-messages ["send"])]
(letfn [(on-connect [conn] (letfn [(on-connect [conn]
(log/debugf "on-connect %s" (:session-id cfg))
(mtx-aconn :inc) (mtx-aconn :inc)
;; A subscription channel should use a lossy buffer ;; A subscription channel should use a lossy buffer
;; because we can't penalize normal clients when one ;; because we can't penalize normal clients when one
;; slow client is connected to the room. ;; slow client is connected to the room.
(let [sub (a/chan (a/dropping-buffer 64)) (let [sub-ch (a/chan (a/dropping-buffer 128))
ws (WebSocket. conn in out sub nil cfg)] cfg (assoc cfg
:conn conn
:rcv-ch rcv-ch
:out-ch out-ch
:sub-ch sub-ch)]
;; Subscribe to corresponding topics (log/tracef "on-connect %s" (:session-id cfg))
(a/<!! (msgbus :sub {:topic (str file-id) :chan sub}))
(a/<!! (msgbus :sub {:topic (str team-id) :chan sub}))
;; message forwarding loop ;; Forward all messages from out-ch to the websocket
;; connection
(a/go-loop [] (a/go-loop []
(let [val (a/<! out)] (let [val (a/<! out-ch)]
(when (some? val) (when (some? val)
(when (a/<! (aa/thread-call executor #(ws-send conn (t/encode-str val)))) (when (a/<! (aa/thread-call executor #(ws-send conn (t/encode-str val))))
(recur))))) (recur)))))
(a/go (a/go
(a/<! (handle-connect ws)) ;; Subscribe to corresponding topics
(a/close! sub)))) (a/<! (msgbus :sub {:topics [file-id team-id] :chan sub-ch}))
(a/<! (handle-connect cfg))
(a/close! sub-ch))))
(on-error [_conn _e] (on-error [_conn e]
(log/debugf "on-error %s" (:session-id cfg))
(mtx-aconn :dec) (mtx-aconn :dec)
(mtx-sessions :observe (/ (inst-ms (dt/duration-between created-at (dt/now))) 1000.0)) (mtx-sessions :observe (/ (inst-ms (dt/duration-between created-at (dt/now))) 1000.0))
(a/close! out) (log/tracef "on-error %s (%s)" (:session-id cfg) (ex-message e))
(a/close! in)) (a/close! out-ch)
(a/close! rcv-ch))
(on-close [_conn _status _reason] (on-close [_conn _status _reason]
(log/debugf "on-close %s" (:session-id cfg))
(mtx-aconn :dec) (mtx-aconn :dec)
(mtx-sessions :observe (/ (inst-ms (dt/duration-between created-at (dt/now))) 1000.0)) (mtx-sessions :observe (/ (inst-ms (dt/duration-between created-at (dt/now))) 1000.0))
(a/close! out) (log/tracef "on-close %s" (:session-id cfg))
(a/close! in)) (a/close! out-ch)
(a/close! rcv-ch))
(on-message [_ws message] (on-message [_ws message]
(let [message (t/decode-str message)] (let [message (t/decode-str message)]
(a/>!! in message)))] (when-not (a/offer! rcv-ch message)
(log/warn "droping ws input message, channe full"))))]
{:on-connect on-connect {:on-connect on-connect
:on-error on-error :on-error on-error
@ -190,16 +192,18 @@
:on-text (mtx/wrap-counter on-message mtx-messages ["recv"]) :on-text (mtx/wrap-counter on-message mtx-messages ["recv"])
:on-bytes (constantly nil)}))) :on-bytes (constantly nil)})))
;; --- CONNECTION INIT
(declare handle-message) (declare handle-message)
(declare start-loop!) (declare start-loop!)
(defn- handle-connect (defn- handle-connect
[{:keys [conn] :as ws}] [{:keys [conn] :as cfg}]
(a/go (a/go
(try (try
(aa/<? (handle-message ws {:type :connect})) (aa/<? (handle-message cfg {:type :connect}))
(aa/<? (start-loop! ws)) (aa/<? (start-loop! cfg))
(aa/<? (handle-message ws {:type :disconnect})) (aa/<? (handle-message cfg {:type :disconnect}))
(catch Throwable err (catch Throwable err
(log/errorf err "unexpected exception on websocket handler") (log/errorf err "unexpected exception on websocket handler")
(let [session (.getSession ^WebSocketAdapter conn)] (let [session (.getSession ^WebSocketAdapter conn)]
@ -207,36 +211,39 @@
(.disconnect session))))))) (.disconnect session)))))))
(defn- start-loop! (defn- start-loop!
[{:keys [in out sub session-id] :as ws}] [{:keys [rcv-ch out-ch sub-ch session-id] :as cfg}]
(aa/go-try (aa/go-try
(loop [] (loop []
(let [timeout (a/timeout 30000) (let [timeout (a/timeout 30000)
[val port] (a/alts! [in sub timeout])] [val port] (a/alts! [rcv-ch sub-ch timeout])]
(cond (cond
;; Process message coming from connected client ;; Process message coming from connected client
(and (= port in) (not (nil? val))) (and (= port rcv-ch) (some? val))
(do (do
(aa/<? (handle-message ws val)) (aa/<? (handle-message cfg val))
(recur)) (recur))
;; Forward message to the websocket ;; If message comes from subscription channel; we just need
(and (= port sub) (not (nil? val))) ;; to foreward it to the output channel.
(and (= port sub-ch) (some? val))
(do (do
(when-not (= (:session-id val) session-id) (when-not (= (:session-id val) session-id)
(a/>! out val)) (a/>! out-ch val))
(recur)) (recur))
;; Timeout channel signaling ;; When timeout channel is signaled, we need to send a ping
;; message to the output channel. TODO: we need to make this
;; more smart.
(= port timeout) (= port timeout)
(do (do
(a/>! out {:type :ping}) (a/>! out-ch {:type :ping})
(recur)) (recur))
:else :else
nil))))) nil)))))
;; Incoming Messages Handling ;; --- PRESENCE HANDLING API
(def ^:private (def ^:private
sql:retrieve-presence sql:retrieve-presence
@ -244,12 +251,6 @@
where file_id=? where file_id=?
and (clock_timestamp() - updated_at) < '5 min'::interval") and (clock_timestamp() - updated_at) < '5 min'::interval")
(defn- retrieve-presence
[pool file-id]
(aa/thread-try
(let [rows (db/exec! pool [sql:retrieve-presence file-id])]
(mapv (juxt :session-id :profile-id) rows))))
(def ^:private (def ^:private
sql:update-presence sql:update-presence
"insert into presence (file_id, session_id, profile_id, updated_at) "insert into presence (file_id, session_id, profile_id, updated_at)
@ -257,49 +258,66 @@
on conflict (file_id, session_id, profile_id) on conflict (file_id, session_id, profile_id)
do update set updated_at=clock_timestamp()") do update set updated_at=clock_timestamp()")
(defn- retrieve-presence
[{:keys [pool file-id] :as cfg}]
(let [rows (db/exec! pool [sql:retrieve-presence file-id])]
(mapv (juxt :session-id :profile-id) rows)))
(defn- retrieve-presence*
[{:keys [executor] :as cfg}]
(aa/with-thread executor
(retrieve-presence cfg)))
(defn- update-presence (defn- update-presence
[conn file-id session-id profile-id] [{:keys [pool file-id session-id profile-id] :as cfg}]
(aa/thread-try
(let [sql [sql:update-presence file-id session-id profile-id]] (let [sql [sql:update-presence file-id session-id profile-id]]
(db/exec-one! conn sql)))) (db/exec-one! pool sql)))
(defn- update-presence*
[{:keys [executor] :as cfg}]
(aa/with-thread executor
(update-presence cfg)))
(defn- delete-presence (defn- delete-presence
[pool file-id session-id profile-id] [{:keys [pool file-id session-id profile-id] :as cfg}]
(aa/thread-try
(db/delete! pool :presence {:file-id file-id (db/delete! pool :presence {:file-id file-id
:profile-id profile-id :profile-id profile-id
:session-id session-id}))) :session-id session-id}))
(defn- delete-presence*
[{:keys [executor] :as cfg}]
(aa/with-thread executor
(delete-presence cfg)))
;; --- INCOMING MSG PROCESSING
(defmulti handle-message (defmulti handle-message
(fn [_ message] (:type message))) (fn [_ message] (:type message)))
;; TODO: check permissions for join a file-id channel (probably using
;; single use token for avoid explicit database query).
(defmethod handle-message :connect (defmethod handle-message :connect
[{:keys [file-id profile-id session-id pool msgbus] :as ws} _message] [{:keys [file-id msgbus] :as cfg} _message]
;; (log/debugf "profile '%s' is connected to file '%s'" profile-id file-id) ;; (log/debugf "profile '%s' is connected to file '%s'" profile-id file-id)
(aa/go-try (aa/go-try
(aa/<? (update-presence pool file-id session-id profile-id)) (aa/<? (update-presence* cfg))
(let [members (aa/<? (retrieve-presence pool file-id))] (let [members (aa/<? (retrieve-presence* cfg))
(a/<! (msgbus :pub {:topic file-id val {:topic file-id :message {:type :presence :sessions members}}]
:message {:type :presence :sessions members}}))))) (a/<! (msgbus :pub val)))))
(defmethod handle-message :disconnect (defmethod handle-message :disconnect
[{:keys [profile-id file-id session-id pool msgbus] :as ws} _message] [{:keys [file-id msgbus] :as cfg} _message]
;; (log/debugf "profile '%s' is disconnected from '%s'" profile-id file-id) ;; (log/debugf "profile '%s' is disconnected from '%s'" profile-id file-id)
(aa/go-try (aa/go-try
(aa/<? (delete-presence pool file-id session-id profile-id)) (aa/<? (delete-presence* cfg))
(let [members (aa/<? (retrieve-presence pool file-id))] (let [members (aa/<? (retrieve-presence* cfg))
(a/<! (msgbus :pub {:topic file-id val {:topic file-id :message {:type :presence :sessions members}}]
:message {:type :presence :sessions members}}))))) (a/<! (msgbus :pub val)))))
(defmethod handle-message :keepalive (defmethod handle-message :keepalive
[{:keys [profile-id file-id session-id pool] :as ws} _message] [cfg _message]
(update-presence pool file-id session-id profile-id)) (update-presence* cfg))
(defmethod handle-message :pointer-update (defmethod handle-message :pointer-update
[{:keys [profile-id file-id session-id msgbus] :as ws} message] [{:keys [profile-id file-id session-id msgbus] :as cfg} message]
(let [message (assoc message (let [message (assoc message
:profile-id profile-id :profile-id profile-id
:session-id session-id)] :session-id session-id)]

View file

@ -46,8 +46,7 @@
(fn [] (fn []
(try (try
(let [ret (try (f) (catch Exception e e))] (let [ret (try (f) (catch Exception e e))]
(when-not (nil? ret) (when (some? ret) (a/>!! c ret)))
(a/>!! c ret)))
(finally (finally
(a/close! c))))) (a/close! c)))))
c c