mirror of
https://github.com/penpot/penpot.git
synced 2025-05-29 01:36:12 +02:00
✨ Move worker cron related code to a separated namespace
This commit is contained in:
parent
9c9d09a816
commit
e2ddb3e31e
2 changed files with 157 additions and 139 deletions
|
@ -22,11 +22,9 @@
|
||||||
[clojure.spec.alpha :as s]
|
[clojure.spec.alpha :as s]
|
||||||
[cuerdas.core :as str]
|
[cuerdas.core :as str]
|
||||||
[integrant.core :as ig]
|
[integrant.core :as ig]
|
||||||
[promesa.core :as p]
|
|
||||||
[promesa.exec :as px])
|
[promesa.exec :as px])
|
||||||
(:import
|
(:import
|
||||||
java.util.concurrent.Executor
|
java.util.concurrent.Executor
|
||||||
java.util.concurrent.Future
|
|
||||||
java.util.concurrent.ThreadPoolExecutor))
|
java.util.concurrent.ThreadPoolExecutor))
|
||||||
|
|
||||||
(set! *warn-on-reflection* true)
|
(set! *warn-on-reflection* true)
|
||||||
|
@ -486,146 +484,10 @@
|
||||||
|
|
||||||
(l/err :hint "worker: unhandled exception" :cause cause))))))
|
(l/err :hint "worker: unhandled exception" :cause cause))))))
|
||||||
|
|
||||||
(defn- get-error-context
|
(defn get-error-context
|
||||||
[_ item]
|
[_ item]
|
||||||
{:params item})
|
{:params item})
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
;; CRON
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
|
|
||||||
(declare schedule-cron-task)
|
|
||||||
(declare synchronize-cron-entries!)
|
|
||||||
|
|
||||||
(s/def ::fn (s/or :var var? :fn fn?))
|
|
||||||
(s/def ::id keyword?)
|
|
||||||
(s/def ::cron dt/cron?)
|
|
||||||
(s/def ::props (s/nilable map?))
|
|
||||||
(s/def ::task keyword?)
|
|
||||||
|
|
||||||
(s/def ::cron-task
|
|
||||||
(s/keys :req-un [::cron ::task]
|
|
||||||
:opt-un [::props ::id]))
|
|
||||||
|
|
||||||
(s/def ::entries (s/coll-of (s/nilable ::cron-task)))
|
|
||||||
|
|
||||||
(defmethod ig/pre-init-spec ::cron [_]
|
|
||||||
(s/keys :req [::db/pool ::entries ::registry]))
|
|
||||||
|
|
||||||
(defmethod ig/init-key ::cron
|
|
||||||
[_ {:keys [::entries ::registry ::db/pool] :as cfg}]
|
|
||||||
(if (db/read-only? pool)
|
|
||||||
(l/wrn :hint "cron: not started (db is read-only)")
|
|
||||||
(let [running (atom #{})
|
|
||||||
entries (->> entries
|
|
||||||
(filter some?)
|
|
||||||
;; If id is not defined, use the task as id.
|
|
||||||
(map (fn [{:keys [id task] :as item}]
|
|
||||||
(if (some? id)
|
|
||||||
(assoc item :id (d/name id))
|
|
||||||
(assoc item :id (d/name task)))))
|
|
||||||
(map (fn [item]
|
|
||||||
(update item :task d/name)))
|
|
||||||
(map (fn [{:keys [task] :as item}]
|
|
||||||
(let [f (get registry task)]
|
|
||||||
(when-not f
|
|
||||||
(ex/raise :type :internal
|
|
||||||
:code :task-not-found
|
|
||||||
:hint (str/fmt "task %s not configured" task)))
|
|
||||||
(-> item
|
|
||||||
(dissoc :task)
|
|
||||||
(assoc :fn f))))))
|
|
||||||
|
|
||||||
cfg (assoc cfg ::entries entries ::running running)]
|
|
||||||
|
|
||||||
(l/inf :hint "cron: started" :tasks (count entries))
|
|
||||||
(synchronize-cron-entries! cfg)
|
|
||||||
|
|
||||||
(->> (filter some? entries)
|
|
||||||
(run! (partial schedule-cron-task cfg)))
|
|
||||||
|
|
||||||
(reify
|
|
||||||
clojure.lang.IDeref
|
|
||||||
(deref [_] @running)
|
|
||||||
|
|
||||||
java.lang.AutoCloseable
|
|
||||||
(close [_]
|
|
||||||
(l/inf :hint "cron: terminated")
|
|
||||||
(doseq [item @running]
|
|
||||||
(when-not (.isDone ^Future item)
|
|
||||||
(.cancel ^Future item true))))))))
|
|
||||||
|
|
||||||
(defmethod ig/halt-key! ::cron
|
|
||||||
[_ instance]
|
|
||||||
(some-> instance d/close!))
|
|
||||||
|
|
||||||
(def sql:upsert-cron-task
|
|
||||||
"insert into scheduled_task (id, cron_expr)
|
|
||||||
values (?, ?)
|
|
||||||
on conflict (id)
|
|
||||||
do update set cron_expr=?")
|
|
||||||
|
|
||||||
(defn- synchronize-cron-entries!
|
|
||||||
[{:keys [::db/pool ::entries]}]
|
|
||||||
(db/with-atomic [conn pool]
|
|
||||||
(doseq [{:keys [id cron]} entries]
|
|
||||||
(l/trc :hint "register cron task" :id id :cron (str cron))
|
|
||||||
(db/exec-one! conn [sql:upsert-cron-task id (str cron) (str cron)]))))
|
|
||||||
|
|
||||||
(defn- lock-scheduled-task!
|
|
||||||
[conn id]
|
|
||||||
(let [sql (str "SELECT id FROM scheduled_task "
|
|
||||||
" WHERE id=? FOR UPDATE SKIP LOCKED")]
|
|
||||||
(some? (db/exec-one! conn [sql (d/name id)]))))
|
|
||||||
|
|
||||||
(defn- execute-cron-task
|
|
||||||
[cfg {:keys [id] :as task}]
|
|
||||||
(px/thread
|
|
||||||
{:name (str "penpot/cron-task/" id)}
|
|
||||||
(let [tpoint (dt/tpoint)]
|
|
||||||
(try
|
|
||||||
(db/tx-run! cfg (fn [{:keys [::db/conn]}]
|
|
||||||
(db/exec-one! conn ["SET LOCAL statement_timeout=0;"])
|
|
||||||
(db/exec-one! conn ["SET LOCAL idle_in_transaction_session_timeout=0;"])
|
|
||||||
(when (lock-scheduled-task! conn id)
|
|
||||||
(l/dbg :hint "cron: start task" :task-id id)
|
|
||||||
((:fn task) task)
|
|
||||||
(let [elapsed (dt/format-duration (tpoint))]
|
|
||||||
(l/dbg :hint "cron: end task" :task-id id :elapsed elapsed)))))
|
|
||||||
|
|
||||||
(catch InterruptedException _
|
|
||||||
(let [elapsed (dt/format-duration (tpoint))]
|
|
||||||
(l/debug :hint "cron: task interrupted" :task-id id :elapsed elapsed)))
|
|
||||||
|
|
||||||
(catch Throwable cause
|
|
||||||
(let [elapsed (dt/format-duration (tpoint))]
|
|
||||||
(binding [l/*context* (get-error-context cause task)]
|
|
||||||
(l/err :hint "cron: unhandled exception on running task"
|
|
||||||
:task-id id
|
|
||||||
:elapsed elapsed
|
|
||||||
:cause cause))))
|
|
||||||
(finally
|
|
||||||
(when-not (px/interrupted? :current)
|
|
||||||
(schedule-cron-task cfg task)))))))
|
|
||||||
|
|
||||||
(defn- ms-until-valid
|
|
||||||
[cron]
|
|
||||||
(s/assert dt/cron? cron)
|
|
||||||
(let [now (dt/now)
|
|
||||||
next (dt/next-valid-instant-from cron now)]
|
|
||||||
(dt/diff now next)))
|
|
||||||
|
|
||||||
(defn- schedule-cron-task
|
|
||||||
[{:keys [::running] :as cfg} {:keys [cron id] :as task}]
|
|
||||||
(let [ts (ms-until-valid cron)
|
|
||||||
ft (px/schedule! ts (partial execute-cron-task cfg task))]
|
|
||||||
|
|
||||||
(l/dbg :hint "cron: schedule task" :task-id id
|
|
||||||
:ts (dt/format-duration ts)
|
|
||||||
:at (dt/format-instant (dt/in-future ts)))
|
|
||||||
|
|
||||||
(swap! running #(into #{ft} (filter p/pending?) %))))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
;; SUBMIT API
|
;; SUBMIT API
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
156
backend/src/app/worker/cron.clj
Normal file
156
backend/src/app/worker/cron.clj
Normal file
|
@ -0,0 +1,156 @@
|
||||||
|
;; This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||||
|
;;
|
||||||
|
;; Copyright (c) KALEIDOS INC
|
||||||
|
|
||||||
|
(ns app.worker.cron
|
||||||
|
(:require
|
||||||
|
[app.common.data :as d]
|
||||||
|
[app.common.exceptions :as ex]
|
||||||
|
[app.common.logging :as l]
|
||||||
|
[app.db :as db]
|
||||||
|
[app.util.time :as dt]
|
||||||
|
[app.worker :as wrk]
|
||||||
|
[clojure.spec.alpha :as s]
|
||||||
|
[cuerdas.core :as str]
|
||||||
|
[integrant.core :as ig]
|
||||||
|
[promesa.core :as p]
|
||||||
|
[promesa.exec :as px])
|
||||||
|
(:import
|
||||||
|
java.util.concurrent.Future))
|
||||||
|
|
||||||
|
(set! *warn-on-reflection* true)
|
||||||
|
|
||||||
|
(def sql:upsert-cron-task
|
||||||
|
"insert into scheduled_task (id, cron_expr)
|
||||||
|
values (?, ?)
|
||||||
|
on conflict (id)
|
||||||
|
do update set cron_expr=?")
|
||||||
|
|
||||||
|
(defn- synchronize-cron-entries!
|
||||||
|
[{:keys [::db/pool ::entries]}]
|
||||||
|
(db/with-atomic [conn pool]
|
||||||
|
(doseq [{:keys [id cron]} entries]
|
||||||
|
(l/trc :hint "register cron task" :id id :cron (str cron))
|
||||||
|
(db/exec-one! conn [sql:upsert-cron-task id (str cron) (str cron)]))))
|
||||||
|
|
||||||
|
(defn- lock-scheduled-task!
|
||||||
|
[conn id]
|
||||||
|
(let [sql (str "SELECT id FROM scheduled_task "
|
||||||
|
" WHERE id=? FOR UPDATE SKIP LOCKED")]
|
||||||
|
(some? (db/exec-one! conn [sql (d/name id)]))))
|
||||||
|
|
||||||
|
(declare ^:private schedule-cron-task)
|
||||||
|
|
||||||
|
(defn- execute-cron-task
|
||||||
|
[cfg {:keys [id] :as task}]
|
||||||
|
(px/thread
|
||||||
|
{:name (str "penpot/cron-task/" id)}
|
||||||
|
(let [tpoint (dt/tpoint)]
|
||||||
|
(try
|
||||||
|
(db/tx-run! cfg (fn [{:keys [::db/conn]}]
|
||||||
|
(db/exec-one! conn ["SET LOCAL statement_timeout=0;"])
|
||||||
|
(db/exec-one! conn ["SET LOCAL idle_in_transaction_session_timeout=0;"])
|
||||||
|
(when (lock-scheduled-task! conn id)
|
||||||
|
(l/dbg :hint "start task" :task-id id)
|
||||||
|
((:fn task) task)
|
||||||
|
(let [elapsed (dt/format-duration (tpoint))]
|
||||||
|
(l/dbg :hint "end task" :task-id id :elapsed elapsed)))))
|
||||||
|
|
||||||
|
(catch InterruptedException _
|
||||||
|
(let [elapsed (dt/format-duration (tpoint))]
|
||||||
|
(l/debug :hint "task interrupted" :task-id id :elapsed elapsed)))
|
||||||
|
|
||||||
|
(catch Throwable cause
|
||||||
|
(let [elapsed (dt/format-duration (tpoint))]
|
||||||
|
(binding [l/*context* (wrk/get-error-context cause task)]
|
||||||
|
(l/err :hint "unhandled exception on running task"
|
||||||
|
:task-id id
|
||||||
|
:elapsed elapsed
|
||||||
|
:cause cause))))
|
||||||
|
(finally
|
||||||
|
(when-not (px/interrupted? :current)
|
||||||
|
(schedule-cron-task cfg task)))))))
|
||||||
|
|
||||||
|
(defn- ms-until-valid
|
||||||
|
[cron]
|
||||||
|
(s/assert dt/cron? cron)
|
||||||
|
(let [now (dt/now)
|
||||||
|
next (dt/next-valid-instant-from cron now)]
|
||||||
|
(dt/diff now next)))
|
||||||
|
|
||||||
|
(defn- schedule-cron-task
|
||||||
|
[{:keys [::running] :as cfg} {:keys [cron id] :as task}]
|
||||||
|
(let [ts (ms-until-valid cron)
|
||||||
|
ft (px/schedule! ts (partial execute-cron-task cfg task))]
|
||||||
|
|
||||||
|
(l/dbg :hint "schedule task" :task-id id
|
||||||
|
:ts (dt/format-duration ts)
|
||||||
|
:at (dt/format-instant (dt/in-future ts)))
|
||||||
|
|
||||||
|
(swap! running #(into #{ft} (filter p/pending?) %))))
|
||||||
|
|
||||||
|
|
||||||
|
(s/def ::fn (s/or :var var? :fn fn?))
|
||||||
|
(s/def ::id keyword?)
|
||||||
|
(s/def ::cron dt/cron?)
|
||||||
|
(s/def ::props (s/nilable map?))
|
||||||
|
(s/def ::task keyword?)
|
||||||
|
|
||||||
|
(s/def ::wrk/task
|
||||||
|
(s/keys :req-un [::cron ::task]
|
||||||
|
:opt-un [::props ::id]))
|
||||||
|
|
||||||
|
(s/def ::wrk/entries (s/coll-of (s/nilable ::wrk/task)))
|
||||||
|
|
||||||
|
(defmethod ig/pre-init-spec ::wrk/cron [_]
|
||||||
|
(s/keys :req [::db/pool ::wrk/entries ::wrk/registry]))
|
||||||
|
|
||||||
|
(defmethod ig/init-key ::wrk/cron
|
||||||
|
[_ {:keys [::wrk/entries ::wrk/registry ::db/pool] :as cfg}]
|
||||||
|
(if (db/read-only? pool)
|
||||||
|
(l/wrn :hint "service not started (db is read-only)")
|
||||||
|
(let [running (atom #{})
|
||||||
|
entries (->> entries
|
||||||
|
(filter some?)
|
||||||
|
;; If id is not defined, use the task as id.
|
||||||
|
(map (fn [{:keys [id task] :as item}]
|
||||||
|
(if (some? id)
|
||||||
|
(assoc item :id (d/name id))
|
||||||
|
(assoc item :id (d/name task)))))
|
||||||
|
(map (fn [item]
|
||||||
|
(update item :task d/name)))
|
||||||
|
(map (fn [{:keys [task] :as item}]
|
||||||
|
(let [f (get registry task)]
|
||||||
|
(when-not f
|
||||||
|
(ex/raise :type :internal
|
||||||
|
:code :task-not-found
|
||||||
|
:hint (str/fmt "task %s not configured" task)))
|
||||||
|
(-> item
|
||||||
|
(dissoc :task)
|
||||||
|
(assoc :fn f))))))
|
||||||
|
|
||||||
|
cfg (assoc cfg ::entries entries ::running running)]
|
||||||
|
|
||||||
|
(l/inf :hint "started" :tasks (count entries))
|
||||||
|
(synchronize-cron-entries! cfg)
|
||||||
|
|
||||||
|
(->> (filter some? entries)
|
||||||
|
(run! (partial schedule-cron-task cfg)))
|
||||||
|
|
||||||
|
(reify
|
||||||
|
clojure.lang.IDeref
|
||||||
|
(deref [_] @running)
|
||||||
|
|
||||||
|
java.lang.AutoCloseable
|
||||||
|
(close [_]
|
||||||
|
(l/inf :hint "terminated")
|
||||||
|
(doseq [item @running]
|
||||||
|
(when-not (.isDone ^Future item)
|
||||||
|
(.cancel ^Future item true))))))))
|
||||||
|
|
||||||
|
(defmethod ig/halt-key! ::wrk/cron
|
||||||
|
[_ instance]
|
||||||
|
(some-> instance d/close!))
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue