Minor improvements on redis subscription management.

This commit is contained in:
Andrey Antukh 2020-04-28 08:12:04 +02:00 committed by Alonso Torres
parent 8b6f72a09b
commit 9b3879b3cf
2 changed files with 19 additions and 21 deletions

View file

@ -47,26 +47,22 @@
(defn- impl-subscribe
[^String topic ^StatefulRedisPubSubConnection conn]
(let [cmd (.async conn)
output (a/chan 1 (filter string?))
buffer (a/chan (a/sliding-buffer 64))
listener (reify RedisPubSubListener
(message [it pattern channel message])
(message [it channel message]
;; There are no back pressure, so we use a
;; slidding buffer for cases when the pubsub
;; broker sends more messages that we can
;; process.
(a/put! buffer message))
(psubscribed [it pattern count]
#_(prn "psubscribed" pattern count))
(punsubscribed [it pattern count]
#_(prn "punsubscribed" pattern count))
(subscribed [it channel count]
#_(prn "subscribed" channel count))
(unsubscribed [it channel count]
#_(prn "unsubscribed" channel count)))]
(.addListener conn listener)
(let [cmd (.async conn)
output (a/chan 1 (filter string?))
buffer (a/chan (a/sliding-buffer 64))
sub (reify RedisPubSubListener
(message [it pattern channel message])
(message [it channel message]
;; There are no back pressure, so we use a slidding
;; buffer for cases when the pubsub broker sends
;; more messages that we can process.
(a/put! buffer message))
(psubscribed [it pattern count])
(punsubscribed [it pattern count])
(subscribed [it channel count])
(unsubscribed [it channel count]))]
(.addListener conn sub)
(a/go-loop []
(let [[val port] (a/alts! [buffer (a/timeout 5000)])
message (if (= port buffer) val ::keepalive)]
@ -74,8 +70,10 @@
(recur)
(do
(a/close! buffer)
(.removeListener conn sub)
(when (.isOpen conn)
(.close conn))))))
(-> (.subscribe ^RedisPubSubAsyncCommands cmd (into-array String [topic]))
(p/then' (constantly output)))))