Add improvements to async tasks subsystem.

This commit is contained in:
Andrey Antukh 2020-01-23 20:38:21 +01:00
parent 6ba46673fa
commit 3433aa0c5b
4 changed files with 43 additions and 21 deletions

View file

@ -1,6 +1,8 @@
CREATE TABLE IF NOT EXISTS tasks (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
queue text NOT NULL,
created_at timestamptz NOT NULL DEFAULT clock_timestamp(),
modified_at timestamptz NOT NULL DEFAULT clock_timestamp(),
completed_at timestamptz NULL DEFAULT NULL,
@ -9,9 +11,11 @@ CREATE TABLE IF NOT EXISTS tasks (
name text NOT NULL,
props bytea NOT NULL,
error_text text NULL DEFAULT NULL,
retry_num smallint NOT NULL DEFAULT 0,
status text NOT NULL DEFAULT 'new'
);
CREATE INDEX tasks__scheduled_at__idx
ON tasks (scheduled_at);
ON tasks (scheduled_at, queue);

View file

@ -40,11 +40,16 @@
;; --- State initialization
;; TODO: missing self maintanance task; when the queue table is full
;; of completed/failed task, the performance starts degrading
;; linearly, so after some arbitrary number of tasks is processed, we
;; need to perform a maintenance and delete some old tasks.
(def ^:private tasks
[#'uxbox.tasks.demo-gc/handler
#'uxbox.tasks.sendmail/handler])
(defstate tasks
:start (as-> (impl/verticle tasks) $$
(defstate small-tasks
:start (as-> (impl/verticle {:tasks tasks :queue "default"}) $$
(vc/deploy! system $$ {:instances 1})
(deref $$)))

View file

@ -17,4 +17,5 @@
(defn handler
{:uxbox.tasks/name "demo-gc"}
[{:keys [props] :as task}]
(prn "debug" props)
)

View file

@ -24,20 +24,23 @@
[vertx.timers :as vt])
(:import java.time.Duration))
(def ^:private num-cpus
(delay (.availableProcessors (Runtime/getRuntime))))
(defn- string-strack-trace
[err]
(with-out-str
(.printStackTrace err (java.io.PrintWriter. *out*))))
(def ^:private sql:update-failed-task
"update tasks
set scheduled_at = now() + cast($1::text as interval),
set scheduled_at = clock_timestamp() + '5 seconds'::interval,
error_text = $1
status = 'error'
retry_num = retry_num + 1
where id = $2;")
(defn- reschedule
[conn task]
(let [duration (io.vertx.pgclient.data.Interval/of 0 0 0 0 0 5)
sqlv [sql:update-failed-task duration (:id task)]]
[conn task error]
(let [error (string-strack-trace error)
sqlv [sql:update-failed-task error (:id task)]]
(-> (db/query-one conn sqlv)
(p/then' (constantly nil)))))
@ -64,6 +67,7 @@
(def ^:private sql:select-next-task
"select * from tasks as t
where t.scheduled_at <= now()
and t.queue = $1
and (t.status = 'new' or (t.status = 'error' and t.retry_num < 3))
order by t.scheduled_at
limit 1
@ -76,16 +80,16 @@
props (assoc :props (blob/decode props)))))
(defn- event-loop
[{:keys [handlers] :as opts}]
[{:keys [handlers queue] :as opts}]
(db/with-atomic [conn db/pool]
(-> (db/query-one conn sql:select-next-task)
(-> (db/query-one conn [sql:select-next-task queue])
(p/then decode-task-row)
(p/then (fn [item]
(when item
(-> (p/do! (handle-task handlers item))
(p/handle (fn [v e]
(if e
(reschedule conn item)
(reschedule conn item e)
(mark-as-completed conn item))))
(p/then' (constantly ::handled)))))))))
@ -100,29 +104,37 @@
(event-loop-handler (assoc opts ::counter (inc counter))))))))
(def ^:private sql:insert-new-task
"insert into tasks (name, props, scheduled_at)
values ($1, $2, now()+cast($3::text as interval)) returning id")
"insert into tasks (name, props, queue, scheduled_at)
values ($1, $2, $3, clock_timestamp()+cast($4::text as interval))
returning id")
(defn schedule!
[conn {:keys [name delay props] :as task}]
(let [delay (tm/duration delay)
[conn {:keys [name delay props queue] :as task}]
(let [queue (if (string? queue) queue "default")
delay (tm/duration delay)
duration (->> (/ (.toMillis ^Duration delay) 1000.0)
(format "%s seconds"))
props (blob/encode props)]
(-> (db/query-one conn [sql:insert-new-task name props duration])
(-> (db/query-one conn [sql:insert-new-task name props queue duration])
(p/then' (fn [task] (:id task))))))
(defn- on-start
[ctx handlers]
[ctx queue handlers]
(vt/schedule! ctx {::vt/fn #'event-loop-handler
::vt/delay 1000
::vt/repeat true
:max-batch-size 10
:queue queue
:handlers handlers}))
(s/def ::tasks (s/coll-of (s/or :fn fn? :var var?)))
(s/def ::queue ::us/string)
(s/def ::verticle-options
(s/keys :req-un [::tasks ::queue]))
(defn verticle
[tasks]
(s/assert (s/coll-of (s/or :fn fn? :var var?)) tasks)
[{:keys [tasks queue] :as options}]
(s/assert ::verticle-options options)
(let [handlers (reduce (fn [acc f]
(let [task-name (:uxbox.tasks/name (meta f))]
(if task-name
@ -132,6 +144,6 @@
acc))))
{}
tasks)
on-start #(on-start % handlers)]
on-start #(on-start % queue handlers)]
(vc/verticle {:on-start on-start})))