Add several improvements to internal worker impl

Mainly for make the cron jobs do not block the scheduled executor
and offload all work to a separate threads
This commit is contained in:
Andrey Antukh 2023-02-20 13:15:11 +01:00
parent cad1851e95
commit 04b321caae
2 changed files with 64 additions and 65 deletions

View file

@ -297,10 +297,10 @@
::wrk/executor (ig/ref ::wrk/executor) ::wrk/executor (ig/ref ::wrk/executor)
::session/manager (ig/ref ::session/manager)} ::session/manager (ig/ref ::session/manager)}
:app.http.websocket/routes ::http.ws/routes
{::db/pool (ig/ref ::db/pool) {::db/pool (ig/ref ::db/pool)
::mtx/metrics (ig/ref ::mtx/metrics) ::mtx/metrics (ig/ref ::mtx/metrics)
::mbus/msgbus (ig/ref :app.msgbus/msgbus) ::mbus/msgbus (ig/ref ::mbus/msgbus)
::session/manager (ig/ref ::session/manager)} ::session/manager (ig/ref ::session/manager)}
:app.http.assets/routes :app.http.assets/routes

View file

@ -27,8 +27,7 @@
(:import (:import
java.util.concurrent.ExecutorService java.util.concurrent.ExecutorService
java.util.concurrent.ForkJoinPool java.util.concurrent.ForkJoinPool
java.util.concurrent.Future java.util.concurrent.Future))
java.util.concurrent.ScheduledExecutorService))
(set! *warn-on-reflection* true) (set! *warn-on-reflection* true)
@ -133,7 +132,7 @@
steals))] steals))]
(px/thread (px/thread
{:name "penpot/executors-monitor"} {:name "penpot/executors-monitor" :virtual true}
(l/info :hint "monitor: started" :name name) (l/info :hint "monitor: started" :name name)
(try (try
(loop [steals 0] (loop [steals 0]
@ -206,53 +205,52 @@
:queued res))) :queued res)))
(run-batch! [rconn] (run-batch! [rconn]
(db/with-atomic [conn pool] (try
(when-let [tasks (get-tasks conn)] (db/with-atomic [conn pool]
(->> (group-by :queue tasks) (if-let [tasks (get-tasks conn)]
(run! (partial push-tasks! conn rconn))) (->> (group-by :queue tasks)
true)))] (run! (partial push-tasks! conn rconn)))
(px/sleep (::wait-duration cfg))))
(catch InterruptedException cause
(throw cause))
(catch Exception cause
(cond
(rds/exception? cause)
(do
(l/warn :hint "dispatcher: redis exception (will retry in an instant)" :cause cause)
(px/sleep (::rds/timeout rconn)))
(db/sql-exception? cause)
(do
(l/warn :hint "dispatcher: database exception (will retry in an instant)" :cause cause)
(px/sleep (::rds/timeout rconn)))
:else
(do
(l/error :hint "dispatcher: unhandled exception (will retry in an instant)" :cause cause)
(px/sleep (::rds/timeout rconn)))))))
(dispatcher []
(l/info :hint "dispatcher: started")
(try
(dm/with-open [rconn (rds/connect redis)]
(loop []
(run-batch! rconn)
(recur)))
(catch InterruptedException _
(l/trace :hint "dispatcher: interrupted"))
(catch Throwable cause
(l/error :hint "dispatcher: unexpected exception" :cause cause))
(finally
(l/info :hint "dispatcher: terminated"))))]
(if (db/read-only? pool) (if (db/read-only? pool)
(l/warn :hint "dispatcher: not started (db is read-only)") (l/warn :hint "dispatcher: not started (db is read-only)")
(px/thread
{:name "penpot/worker-dispatcher"}
(l/info :hint "dispatcher: started")
(try
(dm/with-open [rconn (rds/connect redis)]
(loop []
(when (px/interrupted?)
(throw (InterruptedException. "interrumpted")))
(try ;; FIXME: we don't use virtual threads here until JDBC is uptaded to >= 42.6.0
(when-not (run-batch! rconn) ;; bacause it has the necessary fixes fro make the JDBC driver properly compatible
(px/sleep (::wait-duration cfg))) ;; with Virtual Threads.
(catch InterruptedException cause (px/fn->thread dispatcher :name "penpot/worker/dispatcher" :virtual false))))
(throw cause))
(catch Exception cause
(cond
(rds/exception? cause)
(do
(l/warn :hint "dispatcher: redis exception (will retry in an instant)" :cause cause)
(px/sleep (::rds/timeout rconn)))
(db/sql-exception? cause)
(do
(l/warn :hint "dispatcher: database exception (will retry in an instant)" :cause cause)
(px/sleep (::rds/timeout rconn)))
:else
(do
(l/error :hint "dispatcher: unhandled exception (will retry in an instant)" :cause cause)
(px/sleep (::rds/timeout rconn))))))
(recur)))
(catch InterruptedException _
(l/debug :hint "dispatcher: interrupted"))
(catch Throwable cause
(l/error :hint "dispatcher: unexpected exception" :cause cause))
(finally
(l/info :hint "dispatcher: terminated")))))))
(defmethod ig/halt-key! ::dispatcher (defmethod ig/halt-key! ::dispatcher
[_ thread] [_ thread]
@ -297,7 +295,7 @@
(defn- start-worker! (defn- start-worker!
[{:keys [::rds/redis ::worker-id ::queue] :as cfg}] [{:keys [::rds/redis ::worker-id ::queue] :as cfg}]
(px/thread (px/thread
{:name (format "penpot/worker/%s" worker-id)} {:name (format "penpot/worker/runner:%s" worker-id)}
(l/info :hint "worker: started" :worker-id worker-id :queue queue) (l/info :hint "worker: started" :worker-id worker-id :queue queue)
(try (try
(dm/with-open [rconn (rds/connect redis)] (dm/with-open [rconn (rds/connect redis)]
@ -584,22 +582,23 @@
(defn- execute-cron-task (defn- execute-cron-task
[{:keys [::db/pool] :as cfg} {:keys [id] :as task}] [{:keys [::db/pool] :as cfg} {:keys [id] :as task}]
(try (px/thread
(db/with-atomic [conn pool] {:name (str "penpot/cront-task/" id)}
(when (db/exec-one! conn [sql:lock-cron-task (d/name id)]) (try
(l/trace :hint "cron: execute task" :task-id id) (db/with-atomic [conn pool]
((:fn task) task))) (when (db/exec-one! conn [sql:lock-cron-task (d/name id)])
(catch InterruptedException _ (l/trace :hint "cron: execute task" :task-id id)
(px/interrupt! (px/current-thread)) ((:fn task) task)))
(l/debug :hint "cron: task interrupted" :task-id id)) (catch InterruptedException _
(catch Throwable cause (l/debug :hint "cron: task interrupted" :task-id id))
(l/error :hint "cron: unhandled exception on running task" (catch Throwable cause
::l/context (get-error-context cause task) (l/error :hint "cron: unhandled exception on running task"
:task-id id ::l/context (get-error-context cause task)
:cause cause)) :task-id id
(finally :cause cause))
(when-not (px/interrupted? :current) (finally
(schedule-cron-task cfg task))))) (when-not (px/interrupted? :current)
(schedule-cron-task cfg task))))))
(defn- ms-until-valid (defn- ms-until-valid
[cron] [cron]