🎉 Add scheduled (cron based) tasks subsystem.

This commit is contained in:
Andrey Antukh 2020-01-25 17:23:21 +01:00
parent 9bcb91ceae
commit b005c3905f
12 changed files with 400 additions and 139 deletions

View file

@ -22,7 +22,16 @@
[uxbox.util.time :as tm]
[vertx.core :as vc]
[vertx.timers :as vt])
(:import java.time.Duration))
(:import
java.time.Duration
java.time.Instant
java.util.Date))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Implementation
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; --- Task Execution
(defn- string-strack-trace
[err]
@ -44,7 +53,6 @@
(-> (db/query-one conn sqlv)
(p/then' (constantly nil)))))
(def ^:private sql:mark-as-failed
"update tasks
set scheduled_at = clock_timestamp() + '5 seconds'::interval,
@ -127,19 +135,139 @@
values ($1, $2, $3, clock_timestamp()+cast($4::text as interval))
returning id")
(s/def ::name ::us/string)
(s/def ::delay ::us/integer)
(s/def ::props map?)
(s/def ::queue ::us/string)
(s/def ::task-options
(s/keys :req-un [::name ::delay]
:opt-un [::props ::queue]))
(defn- duration->pginterval
[^Duration d]
(->> (/ (.toMillis d) 1000.0)
(format "%s seconds")))
(defn- on-worker-start
[ctx {:keys [tasks] :as options}]
(vt/schedule! ctx (assoc options
::vt/fn #'event-loop-handler
::vt/delay 3000
::vt/repeat true)))
;; --- Task Scheduling
(def ^:privatr sql:upsert-scheduled-task
"insert into scheduled_tasks (id, cron_expr)
values ($1, $2)
on conflict (id)
do update set cron_expr=$2")
(defn- synchronize-schedule-item
[conn {:keys [id cron]}]
(-> (db/query-one conn [sql:upsert-scheduled-task id (str cron)])
(p/then' (constantly nil))))
(defn- synchronize-schedule
[schedule]
(db/with-atomic [conn db/pool]
(p/run! (partial synchronize-schedule-item conn) schedule)))
(def ^:private sql:lock-scheduled-task
"select id from scheduled_tasks where id=$1 for update skip locked")
(declare schedule-task)
(defn thr-name
[]
(.getName (Thread/currentThread)))
(defn- execute-scheduled-task
[{:keys [id cron] :as stask}]
(db/with-atomic [conn db/pool]
(-> (db/query-one conn [sql:lock-scheduled-task id])
(p/then (fn [result]
(if result
(do
(prn (thr-name) "execute-scheduled-task" "task-locked")
(-> (p/do! ((:fn stask) stask))
(p/catch (fn [e]
(log/warn "Excepton happens on executing scheduled task" e)
nil))))
(prn (thr-name) "execute-scheduled-task" "task-already-locked"))))
(p/finally (fn [v e]
(-> (vc/current-context)
(schedule-task stask)))))))
(defn ms-until-valid
[cron]
(s/assert tm/cron? cron)
(let [^Instant now (tm/now)
^Instant next (.toInstant (.getNextValidTimeAfter cron (Date/from now)))
^Duration duration (Duration/between now next)]
(.toMillis duration)))
(defn- schedule-task
[ctx {:keys [cron] :as stask}]
(let [ms (ms-until-valid cron)]
(prn (thr-name) "schedule-task" (:id stask) ms)
(vt/schedule! ctx (assoc stask
:ctx ctx
::vt/once true
::vt/delay ms
::vt/fn execute-scheduled-task))))
(defn- on-scheduler-start
[ctx {:keys [schedule] :as options}]
(-> (synchronize-schedule schedule)
(p/then' (fn [_]
(run! #(schedule-task ctx %) schedule)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Public API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; --- Worker Verticle
(s/def ::callable (s/or :fn fn? :var var?))
(s/def ::max-batch-size ::us/integer)
(s/def ::max-retries ::us/integer)
(s/def ::tasks (s/map-of string? ::callable))
(s/def ::worker-verticle-options
(s/keys :req-un [::tasks]
:opt-un [::queue ::max-batch-size]))
(defn worker-verticle
[options]
(s/assert ::worker-verticle-options options)
(let [on-start #(on-worker-start % options)]
(vc/verticle {:on-start on-start})))
;; --- Scheduler Verticle
(s/def ::id string?)
(s/def ::cron tm/cron?)
(s/def ::fn ::callable)
(s/def ::props (s/nilable map?))
(s/def ::scheduled-task
(s/keys :req-un [::id ::cron ::fn]
:opt-un [::props]))
(s/def ::schedule (s/coll-of ::scheduled-task))
(s/def ::scheduler-verticle-options
(s/keys :opt-un [::schedule]))
(defn scheduler-verticle
[options]
(s/assert ::scheduler-verticle-options options)
(let [on-start #(on-scheduler-start % options)]
(vc/verticle {:on-start on-start})))
;; --- Schedule API
(s/def ::name ::us/string)
(s/def ::delay ::us/integer)
(s/def ::queue ::us/string)
(s/def ::task-options
(s/keys :req-un [::name ::delay]
:opt-un [::props ::queue]))
(defn schedule!
[conn {:keys [name delay props queue key] :as options}]
(us/assert ::task-options options)
@ -149,43 +277,3 @@
props (blob/encode props)]
(-> (db/query-one conn [sql:insert-new-task name props queue duration])
(p/then' (fn [task] (:id task))))))
(defn- on-start
[ctx handlers options]
(vt/schedule! ctx (assoc options
::vt/fn #'event-loop-handler
::vt/delay 3000
::vt/repeat true
:handlers handlers)))
(defn- resolve-handlers
[tasks]
(s/assert (s/coll-of ::callable) tasks)
(reduce (fn [acc f]
(let [task-name (:uxbox.tasks/name (meta f))]
(if task-name
(assoc acc task-name f)
(do
(log/warn "skiping task, no name provided in metadata" (pr-str f))
acc))))
{}
tasks))
(s/def ::callable (s/or :fn fn? :var var?))
(s/def ::max-batch-size ::us/integer)
(s/def ::max-retries ::us/integer)
(s/def ::verticle-tasks
(s/coll-of ::callable))
(s/def ::verticle-options
(s/keys :opt-un [::queue ::max-batch-size]))
(defn verticle
[tasks options]
(s/assert ::verticle-tasks tasks)
(s/assert ::verticle-options options)
(let [handlers (resolve-handlers tasks)
on-start #(on-start % handlers options)]
(vc/verticle {:on-start on-start})))