Add better logging of elapsed time for cron tasks

This commit is contained in:
Andrey Antukh 2024-04-02 14:01:42 +02:00 committed by Andrés Moya
parent cfb5e9aa66
commit 9c9d09a816

View file

@ -69,8 +69,8 @@
;; TASKS REGISTRY ;; TASKS REGISTRY
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- wrap-task-handler (defn- wrap-with-metrics
[metrics tname f] [f metrics tname]
(let [labels (into-array String [tname])] (let [labels (into-array String [tname])]
(fn [params] (fn [params]
(let [tp (dt/tpoint)] (let [tp (dt/tpoint)]
@ -90,10 +90,10 @@
(defmethod ig/init-key ::registry (defmethod ig/init-key ::registry
[_ {:keys [::mtx/metrics ::tasks]}] [_ {:keys [::mtx/metrics ::tasks]}]
(l/inf :hint "registry initialized" :tasks (count tasks)) (l/inf :hint "registry initialized" :tasks (count tasks))
(reduce-kv (fn [registry k v] (reduce-kv (fn [registry k f]
(let [tname (name k)] (let [tname (name k)]
(l/trc :hint "register task" :name tname) (l/trc :hint "register task" :name tname)
(assoc registry tname (wrap-task-handler metrics tname v)))) (assoc registry tname (wrap-with-metrics f metrics tname))))
{} {}
tasks)) tasks))
@ -157,7 +157,7 @@
(px/interrupt! thread)) (px/interrupt! thread))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SCHEDULER ;; DISPATCHER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- decode-task-row (defn- decode-task-row
@ -252,7 +252,7 @@
(if (db/read-only? pool) (if (db/read-only? pool)
(l/wrn :hint "dispatcher: not started (db is read-only)") (l/wrn :hint "dispatcher: not started (db is read-only)")
(px/fn->thread dispatcher :name "penpot/worker/dispatcher" :virtual true)))) (px/fn->thread dispatcher :name "penpot/worker/dispatcher" :virtual false))))
(defmethod ig/halt-key! ::dispatcher (defmethod ig/halt-key! ::dispatcher
[_ thread] [_ thread]
@ -579,29 +579,34 @@
(some? (db/exec-one! conn [sql (d/name id)])))) (some? (db/exec-one! conn [sql (d/name id)]))))
(defn- execute-cron-task (defn- execute-cron-task
[{:keys [::db/pool] :as cfg} {:keys [id] :as task}] [cfg {:keys [id] :as task}]
(px/thread (px/thread
{:name (str "penpot/cront-task/" id)} {:name (str "penpot/cron-task/" id)}
(let [tpoint (dt/tpoint)]
(try (try
(db/with-atomic [conn pool] (db/tx-run! cfg (fn [{:keys [::db/conn]}]
(db/exec-one! conn ["SET statement_timeout=0;"]) (db/exec-one! conn ["SET LOCAL statement_timeout=0;"])
(db/exec-one! conn ["SET idle_in_transaction_session_timeout=0;"]) (db/exec-one! conn ["SET LOCAL idle_in_transaction_session_timeout=0;"])
(when (lock-scheduled-task! conn id) (when (lock-scheduled-task! conn id)
(l/dbg :hint "cron: execute task" :task-id id) (l/dbg :hint "cron: start task" :task-id id)
((:fn task) task)) ((:fn task) task)
(db/rollback! conn)) (let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "cron: end task" :task-id id :elapsed elapsed)))))
(catch InterruptedException _ (catch InterruptedException _
(l/debug :hint "cron: task interrupted" :task-id id)) (let [elapsed (dt/format-duration (tpoint))]
(l/debug :hint "cron: task interrupted" :task-id id :elapsed elapsed)))
(catch Throwable cause (catch Throwable cause
(let [elapsed (dt/format-duration (tpoint))]
(binding [l/*context* (get-error-context cause task)] (binding [l/*context* (get-error-context cause task)]
(l/err :hint "cron: unhandled exception on running task" (l/err :hint "cron: unhandled exception on running task"
:task-id id :task-id id
:cause cause))) :elapsed elapsed
:cause cause))))
(finally (finally
(when-not (px/interrupted? :current) (when-not (px/interrupted? :current)
(schedule-cron-task cfg task)))))) (schedule-cron-task cfg task)))))))
(defn- ms-until-valid (defn- ms-until-valid
[cron] [cron]
@ -618,8 +623,8 @@
(l/dbg :hint "cron: schedule task" :task-id id (l/dbg :hint "cron: schedule task" :task-id id
:ts (dt/format-duration ts) :ts (dt/format-duration ts)
:at (dt/format-instant (dt/in-future ts))) :at (dt/format-instant (dt/in-future ts)))
(swap! running #(into #{ft} (filter p/pending?) %))))
(swap! running #(into #{ft} (filter p/pending?) %))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SUBMIT API ;; SUBMIT API