Improve msgbus internal API

This commit is contained in:
Andrey Antukh 2022-09-05 19:00:20 +02:00
parent 9c68432936
commit a555e13b6a
7 changed files with 131 additions and 103 deletions

View file

@ -35,12 +35,12 @@
(declare ^:private redis-connect)
(declare ^:private redis-disconnect)
(declare ^:private start-io-loop)
(declare ^:private subscribe)
(declare ^:private purge)
(declare ^:private redis-pub)
(declare ^:private redis-sub)
(declare ^:private redis-unsub)
(declare ^:private start-io-loop!)
(declare ^:private subscribe-to-topics)
(declare ^:private unsubscribe-channels)
(defmethod ig/prep-key ::msgbus
[_ cfg]
@ -48,42 +48,68 @@
:timeout (dt/duration {:seconds 30})}
(d/without-nils cfg)))
(s/def ::cmd-ch ::aa/channel)
(s/def ::rcv-ch ::aa/channel)
(s/def ::pub-ch ::aa/channel)
(s/def ::state ::us/agent)
(s/def ::pconn ::redis/connection)
(s/def ::sconn ::redis/connection)
(s/def ::msgbus
(s/keys :req [::cmd-ch ::rcv-ch ::pub-ch ::state ::pconn ::sconn ::wrk/executor]))
(s/def ::buffer-size ::us/integer)
(defmethod ig/pre-init-spec ::msgbus [_]
(s/keys :req-un [::buffer-size ::redis/timeout ::redis/redis ::wrk/executor]))
(defmethod ig/init-key ::msgbus
[_ {:keys [buffer-size] :as cfg}]
[_ {:keys [buffer-size executor] :as cfg}]
(l/info :hint "initialize msgbus" :buffer-size buffer-size)
(let [cmd-ch (a/chan buffer-size)
rcv-ch (a/chan (a/dropping-buffer buffer-size))
pub-ch (a/chan (a/dropping-buffer buffer-size) xform-prefix-topic)
state (agent {} :error-handler #(l/error :cause % :hint "unexpected error on agent" ::l/async false))
cfg (-> (redis-connect cfg)
state (agent {})
msgbus (-> (redis-connect cfg)
(assoc ::cmd-ch cmd-ch)
(assoc ::rcv-ch rcv-ch)
(assoc ::pub-ch pub-ch)
(assoc ::state state))]
(assoc ::state state)
(assoc ::wrk/executor executor))]
(start-io-loop cfg)
(us/verify! ::msgbus msgbus)
(with-meta
(fn [& {:keys [cmd] :as params}]
(a/go
(case cmd
:pub (a/>! pub-ch params)
:sub (a/<! (subscribe cfg params))
:purge (a/<! (purge cfg params))
(l/error :hint "unexpeced error on msgbus command processing" :params params))))
cfg)))
(set-error-handler! state #(l/error :cause % :hint "unexpected error on agent" ::l/async false))
(set-error-mode! state :continue)
(start-io-loop! msgbus)
msgbus))
(defn sub!
[{:keys [::state ::wrk/executor] :as cfg} & {:keys [topic topics chan]}]
(let [done-ch (a/chan)
topics (into [] (map prefix-topic) (if topic [topic] topics))]
(l/debug :hint "subscribe" :topics topics)
(send-via executor state subscribe-to-topics cfg topics chan done-ch)
done-ch))
(defn pub!
[{::keys [pub-ch]} & {:as params}]
(a/go
(a/>! pub-ch params)))
(defn purge!
[{:keys [::state ::wrk/executor] :as msgbus} chans]
(l/trace :hint "purge" :chans (count chans))
(let [done-ch (a/chan)]
(send-via executor state unsubscribe-channels msgbus chans done-ch)
done-ch))
(defmethod ig/halt-key! ::msgbus
[_ f]
(let [mdata (meta f)]
(redis-disconnect mdata)
(a/close! (::cmd-ch mdata))
(a/close! (::rcv-ch mdata))))
[_ msgbus]
(redis-disconnect msgbus)
(a/close! (::cmd-ch msgbus))
(a/close! (::rcv-ch msgbus))
(a/close! (::pub-ch msgbus)))
;; --- IMPL
@ -91,9 +117,8 @@
[{:keys [timeout redis] :as cfg}]
(let [pconn (redis/connect redis :timeout timeout)
sconn (redis/connect redis :type :pubsub :timeout timeout)]
(-> cfg
(assoc ::pconn pconn)
(assoc ::sconn sconn))))
{::pconn pconn
::sconn sconn}))
(defn- redis-disconnect
[{:keys [::pconn ::sconn] :as cfg}]
@ -152,22 +177,6 @@
(aa/with-closing done-ch
(reduce #(unsubscribe-single-channel %1 cfg %2) state channels)))
(defn- subscribe
[{:keys [::state executor] :as cfg} {:keys [topic topics chan]}]
(let [done-ch (a/chan)
topics (into [] (map prefix-topic) (if topic [topic] topics))]
(l/debug :hint "subscribe" :topics topics)
(send-via executor state subscribe-to-topics cfg topics chan done-ch)
done-ch))
(defn- purge
[{:keys [::state executor] :as cfg} {:keys [chans]}]
(l/trace :hint "purge" :chans (count chans))
(let [done-ch (a/chan)]
(send-via executor state unsubscribe-channels cfg chans done-ch)
done-ch))
(defn- create-listener
[rcv-ch]
(redis/pubsub-listener
@ -179,8 +188,8 @@
(when-not (a/offer! rcv-ch val)
(l/warn :msg "dropping message on subscription loop"))))))
(defn start-io-loop
[{:keys [::sconn ::rcv-ch ::pub-ch ::state executor] :as cfg}]
(defn start-io-loop!
[{:keys [::sconn ::rcv-ch ::pub-ch ::state ::wrk/executor] :as cfg}]
(redis/add-listener! sconn (create-listener rcv-ch))
(letfn [(send-to-topic [topic message]
(a/go-loop [chans (seq (get-in @state [:topics topic]))