♻️ Minor code reorganization.

Improves modularity and reusability and allows usage
of backend code as a library.
This commit is contained in:
Andrey Antukh 2021-03-30 14:55:19 +02:00 committed by Alonso Torres
parent 59a45530a8
commit 0926fbcbc6
27 changed files with 704 additions and 791 deletions

View file

@ -5,15 +5,17 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
;; Copyright (c) UXBOX Labs SL
(ns app.worker
"Async tasks abstraction (impl)."
(:require
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.db :as db]
[app.metrics :as mtx]
[app.util.async :as aa]
[app.util.log4j :refer [update-thread-context!]]
[app.util.time :as dt]
@ -35,21 +37,13 @@
;; Executor
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::name ::us/string)
(s/def ::name keyword?)
(s/def ::min-threads ::us/integer)
(s/def ::max-threads ::us/integer)
(s/def ::idle-timeout ::us/integer)
(defmethod ig/pre-init-spec ::executor [_]
(s/keys :opt-un [::min-threads ::max-threads ::idle-timeout ::name]))
(defmethod ig/prep-key ::executor
[_ cfg]
(merge {:min-threads 0
:max-threads 256
:idle-timeout 60000
:name "worker"}
cfg))
(s/keys :req-un [::min-threads ::max-threads ::idle-timeout ::name]))
(defmethod ig/init-key ::executor
[_ {:keys [min-threads max-threads idle-timeout name]}]
@ -57,28 +51,29 @@
(int min-threads)
(int idle-timeout))
(.setStopTimeout 500)
(.setName name)
(.setName (d/name name))
(.start)))
(defmethod ig/halt-key! ::executor
[_ instance]
(.stop ^QueuedThreadPool instance))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Worker
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(declare event-loop-fn)
(declare instrument-tasks)
(s/def ::queue ::us/string)
(s/def ::queue keyword?)
(s/def ::parallelism ::us/integer)
(s/def ::batch-size ::us/integer)
(s/def ::tasks (s/map-of string? fn?))
(s/def ::tasks (s/map-of keyword? fn?))
(s/def ::poll-interval ::dt/duration)
(defmethod ig/pre-init-spec ::worker [_]
(s/keys :req-un [::executor
::mtx/metrics
::db/pool
::batch-size
::name
@ -88,29 +83,29 @@
(defmethod ig/prep-key ::worker
[_ cfg]
(merge {:batch-size 2
:name "worker"
:poll-interval (dt/duration {:seconds 5})
:queue "default"}
cfg))
(d/merge {:batch-size 2
:name :worker
:poll-interval (dt/duration {:seconds 5})
:queue :default}
(d/without-nils cfg)))
(defmethod ig/init-key ::worker
[_ {:keys [pool poll-interval name queue] :as cfg}]
(log/infof "starting worker '%s' on queue '%s'" name queue)
(let [cch (a/chan 1)
poll-ms (inst-ms poll-interval)]
(log/infof "starting worker '%s' on queue '%s'" (d/name name) (d/name queue))
(let [close-ch (a/chan 1)
poll-ms (inst-ms poll-interval)]
(a/go-loop []
(let [[val port] (a/alts! [cch (event-loop-fn cfg)] :priority true)]
(let [[val port] (a/alts! [close-ch (event-loop-fn cfg)] :priority true)]
(cond
;; Terminate the loop if close channel is closed or
;; event-loop-fn returns nil.
(or (= port cch) (nil? val))
(log/infof "stop condition found; shutdown worker: '%s'" name)
(or (= port close-ch) (nil? val))
(log/infof "stop condition found; shutdown worker: '%s'" (d/name name))
(db/pool-closed? pool)
(do
(log/info "worker eventloop is aborted because pool is closed")
(a/close! cch))
(a/close! close-ch))
(and (instance? java.sql.SQLException val)
(contains? #{"08003" "08006" "08001" "08004"} (.getSQLState ^java.sql.SQLException val)))
@ -143,13 +138,55 @@
(reify
java.lang.AutoCloseable
(close [_]
(a/close! cch)))))
(a/close! close-ch)))))
(defmethod ig/halt-key! ::worker
[_ instance]
(.close ^java.lang.AutoCloseable instance))
;; --- SUBMIT
(s/def ::task keyword?)
(s/def ::delay (s/or :int ::us/integer :duration dt/duration?))
(s/def ::conn some?)
(s/def ::priority ::us/integer)
(s/def ::max-retries ::us/integer)
(s/def ::submit-options
(s/keys :req [::task ::conn]
:opt [::delay ::queue ::priority ::max-retries]))
(def ^:private sql:insert-new-task
"insert into task (id, name, props, queue, priority, max_retries, scheduled_at)
values (?, ?, ?, ?, ?, ?, clock_timestamp() + ?)
returning id")
(defn- extract-props
[options]
(persistent!
(reduce-kv (fn [res k v]
(cond-> res
(not (qualified-keyword? k))
(assoc! k v)))
(transient {})
options)))
(defn submit!
[{:keys [::task ::delay ::queue ::priority ::max-retries ::conn]
:or {delay 0 queue :default priority 100 max-retries 3}
:as options}]
(us/verify ::submit-options options)
(let [duration (dt/duration delay)
interval (db/interval duration)
props (-> options extract-props db/tjson)
id (uuid/next)]
(log/debugf "submit task '%s' to be executed in '%s'" (d/name task) (str duration))
(db/exec-one! conn [sql:insert-new-task id (d/name task) props (d/name queue) priority max-retries interval])
id))
;; --- RUNNER
(def ^:private
sql:mark-as-retry
@ -194,17 +231,18 @@
nil))
(defn- decode-task-row
[{:keys [props] :as row}]
[{:keys [props name] :as row}]
(when row
(cond-> row
(db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)))))
(db/pgobject? props) (assoc :props (db/decode-transit-pgobject props))
(string? name) (assoc :name (keyword name)))))
(defn- handle-task
[tasks {:keys [name] :as item}]
(let [task-fn (get tasks name)]
(if task-fn
(task-fn item)
(log/warnf "no task handler found for '%s'" (pr-str name)))
(log/warnf "no task handler found for '%s'" (d/name name)))
{:status :completed :task item}))
(defn get-error-context
@ -236,13 +274,14 @@
(defn- run-task
[{:keys [tasks]} item]
(try
(log/debugf "started task '%s/%s/%s'" (:name item) (:id item) (:retry-num item))
(handle-task tasks item)
(catch Exception e
(handle-exception e item))
(finally
(log/debugf "finished task '%s/%s/%s'" (:name item) (:id item) (:retry-num item)))))
(let [name (d/name (:name item))]
(try
(log/debugf "started task '%s/%s/%s'" name (:id item) (:retry-num item))
(handle-task tasks item)
(catch Exception e
(handle-exception e item))
(finally
(log/debugf "finished task '%s/%s/%s'" name (:id item) (:retry-num item))))))
(def sql:select-next-tasks
"select * from task as t
@ -256,7 +295,7 @@
(defn- event-loop-fn*
[{:keys [pool executor batch-size] :as cfg}]
(db/with-atomic [conn pool]
(let [queue (:queue cfg)
(let [queue (name (:queue cfg))
items (->> (db/exec! conn [sql:select-next-tasks queue batch-size])
(map decode-task-row)
(seq))
@ -288,16 +327,16 @@
(declare synchronize-schedule)
(s/def ::fn (s/or :var var? :fn fn?))
(s/def ::id ::us/string)
(s/def ::id keyword?)
(s/def ::cron dt/cron?)
(s/def ::props (s/nilable map?))
(s/def ::task keyword?)
(s/def ::scheduled-task-spec
(s/keys :req-un [::id ::cron ::task]
:opt-un [::props]))
(s/def ::scheduled-task
(s/keys :req-un [::cron ::task]
:opt-un [::props ::id]))
(s/def ::schedule (s/coll-of (s/nilable ::scheduled-task-spec)))
(s/def ::schedule (s/coll-of (s/nilable ::scheduled-task)))
(defmethod ig/pre-init-spec ::scheduler [_]
(s/keys :req-un [::executor ::db/pool ::schedule ::tasks]))
@ -307,8 +346,13 @@
(let [scheduler (Executors/newScheduledThreadPool (int 1))
schedule (->> schedule
(filter some?)
;; If id is not defined, use the task as id.
(map (fn [{:keys [id task] :as item}]
(if (some? id)
item
(assoc item :id task))))
(map (fn [{:keys [task] :as item}]
(let [f (get tasks (name task))]
(let [f (get tasks task)]
(when-not f
(ex/raise :type :internal
:code :task-not-found
@ -341,7 +385,8 @@
(defn- synchronize-schedule-item
[conn {:keys [id cron]}]
(let [cron (str cron)]
(let [cron (str cron)
id (name id)]
(log/infof "initialize scheduled task '%s' (cron: '%s')" id cron)
(db/exec-one! conn [sql:upsert-scheduled-task id cron cron])))
@ -390,3 +435,62 @@
[{:keys [scheduler] :as cfg} {:keys [cron] :as task}]
(let [ms (ms-until-valid cron)]
(px/schedule! scheduler ms (partial execute-scheduled-task cfg task))))
;; --- INSTRUMENTATION
(defn instrument!
[registry]
(mtx/instrument-vars!
[#'submit!]
{:registry registry
:type :counter
:labels ["name"]
:name "tasks_submit_total"
:help "A counter of task submissions."
:wrap (fn [rootf mobj]
(let [mdata (meta rootf)
origf (::original mdata rootf)]
(with-meta
(fn [conn params]
(let [tname (:name params)]
(mobj :inc [tname])
(origf conn params)))
{::original origf})))})
(mtx/instrument-vars!
[#'app.worker/run-task]
{:registry registry
:type :summary
:quantiles []
:name "tasks_checkout_timing"
:help "Latency measured between scheduld_at and execution time."
:wrap (fn [rootf mobj]
(let [mdata (meta rootf)
origf (::original mdata rootf)]
(with-meta
(fn [tasks item]
(let [now (inst-ms (dt/now))
sat (inst-ms (:scheduled-at item))]
(mobj :observe (- now sat))
(origf tasks item)))
{::original origf})))}))
(defmethod ig/pre-init-spec ::registry [_]
(s/keys :req-un [::mtx/metrics ::tasks]))
(defmethod ig/init-key ::registry
[_ {:keys [metrics tasks]}]
(let [mobj (mtx/create
{:registry (:registry metrics)
:type :summary
:labels ["name"]
:quantiles []
:name "tasks_timing"
:help "Background task execution timing."})]
(reduce-kv (fn [res k v]
(let [tname (name k)]
(log/debugf "registring task '%s'" tname)
(assoc res k (mtx/wrap-summary v mobj [tname]))))
{}
tasks)))