diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index 9aca801a3..22d36d834 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -10,6 +10,7 @@ (ns app.msgbus "The msgbus abstraction implemented using redis as underlying backend." (:require + [app.common.exceptions :as ex] [app.common.spec :as us] [app.util.blob :as blob] [app.util.time :as dt] @@ -61,8 +62,8 @@ snd-conn (.connect ^RedisClient rclient ^RedisCodec codec) rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec) - pub-buff (a/chan (a/sliding-buffer buffer-size)) - rcv-buff (a/chan (a/sliding-buffer buffer-size)) + pub-buff (a/chan (a/dropping-buffer buffer-size)) + rcv-buff (a/chan (a/dropping-buffer buffer-size)) sub-buff (a/chan 1) cch (a/chan 1)] @@ -102,15 +103,15 @@ (a/close! (::rcv-buff mdata)))) (defn- impl-publish-loop - [conn rcv-buff cch] + [conn pub-buff cch] (let [rac (.async ^StatefulRedisConnection conn)] (a/go-loop [] - (let [[val _] (a/alts! [rcv-buff cch])] + (let [[val _] (a/alts! [cch pub-buff] :priority true)] (when (some? val) (let [result (a/