diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 83207bc93..f93d787f4 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -183,10 +183,11 @@ :svgc (ig/ref :app.svgparse/svgc)} :app.notifications/handler - {:msgbus (ig/ref :app.msgbus/msgbus) - :pool (ig/ref :app.db/pool) - :session (ig/ref :app.http.session/session) - :metrics (ig/ref :app.metrics/metrics)} + {:msgbus (ig/ref :app.msgbus/msgbus) + :pool (ig/ref :app.db/pool) + :session (ig/ref :app.http.session/session) + :metrics (ig/ref :app.metrics/metrics) + :executor (ig/ref :app.worker/executor)} :app.worker/executor {:name "worker"} diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index 791604ce3..9aca801a3 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -12,12 +12,14 @@ (:require [app.common.spec :as us] [app.util.blob :as blob] + [app.util.time :as dt] [clojure.core.async :as a] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] [integrant.core :as ig] [promesa.core :as p]) (:import + java.time.Duration io.lettuce.core.RedisClient io.lettuce.core.RedisURI io.lettuce.core.api.StatefulRedisConnection @@ -59,15 +61,18 @@ snd-conn (.connect ^RedisClient rclient ^RedisCodec codec) rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec) - snd-buff (a/chan (a/sliding-buffer buffer-size)) + pub-buff (a/chan (a/sliding-buffer buffer-size)) rcv-buff (a/chan (a/sliding-buffer buffer-size)) sub-buff (a/chan 1) cch (a/chan 1)] + (.setTimeout ^StatefulRedisConnection snd-conn ^Duration (dt/duration {:seconds 10})) + (.setTimeout ^StatefulRedisPubSubConnection rcv-conn ^Duration (dt/duration {:seconds 10})) + (log/debugf "initializing msgbus (uri: '%s')" (str uri)) ;; Start the sending (publishing) loop - (impl-publish-loop snd-conn snd-buff cch) + (impl-publish-loop snd-conn pub-buff cch) ;; Start the receiving (subscribing) loop (impl-subscribe-loop rcv-conn rcv-buff sub-buff cch) @@ -78,13 +83,13 @@ ([command params] (a/go (case command - :pub (a/>! snd-buff params) + :pub (a/>! pub-buff params) :sub (a/>! sub-buff params))))) {::snd-conn snd-conn ::rcv-conn rcv-conn ::cch cch - ::snd-buff snd-buff + ::pub-buff pub-buff ::rcv-buff rcv-buff}))) (defmethod ig/halt-key! ::msgbus @@ -93,25 +98,14 @@ (.close ^StatefulRedisConnection (::snd-conn mdata)) (.close ^StatefulRedisPubSubConnection (::rcv-conn mdata)) (a/close! (::cch mdata)) - (a/close! (::snd-buff mdata)) + (a/close! (::pub-buff mdata)) (a/close! (::rcv-buff mdata)))) -(defn- impl-redis-pub - [rac {:keys [topic message]}] - (let [topic (str topic) - message (blob/encode message) - res (a/chan 1)] - (-> (.publish ^RedisAsyncCommands rac ^String topic ^bytes message) - (p/finally (fn [_ e] - (when e (a/>!! res e)) - (a/close! res)))) - res)) - (defn- impl-publish-loop - [conn in-buff cch] + [conn rcv-buff cch] (let [rac (.async ^StatefulRedisConnection conn)] (a/go-loop [] - (let [[val _] (a/alts! [in-buff cch])] + (let [[val _] (a/alts! [rcv-buff cch])] (when (some? val) (let [result (a/!! rcv-buff {:topic topic :message (blob/decode message)})) (psubscribed [it pattern count]) (punsubscribed [it pattern count]) (subscribed [it topic count]) (unsubscribed [it topic count]))) (a/go-loop [chans {}] - (let [[val port] (a/alts! [sub-buff cch in-buff] :priority true)] + (let [[val port] (a/alts! [sub-buff cch rcv-buff] :priority true)] (cond ;; Stop condition; just do nothing (= port cch) @@ -150,7 +144,7 @@ ;; This means we receive data from redis and we need to ;; forward it to the underlying subscriptions. - (= port in-buff) + (= port rcv-buff) (let [topic (:topic val) pending (loop [chans (seq (get chans topic)) pending #{}] @@ -164,6 +158,16 @@ (a/ (.publish ^RedisAsyncCommands rac ^String topic ^bytes message) + (p/finally (fn [_ e] + (when e (a/>!! res e)) + (a/close! res)))) + res)) (defn impl-redis-sub [conn topic] @@ -175,7 +179,6 @@ (a/close! res)))) res)) - (defn impl-redis-unsub [conn topic] (let [^RedisPubSubAsyncCommands cmd (.async ^StatefulRedisPubSubConnection conn) diff --git a/backend/src/app/notifications.clj b/backend/src/app/notifications.clj index 6dae8dab7..36daf02e6 100644 --- a/backend/src/app/notifications.clj +++ b/backend/src/app/notifications.clj @@ -16,6 +16,7 @@ [app.util.async :as aa] [app.util.time :as dt] [app.util.transit :as t] + [app.worker :as wrk] [clojure.core.async :as a] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] @@ -39,7 +40,7 @@ (s/def ::msgbus fn?) (defmethod ig/pre-init-spec ::handler [_] - (s/keys :req-un [::msgbus ::db/pool ::session ::mtx/metrics])) + (s/keys :req-un [::msgbus ::db/pool ::session ::mtx/metrics ::wrk/executor])) (defmethod ig/init-key ::handler [_ {:keys [session metrics] :as cfg}] @@ -132,20 +133,22 @@ false))) (defn websocket - [{:keys [file-id team-id msgbus] :as cfg}] - (let [in (a/chan 32) - out (a/chan 32) + [{:keys [file-id team-id msgbus executor] :as cfg}] + (let [in (a/chan (a/dropping-buffer 64)) + out (a/chan (a/dropping-buffer 64)) mtx-aconn (:mtx-active-connections cfg) mtx-messages (:mtx-messages cfg) mtx-sessions (:mtx-sessions cfg) created-at (dt/now) - ws-send (mtx/wrap-counter ws-send mtx-messages ["send"])] (letfn [(on-connect [conn] (log/debugf "on-connect %s" (:session-id cfg)) (mtx-aconn :inc) - (let [sub (a/chan) + ;; A subscription channel should use a lossy buffer + ;; because we can't penalize normal clients when one + ;; slow client is connected to the room. + (let [sub (a/chan (a/dropping-buffer 64)) ws (WebSocket. conn in out sub nil cfg)] ;; Subscribe to corresponding topics @@ -155,8 +158,8 @@ ;; message forwarding loop (a/go-loop [] (let [val (a/