mirror of
https://github.com/penpot/penpot.git
synced 2025-06-11 10:31:37 +02:00
♻️ Replace mount with integrant.
This commit is contained in:
parent
31d7aacec1
commit
9f12456456
76 changed files with 2403 additions and 2215 deletions
|
@ -5,105 +5,174 @@
|
|||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
||||
;; defined by the Mozilla Public License, v. 2.0.
|
||||
;;
|
||||
;; Copyright (c) 2020 UXBOX Labs SL
|
||||
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
|
||||
|
||||
(ns app.worker
|
||||
"Async tasks abstraction (impl)."
|
||||
(:require
|
||||
[app.common.exceptions :as ex]
|
||||
[app.common.spec :as us]
|
||||
[app.common.uuid :as uuid]
|
||||
[app.config :as cfg]
|
||||
[app.db :as db]
|
||||
[app.tasks.clean-tasks-table]
|
||||
[app.tasks.delete-object]
|
||||
[app.tasks.delete-profile]
|
||||
[app.tasks.file-media-gc]
|
||||
[app.tasks.file-xlog-gc]
|
||||
[app.tasks.remove-media]
|
||||
[app.tasks.sendmail]
|
||||
[app.util.async :as aa]
|
||||
[app.util.time :as dt]
|
||||
[clojure.core.async :as a]
|
||||
[clojure.pprint :refer [pprint]]
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[mount.core :as mount :refer [defstate]]
|
||||
[cuerdas.core :as str]
|
||||
[integrant.core :as ig]
|
||||
[promesa.exec :as px])
|
||||
(:import
|
||||
org.eclipse.jetty.util.thread.QueuedThreadPool
|
||||
java.util.concurrent.ExecutorService
|
||||
java.util.concurrent.Executors
|
||||
java.time.Instant))
|
||||
java.util.concurrent.Executor
|
||||
java.time.Duration
|
||||
java.time.Instant
|
||||
java.util.Date))
|
||||
|
||||
(declare start-scheduler-worker!)
|
||||
(declare start-worker!)
|
||||
(declare thread-pool)
|
||||
(declare stop!)
|
||||
(s/def ::executor #(instance? Executor %))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Entry Point (state initialization)
|
||||
;; Executor
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(def ^:private tasks
|
||||
{"delete-profile" #'app.tasks.delete-profile/handler
|
||||
"delete-object" #'app.tasks.delete-object/handler
|
||||
"remove-media" #'app.tasks.remove-media/handler
|
||||
"sendmail" #'app.tasks.sendmail/handler})
|
||||
(s/def ::name ::us/string)
|
||||
(s/def ::min-threads ::us/integer)
|
||||
(s/def ::max-threads ::us/integer)
|
||||
(s/def ::idle-timeout ::us/integer)
|
||||
|
||||
(def ^:private schedule
|
||||
[{:id "remove-deleted-media"
|
||||
:cron #app/cron "0 0 0 */1 * ? *" ;; daily
|
||||
:fn #'app.tasks.remove-media/trim-media-storage}
|
||||
(defmethod ig/pre-init-spec ::executor [_]
|
||||
(s/keys :opt-un [::min-threads ::max-threads ::idle-timeout ::name]))
|
||||
|
||||
{:id "file-media-gc"
|
||||
:cron #app/cron "0 0 0 */1 * ? *" ;; daily
|
||||
:fn #'app.tasks.file-media-gc/handler}
|
||||
(defmethod ig/prep-key ::executor
|
||||
[_ cfg]
|
||||
(merge {:min-threads 0
|
||||
:max-threads 256
|
||||
:idle-timeout 60000
|
||||
:name "worker"}
|
||||
cfg))
|
||||
|
||||
{:id "file-xlog-gc"
|
||||
:cron #app/cron "0 0 0 */1 * ?" ;; daily
|
||||
:fn #'app.tasks.file-xlog-gc/handler}
|
||||
(defmethod ig/init-key ::executor
|
||||
[_ {:keys [min-threads max-threads idle-timeout name]}]
|
||||
(doto (QueuedThreadPool. (int max-threads)
|
||||
(int min-threads)
|
||||
(int idle-timeout))
|
||||
(.setStopTimeout 500)
|
||||
(.setName name)
|
||||
(.start)))
|
||||
|
||||
{:id "clean-tasks-table"
|
||||
:cron #app/cron "0 0 0 */1 * ?" ;; daily
|
||||
:fn #'app.tasks.clean-tasks-table/handler}
|
||||
])
|
||||
|
||||
(defstate executor
|
||||
:start (thread-pool {:min-threads 0
|
||||
:max-threads 256})
|
||||
:stop (stop! executor))
|
||||
|
||||
(defstate worker
|
||||
:start (start-worker!
|
||||
{:tasks tasks
|
||||
:name "worker1"
|
||||
:batch-size 1
|
||||
:executor executor})
|
||||
:stop (stop! worker))
|
||||
|
||||
(defstate scheduler-worker
|
||||
:start (start-scheduler-worker! {:schedule schedule
|
||||
:executor executor})
|
||||
:stop (stop! scheduler-worker))
|
||||
(defmethod ig/halt-key! ::executor
|
||||
[_ instance]
|
||||
(.stop ^QueuedThreadPool instance))
|
||||
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Tasks Worker Impl
|
||||
;; Worker
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(declare event-loop-fn)
|
||||
|
||||
(s/def ::queue ::us/string)
|
||||
(s/def ::parallelism ::us/integer)
|
||||
(s/def ::batch-size ::us/integer)
|
||||
(s/def ::tasks (s/map-of string? ::us/fn))
|
||||
(s/def ::poll-interval ::dt/duration)
|
||||
|
||||
(defmethod ig/pre-init-spec ::worker [_]
|
||||
(s/keys :req-un [::executor
|
||||
::db/pool
|
||||
::batch-size
|
||||
::name
|
||||
::poll-interval
|
||||
::queue
|
||||
::tasks]))
|
||||
|
||||
(defmethod ig/prep-key ::worker
|
||||
[_ cfg]
|
||||
(merge {:batch-size 2
|
||||
:name "worker"
|
||||
:poll-interval (dt/duration {:seconds 5})
|
||||
:queue "default"}
|
||||
cfg))
|
||||
|
||||
(defmethod ig/init-key ::worker
|
||||
[_ {:keys [pool poll-interval name queue] :as cfg}]
|
||||
(log/infof "Starting worker '%s' on queue '%s'." name queue)
|
||||
(let [cch (a/chan 1)
|
||||
poll-ms (inst-ms poll-interval)]
|
||||
(a/go-loop []
|
||||
(let [[val port] (a/alts! [cch (event-loop-fn cfg)] :priority true)]
|
||||
(cond
|
||||
;; Terminate the loop if close channel is closed or
|
||||
;; event-loop-fn returns nil.
|
||||
(or (= port cch) (nil? val))
|
||||
(log/infof "Stop condition found. Shutdown worker: '%s'" name)
|
||||
|
||||
(db/pool-closed? pool)
|
||||
(do
|
||||
(log/info "Worker eventloop is aborted because pool is closed.")
|
||||
(a/close! cch))
|
||||
|
||||
(and (instance? java.sql.SQLException val)
|
||||
(contains? #{"08003" "08006" "08001" "08004"} (.getSQLState ^java.sql.SQLException val)))
|
||||
(do
|
||||
(log/error "Connection error, trying resume in some instants.")
|
||||
(a/<! (a/timeout poll-interval))
|
||||
(recur))
|
||||
|
||||
(and (instance? java.sql.SQLException val)
|
||||
(= "40001" (.getSQLState ^java.sql.SQLException val)))
|
||||
(do
|
||||
(log/debug "Serialization failure (retrying in some instants).")
|
||||
(a/<! (a/timeout poll-ms))
|
||||
(recur))
|
||||
|
||||
(instance? Exception val)
|
||||
(do
|
||||
(log/errorf val "Unexpected error ocurried on polling the database (will resume in some instants).")
|
||||
(a/<! (a/timeout poll-ms))
|
||||
(recur))
|
||||
|
||||
(= ::handled val)
|
||||
(recur)
|
||||
|
||||
(= ::empty val)
|
||||
(do
|
||||
(a/<! (a/timeout poll-ms))
|
||||
(recur)))))
|
||||
|
||||
(reify
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(a/close! cch)))))
|
||||
|
||||
|
||||
(defmethod ig/halt-key! ::worker
|
||||
[_ instance]
|
||||
(.close ^java.lang.AutoCloseable instance))
|
||||
|
||||
|
||||
(def ^:private
|
||||
sql:mark-as-retry
|
||||
"update task
|
||||
set scheduled_at = clock_timestamp() + '10 seconds'::interval,
|
||||
set scheduled_at = clock_timestamp() + ?::interval,
|
||||
modified_at = clock_timestamp(),
|
||||
error = ?,
|
||||
status = 'retry',
|
||||
retry_num = retry_num + ?
|
||||
where id = ?")
|
||||
|
||||
(def default-delay
|
||||
(dt/duration {:seconds 10}))
|
||||
|
||||
(defn- mark-as-retry
|
||||
[conn {:keys [task error inc-by]
|
||||
:or {inc-by 1}}]
|
||||
[conn {:keys [task error inc-by delay]
|
||||
:or {inc-by 1 delay default-delay}}]
|
||||
(let [explain (ex-message error)
|
||||
sqlv [sql:mark-as-retry explain inc-by (:id task)]]
|
||||
delay (db/interval delay)
|
||||
sqlv [sql:mark-as-retry delay explain inc-by (:id task)]]
|
||||
(db/exec-one! conn sqlv)
|
||||
nil))
|
||||
|
||||
|
@ -118,7 +187,7 @@
|
|||
nil))
|
||||
|
||||
(defn- mark-as-completed
|
||||
[conn {:keys [task] :as opts}]
|
||||
[conn {:keys [task] :as cfg}]
|
||||
(let [now (dt/now)]
|
||||
(db/update! conn :task
|
||||
{:completed-at now
|
||||
|
@ -138,40 +207,45 @@
|
|||
(let [task-fn (get tasks name)]
|
||||
(if task-fn
|
||||
(task-fn item)
|
||||
(log/warn "no task handler found for" (pr-str name)))
|
||||
{:status :completed :task item}))
|
||||
|
||||
(defn- handle-exception
|
||||
[error item]
|
||||
(let [edata (ex-data error)]
|
||||
(if (and (< (:retry-num item)
|
||||
(:max-retries item))
|
||||
(= ::retry (:type edata)))
|
||||
(cond-> {:status :retry :task item :error error}
|
||||
(dt/duration? (:delay edata))
|
||||
(assoc :delay (:delay edata))
|
||||
|
||||
(= ::noop (:strategy edata))
|
||||
(assoc :inc-by 0))
|
||||
|
||||
(do
|
||||
(log/warn "no task handler found for" (pr-str name))
|
||||
nil))))
|
||||
(log/errorf error
|
||||
(str "Unhandled exception.\n"
|
||||
"=> task: " (:name item) "\n"
|
||||
"=> retry: " (:retry-num item) "\n"
|
||||
"=> props: \n"
|
||||
(with-out-str
|
||||
(pprint (:props item)))))
|
||||
(if (>= (:retry-num item) (:max-retries item))
|
||||
{:status :failed :task item :error error}
|
||||
{:status :retry :task item :error error})))))
|
||||
|
||||
(defn- run-task
|
||||
[{:keys [tasks]} item]
|
||||
[{:keys [tasks conn]} item]
|
||||
(try
|
||||
(log/debugf "Started task '%s/%s/%s'." (:name item) (:id item) (:retry-num item))
|
||||
(handle-task tasks item)
|
||||
{:status :completed :task item}
|
||||
(catch Exception e
|
||||
(let [data (ex-data e)]
|
||||
(cond
|
||||
(and (= ::retry (:type data))
|
||||
(= ::noop (:strategy data)))
|
||||
{:status :retry :task item :error e :inc-by 0}
|
||||
|
||||
(and (< (:retry-num item)
|
||||
(:max-retries item))
|
||||
(= ::retry (:type data)))
|
||||
{:status :retry :task item :error e}
|
||||
|
||||
:else
|
||||
(do
|
||||
(log/errorf e "Unhandled exception on task '%s' (retry: %s)\nProps: %s"
|
||||
(:name item) (:retry-num item) (pr-str (:props item)))
|
||||
(if (>= (:retry-num item) (:max-retries item))
|
||||
{:status :failed :task item :error e}
|
||||
{:status :retry :task item :error e})))))
|
||||
(handle-exception e item))
|
||||
(finally
|
||||
(log/debugf "Finished task '%s/%s/%s'." (:name item) (:id item) (:retry-num item)))))
|
||||
|
||||
(def ^:private
|
||||
sql:select-next-tasks
|
||||
(def sql:select-next-tasks
|
||||
"select * from task as t
|
||||
where t.scheduled_at <= now()
|
||||
and t.queue = ?
|
||||
|
@ -181,103 +255,69 @@
|
|||
for update skip locked")
|
||||
|
||||
(defn- event-loop-fn*
|
||||
[{:keys [executor batch-size] :as opts}]
|
||||
(db/with-atomic [conn db/pool]
|
||||
(let [queue (:queue opts "default")
|
||||
[{:keys [tasks pool executor batch-size] :as cfg}]
|
||||
(db/with-atomic [conn pool]
|
||||
(let [queue (:queue cfg)
|
||||
items (->> (db/exec! conn [sql:select-next-tasks queue batch-size])
|
||||
(map decode-task-row)
|
||||
(seq))
|
||||
opts (assoc opts :conn conn)]
|
||||
cfg (assoc cfg :conn conn)]
|
||||
|
||||
(if (nil? items)
|
||||
::empty
|
||||
(let [results (->> items
|
||||
(map #(partial run-task opts %))
|
||||
(map #(px/submit! executor %)))]
|
||||
(doseq [res results]
|
||||
(let [res (deref res)]
|
||||
(case (:status res)
|
||||
:retry (mark-as-retry conn res)
|
||||
:failed (mark-as-failed conn res)
|
||||
:completed (mark-as-completed conn res))))
|
||||
(let [proc-xf (comp (map #(partial run-task cfg %))
|
||||
(map #(px/submit! executor %)))]
|
||||
(->> (into [] proc-xf items)
|
||||
(map deref)
|
||||
(run! (fn [res]
|
||||
(case (:status res)
|
||||
:retry (mark-as-retry conn res)
|
||||
:failed (mark-as-failed conn res)
|
||||
:completed (mark-as-completed conn res)))))
|
||||
::handled)))))
|
||||
|
||||
(defn- event-loop-fn
|
||||
[{:keys [executor] :as opts}]
|
||||
(aa/thread-call executor #(event-loop-fn* opts)))
|
||||
[{:keys [executor] :as cfg}]
|
||||
(aa/thread-call executor #(event-loop-fn* cfg)))
|
||||
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Scheduler
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(declare schedule-task)
|
||||
(declare synchronize-schedule)
|
||||
|
||||
(s/def ::batch-size ::us/integer)
|
||||
(s/def ::poll-interval ::us/integer)
|
||||
(s/def ::fn (s/or :var var? :fn fn?))
|
||||
(s/def ::tasks (s/map-of string? ::fn))
|
||||
(s/def ::id ::us/string)
|
||||
(s/def ::cron dt/cron?)
|
||||
(s/def ::props (s/nilable map?))
|
||||
|
||||
(s/def ::start-worker-params
|
||||
(s/keys :req-un [::tasks ::aa/executor ::batch-size]
|
||||
:opt-un [::poll-interval]))
|
||||
(s/def ::scheduled-task
|
||||
(s/keys :req-un [::id ::cron ::fn]
|
||||
:opt-un [::props]))
|
||||
|
||||
(defn start-worker!
|
||||
[{:keys [poll-interval]
|
||||
:or {poll-interval 5000}
|
||||
:as opts}]
|
||||
(us/assert ::start-worker-params opts)
|
||||
(log/infof "Starting worker '%s' on queue '%s'."
|
||||
(:name opts "anonymous")
|
||||
(:queue opts "default"))
|
||||
(let [cch (a/chan 1)]
|
||||
(a/go-loop []
|
||||
(let [[val port] (a/alts! [cch (event-loop-fn opts)] :priority true)]
|
||||
(cond
|
||||
;; Terminate the loop if close channel is closed or
|
||||
;; event-loop-fn returns nil.
|
||||
(or (= port cch) (nil? val))
|
||||
(log/infof "Stop condition found. Shutdown worker: '%s'"
|
||||
(:name opts "anonymous"))
|
||||
(s/def ::schedule (s/coll-of ::scheduled-task))
|
||||
|
||||
(db/pool-closed? db/pool)
|
||||
(do
|
||||
(log/info "Worker eventloop is aborted because pool is closed.")
|
||||
(a/close! cch))
|
||||
|
||||
(and (instance? java.sql.SQLException val)
|
||||
(contains? #{"08003" "08006" "08001" "08004"} (.getSQLState val)))
|
||||
(do
|
||||
(log/error "Connection error, trying resume in some instants.")
|
||||
(a/<! (a/timeout poll-interval))
|
||||
(recur))
|
||||
|
||||
(and (instance? java.sql.SQLException val)
|
||||
(= "40001" (.getSQLState ^java.sql.SQLException val)))
|
||||
(do
|
||||
(log/debug "Serialization failure (retrying in some instants).")
|
||||
(a/<! (a/timeout 1000))
|
||||
(recur))
|
||||
|
||||
(instance? Exception val)
|
||||
(do
|
||||
(log/errorf val "Unexpected error ocurried on polling the database (will resume operations in some instants). ")
|
||||
(a/<! (a/timeout poll-interval))
|
||||
(recur))
|
||||
|
||||
(= ::handled val)
|
||||
(recur)
|
||||
|
||||
(= ::empty val)
|
||||
(do
|
||||
(a/<! (a/timeout poll-interval))
|
||||
(recur)))))
|
||||
(defmethod ig/pre-init-spec ::scheduler [_]
|
||||
(s/keys :req-un [::executor ::db/pool ::schedule]))
|
||||
|
||||
(defmethod ig/init-key ::scheduler
|
||||
[_ {:keys [executor schedule] :as cfg}]
|
||||
(let [scheduler (Executors/newScheduledThreadPool (int 1))
|
||||
cfg (assoc cfg :scheduler scheduler)]
|
||||
(synchronize-schedule cfg)
|
||||
(run! (partial schedule-task cfg) schedule)
|
||||
(reify
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(a/close! cch)))))
|
||||
(.shutdownNow ^ExecutorService scheduler)))))
|
||||
|
||||
(defmethod ig/halt-key! ::scheduler
|
||||
[_ instance]
|
||||
(.close ^java.lang.AutoCloseable instance))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Scheduled Tasks (cron based) IMPL
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(def ^:private
|
||||
sql:upsert-scheduled-task
|
||||
(def sql:upsert-scheduled-task
|
||||
"insert into scheduled_task (id, cron_expr)
|
||||
values (?, ?)
|
||||
on conflict (id)
|
||||
|
@ -286,18 +326,18 @@
|
|||
(defn- synchronize-schedule-item
|
||||
[conn {:keys [id cron]}]
|
||||
(let [cron (str cron)]
|
||||
(log/debugf "Initialize scheduled task '%s' (cron: '%s')." id cron)
|
||||
(log/debugf "initialize scheduled task '%s' (cron: '%s')." id cron)
|
||||
(db/exec-one! conn [sql:upsert-scheduled-task id cron cron])))
|
||||
|
||||
(defn- synchronize-schedule!
|
||||
[schedule]
|
||||
(db/with-atomic [conn db/pool]
|
||||
(defn- synchronize-schedule
|
||||
[{:keys [pool schedule]}]
|
||||
(db/with-atomic [conn pool]
|
||||
(run! (partial synchronize-schedule-item conn) schedule)))
|
||||
|
||||
(def ^:private sql:lock-scheduled-task
|
||||
(def sql:lock-scheduled-task
|
||||
"select id from scheduled_task where id=? for update skip locked")
|
||||
|
||||
(declare schedule-task!)
|
||||
(declare schedule-task)
|
||||
|
||||
(defn exception->string
|
||||
[error]
|
||||
|
@ -305,7 +345,7 @@
|
|||
(.printStackTrace ^Throwable error (java.io.PrintWriter. *out*))))
|
||||
|
||||
(defn- execute-scheduled-task
|
||||
[{:keys [executor] :as opts} {:keys [id] :as task}]
|
||||
[{:keys [executor pool] :as cfg} {:keys [id] :as task}]
|
||||
(letfn [(run-task [conn]
|
||||
(try
|
||||
(when (db/exec-one! conn [sql:lock-scheduled-task id])
|
||||
|
@ -318,7 +358,7 @@
|
|||
(let [result (run-task conn)]
|
||||
(if (instance? Throwable result)
|
||||
(do
|
||||
(log/warnf result "Unhandled exception on scheduled task '%s'." id)
|
||||
(log/warnf result "unhandled exception on scheduled task '%s'" id)
|
||||
(db/insert! conn :scheduled-task-history
|
||||
{:id (uuid/next)
|
||||
:task-id id
|
||||
|
@ -328,75 +368,22 @@
|
|||
{:id (uuid/next)
|
||||
:task-id id}))))
|
||||
(handle-task []
|
||||
(db/with-atomic [conn db/pool]
|
||||
(db/with-atomic [conn pool]
|
||||
(handle-task* conn)))]
|
||||
|
||||
(try
|
||||
(px/run! executor handle-task)
|
||||
(finally
|
||||
(schedule-task! opts task)))))
|
||||
(schedule-task cfg task)))))
|
||||
|
||||
(defn ms-until-valid
|
||||
(defn- ms-until-valid
|
||||
[cron]
|
||||
(s/assert dt/cron? cron)
|
||||
(let [^Instant now (dt/now)
|
||||
^Instant next (dt/next-valid-instant-from cron now)]
|
||||
(let [now (dt/now)
|
||||
next (dt/next-valid-instant-from cron now)]
|
||||
(inst-ms (dt/duration-between now next))))
|
||||
|
||||
(defn- schedule-task!
|
||||
[{:keys [scheduler] :as opts} {:keys [cron] :as task}]
|
||||
(defn- schedule-task
|
||||
[{:keys [scheduler] :as cfg} {:keys [cron] :as task}]
|
||||
(let [ms (ms-until-valid cron)]
|
||||
(px/schedule! scheduler ms (partial execute-scheduled-task opts task))))
|
||||
|
||||
(s/def ::fn (s/or :var var? :fn fn?))
|
||||
(s/def ::id string?)
|
||||
(s/def ::cron dt/cron?)
|
||||
(s/def ::props (s/nilable map?))
|
||||
(s/def ::scheduled-task
|
||||
(s/keys :req-un [::id ::cron ::fn]
|
||||
:opt-un [::props]))
|
||||
|
||||
(s/def ::schedule (s/coll-of ::scheduled-task))
|
||||
(s/def ::start-scheduler-worker-params
|
||||
(s/keys :req-un [::schedule]))
|
||||
|
||||
(defn start-scheduler-worker!
|
||||
[{:keys [schedule] :as opts}]
|
||||
(us/assert ::start-scheduler-worker-params opts)
|
||||
(let [scheduler (Executors/newScheduledThreadPool (int 1))
|
||||
opts (assoc opts :scheduler scheduler)]
|
||||
(synchronize-schedule! schedule)
|
||||
(run! (partial schedule-task! opts) schedule)
|
||||
(reify
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(.shutdownNow ^ExecutorService scheduler)))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Thread Pool
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(defn thread-pool
|
||||
([] (thread-pool {}))
|
||||
([{:keys [min-threads max-threads name]
|
||||
:or {min-threads 0 max-threads 256}}]
|
||||
(let [executor (QueuedThreadPool. max-threads min-threads)]
|
||||
(.setName executor (or name "default-tp"))
|
||||
(.start executor)
|
||||
executor)))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Helpers
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(defn stop!
|
||||
[o]
|
||||
(cond
|
||||
(instance? java.lang.AutoCloseable o)
|
||||
(.close ^java.lang.AutoCloseable o)
|
||||
|
||||
(instance? org.eclipse.jetty.util.component.ContainerLifeCycle o)
|
||||
(.stop ^org.eclipse.jetty.util.component.ContainerLifeCycle o)
|
||||
|
||||
:else
|
||||
(ex/raise :type :not-implemented)))
|
||||
(px/schedule! scheduler ms (partial execute-scheduled-task cfg task))))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue