Allow pluggable backends on msgbus module.

Prepare it to use different backends than redis.
This commit is contained in:
Andrey Antukh 2021-03-07 20:30:41 +01:00
parent e446f47e2c
commit c16a24a59a
2 changed files with 153 additions and 150 deletions

View file

@ -58,7 +58,9 @@
{} {}
:app.msgbus/msgbus :app.msgbus/msgbus
{:uri (:redis-uri config)} {:backend (:msgbus-backend config :redis)
:pool (ig/ref :app.db/pool)
:redis-uri (:redis-uri config)}
:app.tokens/tokens :app.tokens/tokens
{:sprops (ig/ref :app.setup/props)} {:sprops (ig/ref :app.setup/props)}

View file

@ -13,6 +13,7 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.config :as cfg] [app.config :as cfg]
[app.db :as db]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.core.async :as a] [clojure.core.async :as a]
@ -33,56 +34,36 @@
io.lettuce.core.pubsub.StatefulRedisPubSubConnection io.lettuce.core.pubsub.StatefulRedisPubSubConnection
io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands)) io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands))
(declare impl-publish-loop) (s/def ::redis-uri ::us/string)
(declare impl-redis-pub)
(declare impl-redis-sub)
(declare impl-redis-unsub)
(declare impl-subscribe-loop)
;; --- STATE INIT: Publisher
(s/def ::uri ::us/string)
(s/def ::buffer-size ::us/integer) (s/def ::buffer-size ::us/integer)
(defmulti init-backend :backend)
(defmulti stop-backend :backend)
(defmulti init-pub-loop :backend)
(defmulti init-sub-loop :backend)
(defmethod ig/pre-init-spec ::msgbus [_] (defmethod ig/pre-init-spec ::msgbus [_]
(s/keys :req-un [::uri] (s/keys :req-un [::db/pool]
:opt-un [::buffer-size])) :opt-un [::buffer-size ::redis-uri]))
(defmethod ig/prep-key ::msgbus (defmethod ig/prep-key ::msgbus
[_ cfg] [_ cfg]
(merge {:buffer-size 128} cfg)) (merge {:buffer-size 128} cfg))
(defmethod ig/init-key ::msgbus (defmethod ig/init-key ::msgbus
[_ {:keys [uri buffer-size] :as cfg}] [_ {:keys [backend buffer-size] :as cfg}]
(let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE) (log/debugf "initializing msgbus (backend=%s)" (name backend))
uri (RedisURI/create uri) (let [backend (init-backend cfg)
rclient (RedisClient/create ^RedisURI uri)
snd-conn (.connect ^RedisClient rclient ^RedisCodec codec)
rcv-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)
;; Channel used for receive publications from the application. ;; Channel used for receive publications from the application.
pub-chan (a/chan (a/dropping-buffer buffer-size)) pub-ch (a/chan (a/dropping-buffer buffer-size))
;; Channel used for receive data from redis
rcv-chan (a/chan (a/dropping-buffer buffer-size))
;; Channel used for receive subscription requests. ;; Channel used for receive subscription requests.
sub-chan (a/chan) sub-ch (a/chan)]
cch (a/chan 1)]
(.setTimeout ^StatefulRedisConnection snd-conn ^Duration (dt/duration {:seconds 10})) (init-pub-loop (assoc backend :ch pub-ch))
(.setTimeout ^StatefulRedisPubSubConnection rcv-conn ^Duration (dt/duration {:seconds 10})) (init-sub-loop (assoc backend :ch sub-ch))
(log/debugf "initializing msgbus (uri: '%s')" (str uri))
;; Start the sending (publishing) loop
(impl-publish-loop snd-conn pub-chan cch)
;; Start the receiving (subscribing) loop
(impl-subscribe-loop rcv-conn rcv-chan sub-chan cch)
(with-meta (with-meta
(fn run (fn run
@ -90,159 +71,179 @@
([command params] ([command params]
(a/go (a/go
(case command (case command
:pub (a/>! pub-chan params) :pub (a/>! pub-ch params)
:sub (a/>! sub-chan params))))) :sub (a/>! sub-ch params)))))
{::snd-conn snd-conn {::backend backend})))
::rcv-conn rcv-conn
::cch cch
::pub-chan pub-chan
::rcv-chan rcv-chan})))
(defmethod ig/halt-key! ::msgbus (defmethod ig/halt-key! ::msgbus
[_ f] [_ f]
(let [mdata (meta f)] (let [mdata (meta f)]
(.close ^StatefulRedisConnection (::snd-conn mdata)) (stop-backend (::backend mdata))))
(.close ^StatefulRedisPubSubConnection (::rcv-conn mdata))
(a/close! (::cch mdata))
(a/close! (::pub-chan mdata))
(a/close! (::rcv-chan mdata))))
(defn- impl-publish-loop
[conn pub-chan cch] ;; --- REDIS BACKEND IMPL
(let [rac (.async ^StatefulRedisConnection conn)]
(declare impl-redis-pub)
(declare impl-redis-sub)
(declare impl-redis-unsub)
(defmethod init-backend :redis
[{:keys [redis-uri] :as cfg}]
(let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE)
uri (RedisURI/create redis-uri)
rclient (RedisClient/create ^RedisURI uri)
pub-conn (.connect ^RedisClient rclient ^RedisCodec codec)
sub-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)]
(.setTimeout ^StatefulRedisConnection pub-conn ^Duration (dt/duration {:seconds 10}))
(.setTimeout ^StatefulRedisPubSubConnection sub-conn ^Duration (dt/duration {:seconds 10}))
(-> cfg
(assoc :pub-conn pub-conn)
(assoc :sub-conn sub-conn)
(assoc :close-ch (a/chan 1)))))
(defmethod stop-backend :redis
[{:keys [pub-conn sub-conn close-ch] :as cfg}]
(.close ^StatefulRedisConnection pub-conn)
(.close ^StatefulRedisPubSubConnection sub-conn)
(a/close! close-ch))
(defmethod init-pub-loop :redis
[{:keys [pub-conn ch close-ch]}]
(let [rac (.async ^StatefulRedisConnection pub-conn)]
(a/go-loop [] (a/go-loop []
(let [[val _] (a/alts! [cch pub-chan] :priority true)] (let [[val _] (a/alts! [close-ch ch] :priority true)]
(when (some? val) (when (some? val)
(let [result (a/<! (impl-redis-pub rac val))] (let [result (a/<! (impl-redis-pub rac val))]
(when (ex/exception? result) (when (ex/exception? result)
(log/error result "unexpected error on publish message to redis"))) (log/error result "unexpected error on publish message to redis")))
(recur)))))) (recur))))))
(defn- impl-subscribe-loop (defmethod init-sub-loop :redis
[conn rcv-chan sub-chan cch] [{:keys [sub-conn ch close-ch buffer-size]}]
;; Add a unique listener to connection (let [rcv-ch (a/chan (a/dropping-buffer buffer-size))
(.addListener conn (reify RedisPubSubListener chans (agent {} :error-handler #(log/error % "unexpected error on agent"))
(message [it pattern topic message])
(message [it topic message]
;; There are no back pressure, so we use a slidding
;; buffer for cases when the pubsub broker sends
;; more messages that we can process.
(let [val {:topic topic :message (blob/decode message)}]
(when-not (a/offer! rcv-chan val)
(log/warn "dropping message on subscription loop"))))
(psubscribed [it pattern count])
(punsubscribed [it pattern count])
(subscribed [it topic count])
(unsubscribed [it topic count])))
(let [chans (agent {} :error-handler #(log/error % "unexpected error on agent"))
tprefix (str (cfg/get :tenant) ".") tprefix (str (cfg/get :tenant) ".")
rac (.async ^StatefulRedisPubSubConnection sub-conn)]
subscribe-to-single-topic ;; Add a unique listener to connection
(fn [nsubs topic chan] (.addListener sub-conn
(let [nsubs (if (nil? nsubs) #{chan} (conj nsubs chan))] (reify RedisPubSubListener
(when (= 1 (count nsubs)) (message [it pattern topic message])
(let [result (a/<!! (impl-redis-sub conn topic))] (message [it topic message]
(log/tracef "opening subscription to %s" topic) ;; There are no back pressure, so we use a slidding
(when (ex/exception? result) ;; buffer for cases when the pubsub broker sends
(log/errorf result "unexpected exception on subscribing to '%s'" topic)))) ;; more messages that we can process.
nsubs)) (let [val {:topic topic :message (blob/decode message)}]
(when-not (a/offer! rcv-ch val)
(log/warn "dropping message on subscription loop"))))
(psubscribed [it pattern count])
(punsubscribed [it pattern count])
(subscribed [it topic count])
(unsubscribed [it topic count])))
subscribe-to-topics (letfn [(subscribe-to-single-topic [nsubs topic chan]
(fn [state topics chan] (let [nsubs (if (nil? nsubs) #{chan} (conj nsubs chan))]
(let [state (update state :chans assoc chan topics)] (when (= 1 (count nsubs))
(reduce (fn [state topic] (let [result (a/<!! (impl-redis-sub rac topic))]
(update-in state [:topics topic] subscribe-to-single-topic topic chan)) (log/tracef "opening subscription to %s" topic)
state (when (ex/exception? result)
topics))) (log/errorf result "unexpected exception on subscribing to '%s'" topic))))
nsubs))
unsubscribe-from-single-topic (subscribe-to-topics [state topics chan]
(fn [nsubs topic chan] (let [state (update state :chans assoc chan topics)]
(let [nsubs (disj nsubs chan)] (reduce (fn [state topic]
(when (empty? nsubs) (update-in state [:topics topic] subscribe-to-single-topic topic chan))
(let [result (a/<!! (impl-redis-unsub conn topic))] state
(log/tracef "closing subscription to %s" topic) topics)))
(when (ex/exception? result)
(log/errorf result "unexpected exception on unsubscribing from '%s'" topic))))
nsubs))
unsubscribe-channels (unsubscribe-from-single-topic [nsubs topic chan]
(fn [state pending] (let [nsubs (disj nsubs chan)]
(reduce (fn [state ch] (when (empty? nsubs)
(let [topics (get-in state [:chans ch]) (let [result (a/<!! (impl-redis-unsub rac topic))]
state (update state :chans dissoc ch)] (log/tracef "closing subscription to %s" topic)
(reduce (fn [state topic] (when (ex/exception? result)
(update-in state [:topics topic] unsubscribe-from-single-topic topic ch)) (log/errorf result "unexpected exception on unsubscribing from '%s'" topic))))
state nsubs))
topics)))
state
pending))]
;; Asynchronous subscription loop; terminates when sub-chan is (unsubscribe-channels [state pending]
;; closed. (reduce (fn [state ch]
(a/go-loop [] (let [topics (get-in state [:chans ch])
(when-let [{:keys [topics chan]} (a/<! sub-chan)] state (update state :chans dissoc ch)]
(let [topics (into #{} (map #(str tprefix %)) topics)] (reduce (fn [state topic]
(send-off chans subscribe-to-topics topics chan) (update-in state [:topics topic] unsubscribe-from-single-topic topic ch))
(recur)))) state
topics)))
state
pending))]
(a/go-loop [] ;; Asynchronous subscription loop;
(let [[val port] (a/alts! [cch rcv-chan])] (a/go-loop []
(cond (let [[val _] (a/alts! [close-ch ch])]
;; Stop condition; close all underlying subscriptions and (when-let [{:keys [topics chan]} val]
;; exit. The close operation is performed asynchronously. (let [topics (into #{} (map #(str tprefix %)) topics)]
(= port cch) (send-off chans subscribe-to-topics topics chan)
(send-off chans (fn [state] (recur)))))
(log/tracef "close")
(->> (vals state)
(mapcat identity)
(filter some?)
(run! a/close!))))
;; This means we receive data from redis and we need to (a/go-loop []
;; forward it to the underlying subscriptions. (let [[val port] (a/alts! [close-ch rcv-ch])]
(= port rcv-chan) (cond
(let [topic (:topic val) ; topic is already string ;; Stop condition; close all underlying subscriptions and
pending (loop [chans (seq (get-in @chans [:topics topic])) ;; exit. The close operation is performed asynchronously.
pending #{}] (= port close-ch)
(if-let [ch (first chans)] (send-off chans (fn [state]
(if (a/>! ch (:message val)) (log/tracef "close")
(recur (rest chans) pending) (->> (vals state)
(recur (rest chans) (conj pending ch))) (mapcat identity)
pending))] (filter some?)
;; (log/tracef "received message => pending: %s" (pr-str pending)) (run! a/close!))))
(some->> (seq pending)
(send-off chans unsubscribe-channels))
(recur))))))) ;; This means we receive data from redis and we need to
;; forward it to the underlying subscriptions.
(= port rcv-ch)
(let [topic (:topic val) ; topic is already string
pending (loop [chans (seq (get-in @chans [:topics topic]))
pending #{}]
(if-let [ch (first chans)]
(if (a/>! ch (:message val))
(recur (rest chans) pending)
(recur (rest chans) (conj pending ch)))
pending))]
;; (log/tracef "received message => pending: %s" (pr-str pending))
(some->> (seq pending)
(send-off chans unsubscribe-channels))
(recur))))))))
(defn- impl-redis-pub (defn- impl-redis-pub
[rac {:keys [topic message]}] [^RedisAsyncCommands rac {:keys [topic message]}]
(let [topic (str (cfg/get :tenant) "." topic) (let [topic (str (cfg/get :tenant) "." topic)
message (blob/encode message) message (blob/encode message)
res (a/chan 1)] res (a/chan 1)]
(-> (.publish ^RedisAsyncCommands rac ^String topic ^bytes message) (-> (.publish rac ^String topic ^bytes message)
(p/finally (fn [_ e] (p/finally (fn [_ e]
(when e (a/>!! res e)) (when e (a/>!! res e))
(a/close! res)))) (a/close! res))))
res)) res))
(defn impl-redis-sub (defn impl-redis-sub
[conn topic] [^RedisPubSubAsyncCommands rac topic]
(let [^RedisPubSubAsyncCommands cmd (.async ^StatefulRedisPubSubConnection conn) (let [res (a/chan 1)]
res (a/chan 1)] (-> (.subscribe rac (into-array String [topic]))
(-> (.subscribe cmd (into-array String [topic]))
(p/finally (fn [_ e] (p/finally (fn [_ e]
(when e (a/>!! res e)) (when e (a/>!! res e))
(a/close! res)))) (a/close! res))))
res)) res))
(defn impl-redis-unsub (defn impl-redis-unsub
[conn topic] [rac topic]
(let [^RedisPubSubAsyncCommands cmd (.async ^StatefulRedisPubSubConnection conn) (let [res (a/chan 1)]
res (a/chan 1)] (-> (.unsubscribe rac (into-array String [topic]))
(-> (.unsubscribe cmd (into-array String [topic]))
(p/finally (fn [_ e] (p/finally (fn [_ e]
(when e (a/>!! res e)) (when e (a/>!! res e))
(a/close! res)))) (a/close! res))))