🎉 Add profile activity registry logger.

This commit is contained in:
Andrey Antukh 2021-04-07 17:45:22 +02:00
parent c40d9d9a7c
commit 8daf6e822e
6 changed files with 70 additions and 56 deletions

View file

@ -90,10 +90,9 @@
(defmethod ig/init-key ::worker
[_ {:keys [pool poll-interval name queue] :as cfg}]
(l/log :level :info
:msg "start"
:name (d/name name)
:queue (d/name queue))
(l/info :action "start worker"
:name (d/name name)
:queue (d/name queue))
(let [close-ch (a/chan 1)
poll-ms (inst-ms poll-interval)]
(a/go-loop []
@ -102,32 +101,31 @@
;; Terminate the loop if close channel is closed or
;; event-loop-fn returns nil.
(or (= port close-ch) (nil? val))
(l/log :level :debug :msg "stop condition found")
(l/debug :msg "stop condition found")
(db/pool-closed? pool)
(do
(l/log :level :debug :msg "eventloop aboirted because pool is closed")
(l/debug :msg "eventloop aborted because pool is closed")
(a/close! close-ch))
(and (instance? java.sql.SQLException val)
(contains? #{"08003" "08006" "08001" "08004"} (.getSQLState ^java.sql.SQLException val)))
(do
(l/log :level :error :hint "connection error, trying resume in some instants")
(l/error :hint "connection error, trying resume in some instants")
(a/<! (a/timeout poll-interval))
(recur))
(and (instance? java.sql.SQLException val)
(= "40001" (.getSQLState ^java.sql.SQLException val)))
(do
(l/log :level :debug :msg "serialization failure (retrying in some instants)")
(l/debug :msg "serialization failure (retrying in some instants)")
(a/<! (a/timeout poll-ms))
(recur))
(instance? Exception val)
(do
(l/log :level :error
:cause val
:hint "unexpected error ocurried on polling the database (will resume in some instants)")
(l/error :cause val
:hint "unexpected error ocurried on polling the database (will resume in some instants)")
(a/<! (a/timeout poll-ms))
(recur))
@ -185,15 +183,14 @@
interval (db/interval duration)
props (-> options extract-props db/tjson)
id (uuid/next)]
(l/log :level :debug
:action "submit task"
:name (d/name task)
:in duration)
(l/debug :action "submit task"
:name (d/name task)
:in duration)
(db/exec-one! conn [sql:insert-new-task id (d/name task) props (d/name queue) priority max-retries interval])
id))
;; --- RUNNER
;; --- RUNNER
(def ^:private
sql:mark-as-retry
@ -249,9 +246,8 @@
(let [task-fn (get tasks name)]
(if task-fn
(task-fn item)
(l/log :level :warn
:msg "no task handler found"
:name (d/name name)))
(l/warn :msg "no task handler found"
:name (d/name name)))
{:status :completed :task item}))
(defn get-error-context
@ -276,10 +272,9 @@
(let [cdata (get-error-context error item)]
(l/update-thread-context! cdata)
(l/log :level :error
:cause error
:hint "unhandled exception on task"
:id (:id cdata))
(l/error :cause error
:hint "unhandled exception on task"
:id (:id cdata))
(if (>= (:retry-num item) (:max-retries item))
{:status :failed :task item :error error}
@ -289,21 +284,19 @@
[{:keys [tasks]} item]
(let [name (d/name (:name item))]
(try
(l/log :level :debug
:action "start task"
:name name
:id (:id item)
:retry (:retry-num item))
(l/debug :action "start task"
:name name
:id (:id item)
:retry (:retry-num item))
(handle-task tasks item)
(catch Exception e
(handle-exception e item))
(finally
(l/log :level :debug
:action "start task"
:name name
:id (:id item)
:retry (:retry-num item))))))
(l/debug :action "end task"
:name name
:id (:id item)
:retry (:retry-num item))))))
(def sql:select-next-tasks
"select * from task as t
@ -407,12 +400,8 @@
(defn- synchronize-schedule-item
[conn {:keys [id cron]}]
(let [cron (str cron)
id (name id)]
(l/log :level :debug
:action "initialize scheduled task"
:id id
:cron cron)
(let [cron (str cron)]
(l/debug :action "initialize scheduled task" :id id :cron cron)
(db/exec-one! conn [sql:upsert-scheduled-task id cron cron])))
(defn- synchronize-schedule
@ -433,9 +422,7 @@
(letfn [(run-task [conn]
(try
(when (db/exec-one! conn [sql:lock-scheduled-task (d/name id)])
(l/log :level :debug
:action "execute scheduled task"
:id id)
(l/debug :action "execute scheduled task" :id id)
((:fn task) task))
(catch Throwable e
e)))
@ -444,10 +431,9 @@
(db/with-atomic [conn pool]
(let [result (run-task conn)]
(when (ex/exception? result)
(l/log :level :error
:cause result
:hint "unhandled exception on scheduled task"
:id id)))))]
(l/error :cause result
:hint "unhandled exception on scheduled task"
:id id)))))]
(try
(px/run! executor handle-task)
@ -520,9 +506,7 @@
:help "Background task execution timing."})]
(reduce-kv (fn [res k v]
(let [tname (name k)]
(l/log :level :debug
:action "register task"
:name tname)
(l/debug :action "register task" :name tname)
(assoc res k (mtx/wrap-summary v mobj [tname]))))
{}
tasks)))