🔥 Replace own scheduled executor with the one defined in promesa lib

This commit is contained in:
Andrey Antukh 2023-02-20 13:12:19 +01:00
parent 012ead65b5
commit cad1851e95
4 changed files with 47 additions and 72 deletions

View file

@ -176,9 +176,6 @@
::wrk/executor ::wrk/executor
{::wrk/parallelism (cf/get :default-executor-parallelism 100)} {::wrk/parallelism (cf/get :default-executor-parallelism 100)}
::wrk/scheduled-executor
{::wrk/parallelism (cf/get :scheduled-executor-parallelism 20)}
::wrk/monitor ::wrk/monitor
{::mtx/metrics (ig/ref ::mtx/metrics) {::mtx/metrics (ig/ref ::mtx/metrics)
::wrk/name "default" ::wrk/name "default"
@ -203,8 +200,7 @@
:redis (ig/ref ::rds/redis)} :redis (ig/ref ::rds/redis)}
:app.storage.tmp/cleaner :app.storage.tmp/cleaner
{::wrk/executor (ig/ref ::wrk/executor) {::wrk/executor (ig/ref ::wrk/executor)}
::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor)}
::sto/gc-deleted-task ::sto/gc-deleted-task
{::db/pool (ig/ref ::db/pool) {::db/pool (ig/ref ::db/pool)
@ -319,8 +315,7 @@
::wrk/executor (ig/ref ::wrk/executor)} ::wrk/executor (ig/ref ::wrk/executor)}
:app.rpc/rlimit :app.rpc/rlimit
{::wrk/executor (ig/ref ::wrk/executor) {::wrk/executor (ig/ref ::wrk/executor)}
::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor)}
:app.rpc/methods :app.rpc/methods
{::http.client/client (ig/ref ::http.client/client) {::http.client/client (ig/ref ::http.client/client)
@ -466,8 +461,7 @@
(def worker-config (def worker-config
{::wrk/cron {::wrk/cron
{::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor) {::wrk/registry (ig/ref ::wrk/registry)
::wrk/registry (ig/ref ::wrk/registry)
::db/pool (ig/ref ::db/pool) ::db/pool (ig/ref ::db/pool)
::wrk/entries ::wrk/entries
[{:cron #app/cron "0 0 * * * ?" ;; hourly [{:cron #app/cron "0 0 * * * ?" ;; hourly

View file

@ -352,7 +352,7 @@
::limits limits})))) ::limits limits}))))
(defn- refresh-config (defn- refresh-config
[{:keys [::state ::path ::wrk/executor ::wrk/scheduled-executor] :as cfg}] [{:keys [::state ::path ::wrk/executor] :as cfg}]
(letfn [(update-config [{:keys [::updated-at] :as state}] (letfn [(update-config [{:keys [::updated-at] :as state}]
(let [updated-at' (fs/last-modified-time path)] (let [updated-at' (fs/last-modified-time path)]
(merge state (merge state
@ -367,8 +367,7 @@
state))))) state)))))
(schedule-next [state] (schedule-next [state]
(px/schedule! scheduled-executor (px/schedule! (inst-ms (::refresh state))
(inst-ms (::refresh state))
(partial refresh-config cfg)) (partial refresh-config cfg))
state)] state)]
@ -391,8 +390,7 @@
(and (fs/exists? path) (fs/regular-file? path) path))) (and (fs/exists? path) (fs/regular-file? path) path)))
(defmethod ig/pre-init-spec :app.rpc/rlimit [_] (defmethod ig/pre-init-spec :app.rpc/rlimit [_]
(s/keys :req [::wrk/executor (s/keys :req [::wrk/executor]))
::wrk/scheduled-executor]))
(defmethod ig/init-key ::rpc/rlimit (defmethod ig/init-key ::rpc/rlimit
[_ {:keys [::wrk/executor] :as cfg}] [_ {:keys [::wrk/executor] :as cfg}]

View file

@ -10,57 +10,59 @@
the operating system cleaning task should be responsible of the operating system cleaning task should be responsible of
permanently delete these files (look at systemd-tempfiles)." permanently delete these files (look at systemd-tempfiles)."
(:require (:require
[app.common.data :as d]
[app.common.logging :as l] [app.common.logging :as l]
[app.storage :as-alias sto]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as wrk] [app.worker :as wrk]
[clojure.core.async :as a]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[datoteka.fs :as fs] [datoteka.fs :as fs]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.exec :as px])) [promesa.exec :as px]
[promesa.exec.csp :as sp]))
(declare remove-temp-file) (declare ^:private remove-temp-file)
(defonce queue (a/chan 128)) (declare ^:private io-loop)
(defonce queue (sp/chan :buf 128))
(defmethod ig/pre-init-spec ::cleaner [_] (defmethod ig/pre-init-spec ::cleaner [_]
(s/keys :req [::sto/min-age ::wrk/scheduled-executor])) (s/keys :req [::wrk/executor]))
(defmethod ig/prep-key ::cleaner (defmethod ig/prep-key ::cleaner
[_ cfg] [_ cfg]
(merge {::sto/min-age (dt/duration "30m")} (assoc cfg ::min-age (dt/duration "30m")))
(d/without-nils cfg)))
(defmethod ig/init-key ::cleaner (defmethod ig/init-key ::cleaner
[_ {:keys [::sto/min-age ::wrk/scheduled-executor] :as cfg}] [_ cfg]
(px/thread (px/fn->thread (partial io-loop cfg)
{:name "penpot/storage-tmp-cleaner"} {:name "penpot/storage/tmp-cleaner" :virtual true}))
(try
(l/info :hint "started tmp file cleaner")
(loop []
(when-let [path (a/<!! queue)]
(l/trace :hint "schedule tempfile deletion" :path path
:expires-at (dt/plus (dt/now) min-age))
(px/schedule! scheduled-executor
(inst-ms min-age)
(partial remove-temp-file path))
(recur)))
(catch InterruptedException _
(l/debug :hint "interrupted"))
(finally
(l/info :hint "terminated tmp file cleaner")))))
(defmethod ig/halt-key! ::cleaner (defmethod ig/halt-key! ::cleaner
[_ thread] [_ thread]
(px/interrupt! thread)) (px/interrupt! thread))
(defn- io-loop
[{:keys [::min-age] :as cfg}]
(l/info :hint "started tmp file cleaner")
(try
(loop []
(when-let [path (sp/take! queue)]
(l/debug :hint "schedule tempfile deletion" :path path
:expires-at (dt/plus (dt/now) min-age))
(px/schedule! (inst-ms min-age) (partial remove-temp-file cfg path))
(recur)))
(catch InterruptedException _
(l/trace :hint "cleaner interrupted"))
(finally
(l/info :hint "cleaner terminated"))))
(defn- remove-temp-file (defn- remove-temp-file
"Permanently delete tempfile" "Permanently delete tempfile"
[path] [{:keys [::wrk/executor path]}]
(l/trace :hint "permanently delete tempfile" :path path)
(when (fs/exists? path) (when (fs/exists? path)
(fs/delete path))) (px/run! executor
(fn []
(l/debug :hint "permanently delete tempfile" :path path)
(fs/delete path)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; API ;; API
@ -72,7 +74,7 @@
:or {prefix "penpot." :or {prefix "penpot."
suffix ".tmp"}}] suffix ".tmp"}}]
(let [candidate (fs/tempfile :suffix suffix :prefix prefix)] (let [candidate (fs/tempfile :suffix suffix :prefix prefix)]
(a/offer! queue candidate) (sp/offer! queue candidate)
candidate)) candidate))
(defn create-tempfile (defn create-tempfile
@ -80,5 +82,5 @@
:or {prefix "penpot." :or {prefix "penpot."
suffix ".tmp"}}] suffix ".tmp"}}]
(let [path (fs/create-tempfile :suffix suffix :prefix prefix)] (let [path (fs/create-tempfile :suffix suffix :prefix prefix)]
(a/offer! queue path) (sp/offer! queue path)
path)) path))

View file

@ -22,6 +22,7 @@
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px]) [promesa.exec :as px])
(:import (:import
java.util.concurrent.ExecutorService java.util.concurrent.ExecutorService
@ -32,7 +33,6 @@
(set! *warn-on-reflection* true) (set! *warn-on-reflection* true)
(s/def ::executor #(instance? ExecutorService %)) (s/def ::executor #(instance? ExecutorService %))
(s/def ::scheduled-executor #(instance? ScheduledExecutorService %))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Executor ;; Executor
@ -48,28 +48,14 @@
(let [prefix (if (vector? skey) (-> skey first name) "default") (let [prefix (if (vector? skey) (-> skey first name) "default")
tname (str "penpot/" prefix "/%s") tname (str "penpot/" prefix "/%s")
factory (px/forkjoin-thread-factory :name tname)] factory (px/forkjoin-thread-factory :name tname)]
(px/forkjoin-executor (px/forkjoin-executor {:factory factory
:factory factory :parallelism parallelism
:parallelism parallelism :async true})))
:async? true)))
(defmethod ig/halt-key! ::executor (defmethod ig/halt-key! ::executor
[_ instance] [_ instance]
(px/shutdown! instance)) (px/shutdown! instance))
(defmethod ig/pre-init-spec ::scheduled-executor [_]
(s/keys :req [::parallelism]))
(defmethod ig/init-key ::scheduled-executor
[_ {:keys [::parallelism]}]
(px/scheduled-executor
:parallelism parallelism
:factory (px/thread-factory :name "penpot/scheduled-executor/%s")))
(defmethod ig/halt-key! ::scheduled-executor
[_ instance]
(px/shutdown! instance))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; TASKS REGISTRY ;; TASKS REGISTRY
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -531,7 +517,7 @@
(s/def ::entries (s/coll-of (s/nilable ::cron-task))) (s/def ::entries (s/coll-of (s/nilable ::cron-task)))
(defmethod ig/pre-init-spec ::cron [_] (defmethod ig/pre-init-spec ::cron [_]
(s/keys :req [::scheduled-executor ::db/pool ::entries ::registry])) (s/keys :req [::db/pool ::entries ::registry]))
(defmethod ig/init-key ::cron (defmethod ig/init-key ::cron
[_ {:keys [::entries ::registry ::db/pool] :as cfg}] [_ {:keys [::entries ::registry ::db/pool] :as cfg}]
@ -622,16 +608,11 @@
next (dt/next-valid-instant-from cron now)] next (dt/next-valid-instant-from cron now)]
(inst-ms (dt/diff now next)))) (inst-ms (dt/diff now next))))
(def ^:private
xf-without-done
(remove #(.isDone ^Future %)))
(defn- schedule-cron-task (defn- schedule-cron-task
[{:keys [::scheduled-executor ::running] :as cfg} {:keys [cron] :as task}] [{:keys [::running] :as cfg} {:keys [cron] :as task}]
(let [ft (px/schedule! scheduled-executor (let [ft (px/schedule! (ms-until-valid cron)
(ms-until-valid cron)
(partial execute-cron-task cfg task))] (partial execute-cron-task cfg task))]
(swap! running #(into #{ft} xf-without-done %)))) (swap! running #(into #{ft} (filter p/pending?) %))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;