diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 17344c73e..18509bbe0 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -176,9 +176,6 @@ ::wrk/executor {::wrk/parallelism (cf/get :default-executor-parallelism 100)} - ::wrk/scheduled-executor - {::wrk/parallelism (cf/get :scheduled-executor-parallelism 20)} - ::wrk/monitor {::mtx/metrics (ig/ref ::mtx/metrics) ::wrk/name "default" @@ -203,8 +200,7 @@ :redis (ig/ref ::rds/redis)} :app.storage.tmp/cleaner - {::wrk/executor (ig/ref ::wrk/executor) - ::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor)} + {::wrk/executor (ig/ref ::wrk/executor)} ::sto/gc-deleted-task {::db/pool (ig/ref ::db/pool) @@ -319,8 +315,7 @@ ::wrk/executor (ig/ref ::wrk/executor)} :app.rpc/rlimit - {::wrk/executor (ig/ref ::wrk/executor) - ::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor)} + {::wrk/executor (ig/ref ::wrk/executor)} :app.rpc/methods {::http.client/client (ig/ref ::http.client/client) @@ -466,8 +461,7 @@ (def worker-config {::wrk/cron - {::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor) - ::wrk/registry (ig/ref ::wrk/registry) + {::wrk/registry (ig/ref ::wrk/registry) ::db/pool (ig/ref ::db/pool) ::wrk/entries [{:cron #app/cron "0 0 * * * ?" ;; hourly diff --git a/backend/src/app/rpc/rlimit.clj b/backend/src/app/rpc/rlimit.clj index 028a59a78..4966cf979 100644 --- a/backend/src/app/rpc/rlimit.clj +++ b/backend/src/app/rpc/rlimit.clj @@ -352,7 +352,7 @@ ::limits limits})))) (defn- refresh-config - [{:keys [::state ::path ::wrk/executor ::wrk/scheduled-executor] :as cfg}] + [{:keys [::state ::path ::wrk/executor] :as cfg}] (letfn [(update-config [{:keys [::updated-at] :as state}] (let [updated-at' (fs/last-modified-time path)] (merge state @@ -367,8 +367,7 @@ state))))) (schedule-next [state] - (px/schedule! scheduled-executor - (inst-ms (::refresh state)) + (px/schedule! (inst-ms (::refresh state)) (partial refresh-config cfg)) state)] @@ -391,8 +390,7 @@ (and (fs/exists? path) (fs/regular-file? path) path))) (defmethod ig/pre-init-spec :app.rpc/rlimit [_] - (s/keys :req [::wrk/executor - ::wrk/scheduled-executor])) + (s/keys :req [::wrk/executor])) (defmethod ig/init-key ::rpc/rlimit [_ {:keys [::wrk/executor] :as cfg}] diff --git a/backend/src/app/storage/tmp.clj b/backend/src/app/storage/tmp.clj index 3e64e6bfc..057e82dad 100644 --- a/backend/src/app/storage/tmp.clj +++ b/backend/src/app/storage/tmp.clj @@ -10,57 +10,59 @@ the operating system cleaning task should be responsible of permanently delete these files (look at systemd-tempfiles)." (:require - [app.common.data :as d] [app.common.logging :as l] - [app.storage :as-alias sto] [app.util.time :as dt] [app.worker :as wrk] - [clojure.core.async :as a] [clojure.spec.alpha :as s] [datoteka.fs :as fs] [integrant.core :as ig] - [promesa.exec :as px])) + [promesa.exec :as px] + [promesa.exec.csp :as sp])) -(declare remove-temp-file) -(defonce queue (a/chan 128)) +(declare ^:private remove-temp-file) +(declare ^:private io-loop) + +(defonce queue (sp/chan :buf 128)) (defmethod ig/pre-init-spec ::cleaner [_] - (s/keys :req [::sto/min-age ::wrk/scheduled-executor])) + (s/keys :req [::wrk/executor])) (defmethod ig/prep-key ::cleaner [_ cfg] - (merge {::sto/min-age (dt/duration "30m")} - (d/without-nils cfg))) + (assoc cfg ::min-age (dt/duration "30m"))) (defmethod ig/init-key ::cleaner - [_ {:keys [::sto/min-age ::wrk/scheduled-executor] :as cfg}] - (px/thread - {:name "penpot/storage-tmp-cleaner"} - (try - (l/info :hint "started tmp file cleaner") - (loop [] - (when-let [path (a/thread (partial io-loop cfg) + {:name "penpot/storage/tmp-cleaner" :virtual true})) (defmethod ig/halt-key! ::cleaner [_ thread] (px/interrupt! thread)) +(defn- io-loop + [{:keys [::min-age] :as cfg}] + (l/info :hint "started tmp file cleaner") + (try + (loop [] + (when-let [path (sp/take! queue)] + (l/debug :hint "schedule tempfile deletion" :path path + :expires-at (dt/plus (dt/now) min-age)) + (px/schedule! (inst-ms min-age) (partial remove-temp-file cfg path)) + (recur))) + (catch InterruptedException _ + (l/trace :hint "cleaner interrupted")) + (finally + (l/info :hint "cleaner terminated")))) + (defn- remove-temp-file "Permanently delete tempfile" - [path] - (l/trace :hint "permanently delete tempfile" :path path) + [{:keys [::wrk/executor path]}] (when (fs/exists? path) - (fs/delete path))) + (px/run! executor + (fn [] + (l/debug :hint "permanently delete tempfile" :path path) + (fs/delete path))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; API @@ -72,7 +74,7 @@ :or {prefix "penpot." suffix ".tmp"}}] (let [candidate (fs/tempfile :suffix suffix :prefix prefix)] - (a/offer! queue candidate) + (sp/offer! queue candidate) candidate)) (defn create-tempfile @@ -80,5 +82,5 @@ :or {prefix "penpot." suffix ".tmp"}}] (let [path (fs/create-tempfile :suffix suffix :prefix prefix)] - (a/offer! queue path) + (sp/offer! queue path) path)) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index 8f4b9a50f..61681fb30 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -22,6 +22,7 @@ [clojure.spec.alpha :as s] [cuerdas.core :as str] [integrant.core :as ig] + [promesa.core :as p] [promesa.exec :as px]) (:import java.util.concurrent.ExecutorService @@ -32,7 +33,6 @@ (set! *warn-on-reflection* true) (s/def ::executor #(instance? ExecutorService %)) -(s/def ::scheduled-executor #(instance? ScheduledExecutorService %)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Executor @@ -48,28 +48,14 @@ (let [prefix (if (vector? skey) (-> skey first name) "default") tname (str "penpot/" prefix "/%s") factory (px/forkjoin-thread-factory :name tname)] - (px/forkjoin-executor - :factory factory - :parallelism parallelism - :async? true))) + (px/forkjoin-executor {:factory factory + :parallelism parallelism + :async true}))) (defmethod ig/halt-key! ::executor [_ instance] (px/shutdown! instance)) -(defmethod ig/pre-init-spec ::scheduled-executor [_] - (s/keys :req [::parallelism])) - -(defmethod ig/init-key ::scheduled-executor - [_ {:keys [::parallelism]}] - (px/scheduled-executor - :parallelism parallelism - :factory (px/thread-factory :name "penpot/scheduled-executor/%s"))) - -(defmethod ig/halt-key! ::scheduled-executor - [_ instance] - (px/shutdown! instance)) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; TASKS REGISTRY ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -531,7 +517,7 @@ (s/def ::entries (s/coll-of (s/nilable ::cron-task))) (defmethod ig/pre-init-spec ::cron [_] - (s/keys :req [::scheduled-executor ::db/pool ::entries ::registry])) + (s/keys :req [::db/pool ::entries ::registry])) (defmethod ig/init-key ::cron [_ {:keys [::entries ::registry ::db/pool] :as cfg}] @@ -622,16 +608,11 @@ next (dt/next-valid-instant-from cron now)] (inst-ms (dt/diff now next)))) -(def ^:private - xf-without-done - (remove #(.isDone ^Future %))) - (defn- schedule-cron-task - [{:keys [::scheduled-executor ::running] :as cfg} {:keys [cron] :as task}] - (let [ft (px/schedule! scheduled-executor - (ms-until-valid cron) + [{:keys [::running] :as cfg} {:keys [cron] :as task}] + (let [ft (px/schedule! (ms-until-valid cron) (partial execute-cron-task cfg task))] - (swap! running #(into #{ft} xf-without-done %)))) + (swap! running #(into #{ft} (filter p/pending?) %)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;