♻️ Normalize internal naming on the worker module

This commit is contained in:
Andrey Antukh 2022-11-22 14:51:26 +01:00
parent 10bf6c5e56
commit 13a092b192
5 changed files with 59 additions and 84 deletions

View file

@ -26,73 +26,49 @@
java.util.concurrent.Executors
java.util.concurrent.ForkJoinPool
java.util.concurrent.Future
java.util.concurrent.ForkJoinPool$ForkJoinWorkerThreadFactory
java.util.concurrent.ForkJoinWorkerThread
java.util.concurrent.ScheduledExecutorService
java.util.concurrent.ThreadFactory
java.util.concurrent.atomic.AtomicLong))
java.util.concurrent.ScheduledExecutorService))
(set! *warn-on-reflection* true)
(s/def ::executor #(instance? ExecutorService %))
(s/def ::scheduler #(instance? ScheduledExecutorService %))
(s/def ::scheduled-executor #(instance? ScheduledExecutorService %))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Executor
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(declare ^:private get-fj-thread-factory)
(declare ^:private get-thread-factory)
(s/def ::parallelism ::us/integer)
(defmethod ig/pre-init-spec ::executor [_]
(s/keys :opt-un [::parallelism]))
(s/keys :req-un [::parallelism]))
(defmethod ig/init-key ::executor
[skey {:keys [parallelism]}]
(let [prefix (if (vector? skey) (-> skey first name keyword) :default)]
(if parallelism
(ForkJoinPool. (int parallelism) (get-fj-thread-factory prefix) nil false)
(Executors/newCachedThreadPool (get-thread-factory prefix)))))
(let [prefix (if (vector? skey) (-> skey first name keyword) :default)
tname (str "penpot/" prefix "/%s")
factory (px/forkjoin-thread-factory :name tname)]
(px/forkjoin-executor
:factory factory
:parallelism parallelism
:async? true)))
(defmethod ig/halt-key! ::executor
[_ instance]
(.shutdown ^ExecutorService instance))
(defmethod ig/pre-init-spec ::scheduler [_]
(s/keys :req-un [::prefix]
:opt-un [::parallelism]))
(defmethod ig/pre-init-spec ::scheduled-executor [_]
(s/keys :opt-un [::parallelism]))
(defmethod ig/init-key ::scheduler
[_ {:keys [parallelism prefix] :or {parallelism 1}}]
(px/scheduled-pool parallelism (get-thread-factory prefix)))
(defmethod ig/init-key ::scheduled-executor
[_ {:keys [parallelism] :or {parallelism 1}}]
(px/scheduled-executor
:parallelism parallelism
:factory (px/thread-factory :name "penpot/scheduled-executor/%s")))
(defmethod ig/halt-key! ::scheduler
[_ instance]
(.shutdown ^ExecutorService instance))
(defn- get-fj-thread-factory
^ForkJoinPool$ForkJoinWorkerThreadFactory
[prefix]
(let [^AtomicLong counter (AtomicLong. 0)]
(reify ForkJoinPool$ForkJoinWorkerThreadFactory
(newThread [_ pool]
(let [thread (.newThread ForkJoinPool/defaultForkJoinWorkerThreadFactory pool)
tname (str "penpot/" (name prefix) "-" (.getAndIncrement counter))]
(.setName ^ForkJoinWorkerThread thread ^String tname)
thread)))))
(defn- get-thread-factory
^ThreadFactory
[prefix]
(let [^AtomicLong counter (AtomicLong. 0)]
(reify ThreadFactory
(newThread [_ runnable]
(doto (Thread. runnable)
(.setDaemon true)
(.setName (str "penpot/" (name prefix) "-" (.getAndIncrement counter))))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Executor Monitor
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -190,6 +166,30 @@
:queue :default}
(d/without-nils cfg)))
(defmethod ig/init-key ::worker
[_ {:keys [pool name queue] :as cfg}]
(let [close-ch (a/chan 1)
cfg (assoc cfg :close-ch close-ch)]
(if (db/read-only? pool)
(l/warn :hint "worker not started, db is read-only"
:name (d/name name)
:queue (d/name queue))
(do
(l/info :hint "worker initialized"
:name (d/name name)
:queue (d/name queue))
(event-loop cfg)))
(reify
java.lang.AutoCloseable
(close [_]
(a/close! close-ch)))))
(defmethod ig/halt-key! ::worker
[_ instance]
(.close ^java.lang.AutoCloseable instance))
(defn- event-loop
"Main, worker eventloop"
[{:keys [pool poll-interval close-ch] :as cfg}]
@ -235,29 +235,6 @@
(a/<! (a/timeout poll-ms))
(recur)))))))
(defmethod ig/init-key ::worker
[_ {:keys [pool name queue] :as cfg}]
(let [close-ch (a/chan 1)
cfg (assoc cfg :close-ch close-ch)]
(if (db/read-only? pool)
(l/warn :hint "worker not started, db is read-only"
:name (d/name name)
:queue (d/name queue))
(do
(l/info :hint "worker initialized"
:name (d/name name)
:queue (d/name queue))
(event-loop cfg)))
(reify
java.lang.AutoCloseable
(close [_]
(a/close! close-ch)))))
(defmethod ig/halt-key! ::worker
[_ instance]
(.close ^java.lang.AutoCloseable instance))
;; --- SUBMIT
(s/def ::task keyword?)
@ -460,7 +437,7 @@
(s/def ::entries (s/coll-of (s/nilable ::cron-task)))
(defmethod ig/pre-init-spec ::cron [_]
(s/keys :req-un [::executor ::scheduler ::db/pool ::entries ::tasks]))
(s/keys :req-un [::executor ::scheduled-executor ::db/pool ::entries ::tasks]))
(defmethod ig/init-key ::cron
[_ {:keys [entries tasks pool] :as cfg}]
@ -557,8 +534,8 @@
(remove #(.isDone ^Future %)))
(defn- schedule-cron-task
[{:keys [scheduler running] :as cfg} {:keys [cron] :as task}]
(let [ft (px/schedule! scheduler
[{:keys [scheduled-executor running] :as cfg} {:keys [cron] :as task}]
(let [ft (px/schedule! scheduled-executor
(ms-until-valid cron)
(partial execute-cron-task cfg task))]
(swap! running #(into #{ft} xf-without-done %))))