From 9b3879b3cf21e5d4fc18aada27e48d31f0b6b982 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 28 Apr 2020 08:12:04 +0200 Subject: [PATCH] :sparkles: Minor improvements on redis subscription management. --- backend/src/uxbox/util/redis.clj | 38 +++++++++++++++----------------- backend/vendor/vertx/deps.edn | 2 +- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/backend/src/uxbox/util/redis.clj b/backend/src/uxbox/util/redis.clj index 8f85f1b5b..af59bf98f 100644 --- a/backend/src/uxbox/util/redis.clj +++ b/backend/src/uxbox/util/redis.clj @@ -47,26 +47,22 @@ (defn- impl-subscribe [^String topic ^StatefulRedisPubSubConnection conn] - (let [cmd (.async conn) - output (a/chan 1 (filter string?)) - buffer (a/chan (a/sliding-buffer 64)) - listener (reify RedisPubSubListener - (message [it pattern channel message]) - (message [it channel message] - ;; There are no back pressure, so we use a - ;; slidding buffer for cases when the pubsub - ;; broker sends more messages that we can - ;; process. - (a/put! buffer message)) - (psubscribed [it pattern count] - #_(prn "psubscribed" pattern count)) - (punsubscribed [it pattern count] - #_(prn "punsubscribed" pattern count)) - (subscribed [it channel count] - #_(prn "subscribed" channel count)) - (unsubscribed [it channel count] - #_(prn "unsubscribed" channel count)))] - (.addListener conn listener) + (let [cmd (.async conn) + output (a/chan 1 (filter string?)) + buffer (a/chan (a/sliding-buffer 64)) + sub (reify RedisPubSubListener + (message [it pattern channel message]) + (message [it channel message] + ;; There are no back pressure, so we use a slidding + ;; buffer for cases when the pubsub broker sends + ;; more messages that we can process. + (a/put! buffer message)) + (psubscribed [it pattern count]) + (punsubscribed [it pattern count]) + (subscribed [it channel count]) + (unsubscribed [it channel count]))] + (.addListener conn sub) + (a/go-loop [] (let [[val port] (a/alts! [buffer (a/timeout 5000)]) message (if (= port buffer) val ::keepalive)] @@ -74,8 +70,10 @@ (recur) (do (a/close! buffer) + (.removeListener conn sub) (when (.isOpen conn) (.close conn)))))) + (-> (.subscribe ^RedisPubSubAsyncCommands cmd (into-array String [topic])) (p/then' (constantly output))))) diff --git a/backend/vendor/vertx/deps.edn b/backend/vendor/vertx/deps.edn index ab120edb6..31b8d84f0 100644 --- a/backend/vendor/vertx/deps.edn +++ b/backend/vendor/vertx/deps.edn @@ -2,7 +2,7 @@ {org.clojure/tools.logging {:mvn/version "0.5.0"} funcool/promesa {:mvn/version "5.0.0"} metosin/reitit-core {:mvn/version "0.3.10"} - org.clojure/core.async {:mvn/version "0.7.559"} + org.clojure/core.async {:mvn/version "1.1.587"} io.vertx/vertx-core {:mvn/version "4.0.0-milestone4"} io.vertx/vertx-web {:mvn/version "4.0.0-milestone4"} io.vertx/vertx-web-client {:mvn/version "4.0.0-milestone4"}}