Simplify internal executor module

This commit is contained in:
Andrey Antukh 2023-11-24 15:35:59 +01:00
parent 1bd32327e5
commit c64e14859c
3 changed files with 43 additions and 53 deletions

View file

@ -161,12 +161,7 @@
::mdef/help "Current number of threads with state RUNNING." ::mdef/help "Current number of threads with state RUNNING."
::mdef/labels ["name"] ::mdef/labels ["name"]
::mdef/type :gauge} ::mdef/type :gauge}
})
:executors-queued-submissions
{::mdef/name "penpot_executors_queued_submissions"
::mdef/help "Current number of queued submissions."
::mdef/labels ["name"]
::mdef/type :gauge}})
(def system-config (def system-config
{::db/pool {::db/pool
@ -180,13 +175,12 @@
;; Default thread pool for IO operations ;; Default thread pool for IO operations
::wrk/executor ::wrk/executor
{::wrk/parallelism (cf/get :default-executor-parallelism {}
(+ 3 (* (px/get-available-processors) 3)))}
::wrk/monitor ::wrk/monitor
{::mtx/metrics (ig/ref ::mtx/metrics) {::mtx/metrics (ig/ref ::mtx/metrics)
::wrk/name "default" ::wrk/executor (ig/ref ::wrk/executor)
::wrk/executor (ig/ref ::wrk/executor)} ::wrk/name "default"}
:app.migrations/migrations :app.migrations/migrations
{::db/pool (ig/ref ::db/pool)} {::db/pool (ig/ref ::db/pool)}

View file

@ -11,7 +11,6 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.db :as-alias db] [app.db :as-alias db]
[app.storage :as-alias sto] [app.storage :as-alias sto]
[app.worker :as-alias wrk]
[buddy.core.codecs :as bc] [buddy.core.codecs :as bc]
[buddy.core.hash :as bh] [buddy.core.hash :as bh]
[clojure.java.io :as jio] [clojure.java.io :as jio]
@ -201,7 +200,7 @@
(str "blake2b:" result))) (str "blake2b:" result)))
(defn resolve-backend (defn resolve-backend
[{:keys [::db/pool ::wrk/executor] :as storage} backend-id] [{:keys [::db/pool] :as storage} backend-id]
(let [backend (get-in storage [::sto/backends backend-id])] (let [backend (get-in storage [::sto/backends backend-id])]
(when-not backend (when-not backend
(ex/raise :type :internal (ex/raise :type :internal
@ -209,7 +208,6 @@
:hint (dm/fmt "backend '%' not configured" backend-id))) :hint (dm/fmt "backend '%' not configured" backend-id)))
(-> backend (-> backend
(assoc ::sto/id backend-id) (assoc ::sto/id backend-id)
(assoc ::wrk/executor executor)
(assoc ::db/pool pool)))) (assoc ::db/pool pool))))
(defrecord StorageObject [id size created-at expired-at touched-at backend]) (defrecord StorageObject [id size created-at expired-at touched-at backend])

View file

@ -25,43 +25,45 @@
[promesa.core :as p] [promesa.core :as p]
[promesa.exec :as px]) [promesa.exec :as px])
(:import (:import
java.util.concurrent.ExecutorService java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.ForkJoinPool java.util.concurrent.Executor
java.util.concurrent.Future)) java.util.concurrent.Future))
(set! *warn-on-reflection* true) (set! *warn-on-reflection* true)
(s/def ::executor #(instance? ExecutorService %)) (s/def ::executor #(instance? Executor %))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Executor ;; Executor
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::parallelism ::us/integer)
(defmethod ig/pre-init-spec ::executor [_] (defmethod ig/pre-init-spec ::executor [_]
(s/keys :req [::parallelism])) (s/keys :req []))
(defmethod ig/init-key ::executor (defmethod ig/init-key ::executor
[skey {:keys [::parallelism]}] [_ _]
(let [prefix (if (vector? skey) (-> skey first name) "default") (let [factory (px/thread-factory :prefix "penpot/default/")
tname (str "penpot/" prefix "/%s") executor (px/cached-executor :factory factory :keepalive 30000)]
ttype (cf/get :worker-executor-type :fjoin)] (l/inf :hint "starting executor")
(case ttype (reify
:fjoin java.lang.AutoCloseable
(let [factory (px/forkjoin-thread-factory :name tname)] (close [_]
(px/forkjoin-executor {:factory factory (l/inf :hint "stoping executor")
:core-size (px/get-available-processors) (px/shutdown! executor))
:parallelism parallelism
:async true}))
:cached clojure.lang.IDeref
(let [factory (px/thread-factory :name tname)] (deref [_]
(px/cached-executor :factory factory))))) {:active (.getPoolSize ^ThreadPoolExecutor executor)
:running (.getActiveCount ^ThreadPoolExecutor executor)
:completed (.getCompletedTaskCount ^ThreadPoolExecutor executor)})
Executor
(execute [_ runnable]
(.execute ^Executor executor ^Runnable runnable)))))
(defmethod ig/halt-key! ::executor (defmethod ig/halt-key! ::executor
[_ instance] [_ instance]
(px/shutdown! instance)) (.close ^java.lang.AutoCloseable instance))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; TASKS REGISTRY ;; TASKS REGISTRY
@ -111,42 +113,38 @@
(defmethod ig/init-key ::monitor (defmethod ig/init-key ::monitor
[_ {:keys [::executor ::mtx/metrics ::interval ::name]}] [_ {:keys [::executor ::mtx/metrics ::interval ::name]}]
(letfn [(monitor! [^ForkJoinPool executor prev-steals] (letfn [(monitor! [executor prev-completed]
(let [running (.getRunningThreadCount executor) (let [labels (into-array String [(d/name name)])
queued (.getQueuedSubmissionCount executor) stats (deref executor)
active (.getPoolSize executor)
steals (.getStealCount executor)
labels (into-array String [(d/name name)])
steals-inc (- steals prev-steals) completed (:completed stats)
steals-inc (if (neg? steals-inc) 0 steals-inc)] completed-inc (- completed prev-completed)
completed-inc (if (neg? completed-inc) 0 completed-inc)]
(mtx/run! metrics (mtx/run! metrics
:id :executor-active-threads :id :executor-active-threads
:labels labels :labels labels
:val active) :val (:active stats))
(mtx/run! metrics (mtx/run! metrics
:id :executor-running-threads :id :executor-running-threads
:labels labels :val running)
(mtx/run! metrics
:id :executors-queued-submissions
:labels labels :labels labels
:val queued) :val (:running stats))
(mtx/run! metrics (mtx/run! metrics
:id :executors-completed-tasks :id :executors-completed-tasks
:labels labels :labels labels
:inc steals-inc) :inc completed-inc)
steals))] completed-inc))]
(px/thread (px/thread
{:name "penpot/executors-monitor" :virtual true} {:name "penpot/executors-monitor" :virtual true}
(l/inf :hint "monitor: started" :name name) (l/inf :hint "monitor: started" :name name)
(try (try
(loop [steals 0] (loop [completed 0]
(when-not (px/shutdown? executor) (px/sleep interval)
(px/sleep interval) (recur (long (monitor! executor completed))))
(recur (long (monitor! executor steals)))))
(catch InterruptedException _cause (catch InterruptedException _cause
(l/trc :hint "monitor: interrupted" :name name)) (l/trc :hint "monitor: interrupted" :name name))
(catch Throwable cause (catch Throwable cause