♻️ Refactor general resource and concurrency model on backend

This commit is contained in:
Andrey Antukh 2022-02-18 18:01:21 +01:00 committed by Alonso Torres
parent d24f16563f
commit 7cf27ac86d
32 changed files with 917 additions and 797 deletions

View file

@ -22,37 +22,83 @@
[integrant.core :as ig]
[promesa.exec :as px])
(:import
org.eclipse.jetty.util.thread.QueuedThreadPool
java.util.concurrent.ExecutorService
java.util.concurrent.Executors
java.util.concurrent.Executor))
java.util.concurrent.ForkJoinPool
java.util.concurrent.ForkJoinWorkerThread
java.util.concurrent.ForkJoinPool$ForkJoinWorkerThreadFactory
java.util.concurrent.atomic.AtomicLong
java.util.concurrent.Executors))
(s/def ::executor #(instance? Executor %))
(set! *warn-on-reflection* true)
(s/def ::executor #(instance? ExecutorService %))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Executor
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::name keyword?)
(s/def ::prefix keyword?)
(s/def ::parallelism ::us/integer)
(s/def ::min-threads ::us/integer)
(s/def ::max-threads ::us/integer)
(s/def ::idle-timeout ::us/integer)
(defmethod ig/pre-init-spec ::executor [_]
(s/keys :req-un [::min-threads ::max-threads ::idle-timeout ::name]))
(s/keys :req-un [::prefix ::parallelism]))
(defn- get-thread-factory
^ForkJoinPool$ForkJoinWorkerThreadFactory
[prefix counter]
(reify ForkJoinPool$ForkJoinWorkerThreadFactory
(newThread [_ pool]
(let [^ForkJoinWorkerThread thread (.newThread ForkJoinPool/defaultForkJoinWorkerThreadFactory pool)
^String thread-name (str (name prefix) "-" (.getAndIncrement ^AtomicLong counter))]
(.setName thread thread-name)
thread))))
(defmethod ig/init-key ::executor
[_ {:keys [min-threads max-threads idle-timeout name]}]
(doto (QueuedThreadPool. (int max-threads)
(int min-threads)
(int idle-timeout))
(.setStopTimeout 500)
(.setName (d/name name))
(.start)))
[_ {:keys [parallelism prefix]}]
(let [counter (AtomicLong. 0)]
(ForkJoinPool. (int parallelism) (get-thread-factory prefix counter) nil false)))
(defmethod ig/halt-key! ::executor
[_ instance]
(.stop ^QueuedThreadPool instance))
(.shutdown ^ForkJoinPool instance))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Executor Monitor
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::executors (s/map-of keyword? ::executor))
(defmethod ig/pre-init-spec ::executors-monitor [_]
(s/keys :req-un [::executors ::mtx/metrics]))
(defmethod ig/init-key ::executors-monitor
[_ {:keys [executors metrics interval] :or {interval 2500}}]
(letfn [(log-stats [scheduler]
(doseq [[key ^ForkJoinPool executor] executors]
(let [labels (into-array String [(name key)])]
(mtx/run! metrics {:id :executors-active-threads
:labels labels
:val (.getPoolSize executor)})
(mtx/run! metrics {:id :executors-running-threads
:labels labels
:val (.getRunningThreadCount executor)})
(mtx/run! metrics {:id :executors-queued-submissions
:labels labels
:val (.getQueuedSubmissionCount executor)})))
(when-not (.isShutdown scheduler)
(px/schedule! scheduler interval (partial log-stats scheduler))))]
(let [scheduler (px/scheduled-pool 1)]
(px/schedule! scheduler interval (partial log-stats scheduler))
scheduler)))
(defmethod ig/halt-key! ::executors-monitor
[_ instance]
(.shutdown ^ExecutorService instance))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Worker
@ -60,7 +106,6 @@
(declare event-loop-fn)
(declare event-loop)
(declare instrument-tasks)
(s/def ::queue keyword?)
(s/def ::parallelism ::us/integer)
@ -420,11 +465,6 @@
(def sql:lock-scheduled-task
"select id from scheduled_task where id=? for update skip locked")
(defn exception->string
[error]
(with-out-str
(.printStackTrace ^Throwable error (java.io.PrintWriter. *out*))))
(defn- execute-scheduled-task
[{:keys [executor pool] :as cfg} {:keys [id] :as task}]
(letfn [(run-task [conn]
@ -460,59 +500,27 @@
;; --- INSTRUMENTATION
(defn instrument!
[registry]
(mtx/instrument-vars!
[#'submit!]
{:registry registry
:type :counter
:labels ["name"]
:name "tasks_submit_total"
:help "A counter of task submissions."
:wrap (fn [rootf mobj]
(let [mdata (meta rootf)
origf (::original mdata rootf)]
(with-meta
(fn [conn params]
(let [tname (:name params)]
(mobj :inc [tname])
(origf conn params)))
{::original origf})))})
(mtx/instrument-vars!
[#'app.worker/run-task]
{:registry registry
:type :summary
:quantiles []
:name "tasks_checkout_timing"
:help "Latency measured between scheduled_at and execution time."
:wrap (fn [rootf mobj]
(let [mdata (meta rootf)
origf (::original mdata rootf)]
(with-meta
(fn [tasks item]
(let [now (inst-ms (dt/now))
sat (inst-ms (:scheduled-at item))]
(mobj :observe (- now sat))
(origf tasks item)))
{::original origf})))}))
(defn- wrap-task-handler
[metrics tname f]
(let [labels (into-array String [tname])]
(fn [params]
(let [start (System/nanoTime)]
(try
(f params)
(finally
(mtx/run! metrics
{:id :tasks-timing
:val (/ (- (System/nanoTime) start) 1000000)
:labels labels})))))))
(defmethod ig/pre-init-spec ::registry [_]
(s/keys :req-un [::mtx/metrics ::tasks]))
(defmethod ig/init-key ::registry
[_ {:keys [metrics tasks]}]
(let [mobj (mtx/create
{:registry (:registry metrics)
:type :summary
:labels ["name"]
:quantiles []
:name "tasks_timing"
:help "Background task execution timing."})]
(reduce-kv (fn [res k v]
(let [tname (name k)]
(l/debug :action "register task" :name tname)
(assoc res k (mtx/wrap-summary v mobj [tname]))))
{}
tasks)))
(reduce-kv (fn [res k v]
(let [tname (name k)]
(l/debug :hint "register task" :name tname)
(assoc res k (wrap-task-handler metrics tname v))))
{}
tasks))