♻️ Refactor logging.

This commit is contained in:
Andrey Antukh 2021-04-06 23:25:34 +02:00
parent 44d64e4831
commit e12a6e65a6
33 changed files with 383 additions and 193 deletions

View file

@ -14,10 +14,10 @@
[app.common.spec :as us]
[app.config :as cfg]
[app.util.blob :as blob]
[app.util.logging :as l]
[app.util.time :as dt]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]
[promesa.core :as p])
(:import
@ -60,7 +60,8 @@
(defmethod ig/init-key ::msgbus
[_ {:keys [backend buffer-size] :as cfg}]
(log/debugf "initializing msgbus (backend=%s)" (name backend))
(l/debug :action "initialize msgbus"
:backend (name backend))
(let [cfg (init-backend cfg)
;; Channel used for receive publications from the application.
@ -165,13 +166,14 @@
(when-let [val (a/<! pub-ch)]
(let [result (a/<! (impl-redis-pub rac val))]
(when (ex/exception? result)
(log/error result "unexpected error on publish message to redis")))
(l/error :cause result
:hint "unexpected error on publish message to redis")))
(recur)))))
(defmethod init-sub-loop :redis
[{:keys [::sub-conn ::sub-ch buffer-size]}]
(let [rcv-ch (a/chan (a/dropping-buffer buffer-size))
chans (agent {} :error-handler #(log/error % "unexpected error on agent"))
chans (agent {} :error-handler #(l/error :cause % :hint "unexpected error on agent"))
rac (.async ^StatefulRedisPubSubConnection sub-conn)]
;; Add a unique listener to connection
@ -184,7 +186,7 @@
;; more messages that we can process.
(let [val {:topic topic :message (blob/decode message)}]
(when-not (a/offer! rcv-ch val)
(log/warn "dropping message on subscription loop"))))
(l/warn :msg "dropping message on subscription loop"))))
(psubscribed [it pattern count])
(punsubscribed [it pattern count])
(subscribed [it topic count])
@ -194,9 +196,12 @@
(let [nsubs (if (nil? nsubs) #{chan} (conj nsubs chan))]
(when (= 1 (count nsubs))
(let [result (a/<!! (impl-redis-sub rac topic))]
(log/tracef "opening subscription to %s" topic)
(l/trace :action "open subscription"
:topic topic)
(when (ex/exception? result)
(log/errorf result "unexpected exception on subscribing to '%s'" topic))))
(l/error :cause result
:hint "unexpected exception on subscribing"
:topic topic))))
nsubs))
(subscribe-to-topics [state topics chan]
@ -210,9 +215,12 @@
(let [nsubs (disj nsubs chan)]
(when (empty? nsubs)
(let [result (a/<!! (impl-redis-unsub rac topic))]
(log/tracef "closing subscription to %s" topic)
(l/trace :action "close subscription"
:topic topic)
(when (ex/exception? result)
(log/errorf result "unexpected exception on unsubscribing from '%s'" topic))))
(l/error :cause result
:hint "unexpected exception on unsubscribing"
:topic topic))))
nsubs))
(unsubscribe-channels [state pending]
@ -246,7 +254,6 @@
(recur (rest chans) pending)
(recur (rest chans) (conj pending ch)))
pending))]
;; (log/tracef "received message => pending: %s" (pr-str pending))
(some->> (seq pending)
(send-off chans unsubscribe-channels))