diff --git a/backend/resources/migrations/0004.tasks.sql b/backend/resources/migrations/0004.tasks.sql index e1009ac1f..5bfc55c3d 100644 --- a/backend/resources/migrations/0004.tasks.sql +++ b/backend/resources/migrations/0004.tasks.sql @@ -1,21 +1,22 @@ CREATE TABLE IF NOT EXISTS tasks ( id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), - queue text NOT NULL, - created_at timestamptz NOT NULL DEFAULT clock_timestamp(), modified_at timestamptz NOT NULL DEFAULT clock_timestamp(), completed_at timestamptz NULL DEFAULT NULL, scheduled_at timestamptz NOT NULL, + queue text NOT NULL, + name text NOT NULL, props bytea NOT NULL, - error_text text NULL DEFAULT NULL, + error text NULL DEFAULT NULL, + result bytea NULL DEFAULT NULL, retry_num smallint NOT NULL DEFAULT 0, status text NOT NULL DEFAULT 'new' ); -CREATE INDEX tasks__scheduled_at__idx +CREATE INDEX tasks__scheduled_at__queue__idx ON tasks (scheduled_at, queue); diff --git a/backend/src/uxbox/tasks.clj b/backend/src/uxbox/tasks.clj index 9a13bff94..af305146d 100644 --- a/backend/src/uxbox/tasks.clj +++ b/backend/src/uxbox/tasks.clj @@ -26,16 +26,9 @@ ;; --- Public API -(s/def ::name ::us/string) -(s/def ::delay ::us/integer) -(s/def ::props map?) -(s/def ::task-spec - (s/keys :req-un [::name ::delay] :opt-un [::props])) - (defn schedule! ([task] (schedule! db/pool task)) ([conn task] - (us/assert ::task-spec task) (impl/schedule! conn task))) ;; --- State initialization @@ -50,6 +43,6 @@ #'uxbox.tasks.sendmail/handler]) (defstate small-tasks - :start (as-> (impl/verticle {:tasks tasks :queue "default"}) $$ + :start (as-> (impl/verticle tasks {:queue "default"}) $$ (vc/deploy! system $$ {:instances 1}) (deref $$))) diff --git a/backend/src/uxbox/tasks/demo_gc.clj b/backend/src/uxbox/tasks/demo_gc.clj index 7ddc29a4f..919305a2f 100644 --- a/backend/src/uxbox/tasks/demo_gc.clj +++ b/backend/src/uxbox/tasks/demo_gc.clj @@ -10,12 +10,12 @@ (ns uxbox.tasks.demo-gc "Demo accounts garbage collector." (:require - [clojure.tools.logging :as log])) - -;; TODO + [clojure.tools.logging :as log] + [uxbox.common.exceptions :as ex])) (defn handler {:uxbox.tasks/name "demo-gc"} [{:keys [props] :as task}] - (prn "debug" props) - ) + (ex/raise :type :foobar + :code :foobaz + :hint "Foo bar")) diff --git a/backend/src/uxbox/tasks/impl.clj b/backend/src/uxbox/tasks/impl.clj index 22521e8c4..1a96e5833 100644 --- a/backend/src/uxbox/tasks/impl.clj +++ b/backend/src/uxbox/tasks/impl.clj @@ -29,22 +29,37 @@ (with-out-str (.printStackTrace err (java.io.PrintWriter. *out*)))) -(def ^:private sql:update-failed-task +(def ^:private sql:mark-as-retry "update tasks set scheduled_at = clock_timestamp() + '5 seconds'::interval, - error_text = $1 - status = 'error' + error = $1, + status = 'retry', retry_num = retry_num + 1 where id = $2;") (defn- reschedule [conn task error] - (let [error (string-strack-trace error) - sqlv [sql:update-failed-task error (:id task)]] + (let [explain (string-strack-trace error) + sqlv [sql:mark-as-retry explain (:id task)]] (-> (db/query-one conn sqlv) (p/then' (constantly nil))))) -(def ^:private sql:update-completed-task + +(def ^:private sql:mark-as-failed + "update tasks + set scheduled_at = clock_timestamp() + '5 seconds'::interval, + error = $1, + status = 'failed' + where id = $2;") + +(defn- mark-as-failed + [conn task error] + (let [error (string-strack-trace error) + sqlv [sql:mark-as-failed error (:id task)]] + (-> (db/query-one conn sqlv) + (p/then' (constantly nil))))) + +(def ^:private sql:mark-as-completed "update tasks set completed_at = clock_timestamp(), status = 'completed' @@ -52,7 +67,7 @@ (defn- mark-as-completed [conn task] - (-> (db/query-one conn [sql:update-completed-task (:id task)]) + (-> (db/query-one conn [sql:mark-as-completed (:id task)]) (p/then' (constantly nil)))) (defn- handle-task @@ -68,7 +83,7 @@ "select * from tasks as t where t.scheduled_at <= now() and t.queue = $1 - and (t.status = 'new' or (t.status = 'error' and t.retry_num < 3)) + and (t.status = 'new' or (t.status = 'retry' and t.retry_num <= $2)) order by t.scheduled_at limit 1 for update skip locked") @@ -80,70 +95,97 @@ props (assoc :props (blob/decode props))))) (defn- event-loop - [{:keys [handlers queue] :as opts}] - (db/with-atomic [conn db/pool] - (-> (db/query-one conn [sql:select-next-task queue]) - (p/then decode-task-row) - (p/then (fn [item] - (when item - (-> (p/do! (handle-task handlers item)) - (p/handle (fn [v e] - (if e - (reschedule conn item e) - (mark-as-completed conn item)))) - (p/then' (constantly ::handled))))))))) + [{:keys [handlers] :as options}] + (let [queue (:queue options "default") + max-retries (:max-retries options 3)] + (db/with-atomic [conn db/pool] + (-> (db/query-one conn [sql:select-next-task queue max-retries]) + (p/then decode-task-row) + (p/then (fn [item] + (when item + (-> (p/do! (handle-task handlers item)) + (p/handle (fn [v e] + (if e + (if (>= (:retry-num item) max-retries) + (mark-as-failed conn item e) + (reschedule conn item e)) + (mark-as-completed conn item)))) + (p/then' (constantly ::handled)))))))))) (defn- event-loop-handler - [{:keys [::counter max-barch-size] - :or {counter 1 max-barch-size 10} - :as opts}] - (-> (event-loop opts) - (p/then (fn [result] - (when (and (= result ::handled) - (> max-barch-size counter)) - (event-loop-handler (assoc opts ::counter (inc counter)))))))) + [options] + (let [counter (::counter options 1) + mbs (:max-batch-size options 10)] + (-> (event-loop options) + (p/then (fn [result] + (when (and (= result ::handled) + (> mbs counter)) + (event-loop-handler (assoc options ::counter (inc counter))))))))) (def ^:private sql:insert-new-task "insert into tasks (name, props, queue, scheduled_at) values ($1, $2, $3, clock_timestamp()+cast($4::text as interval)) returning id") +(s/def ::name ::us/string) +(s/def ::delay ::us/integer) +(s/def ::props map?) +(s/def ::queue ::us/string) +(s/def ::task-options + (s/keys :req-un [::name ::delay] + :opt-un [::props ::queue])) + +(defn- duration->pginterval + [^Duration d] + (->> (/ (.toMillis d) 1000.0) + (format "%s seconds"))) + (defn schedule! - [conn {:keys [name delay props queue] :as task}] + [conn {:keys [name delay props queue key] :as options}] + (us/assert ::task-options options) (let [queue (if (string? queue) queue "default") - delay (tm/duration delay) - duration (->> (/ (.toMillis ^Duration delay) 1000.0) - (format "%s seconds")) + duration (-> (tm/duration delay) + (duration->pginterval)) props (blob/encode props)] (-> (db/query-one conn [sql:insert-new-task name props queue duration]) (p/then' (fn [task] (:id task)))))) (defn- on-start - [ctx queue handlers] - (vt/schedule! ctx {::vt/fn #'event-loop-handler - ::vt/delay 1000 - ::vt/repeat true - :max-batch-size 10 - :queue queue - :handlers handlers})) + [ctx handlers options] + (vt/schedule! ctx (assoc options + ::vt/fn #'event-loop-handler + ::vt/delay 3000 + ::vt/repeat true + :handlers handlers))) + +(defn- resolve-handlers + [tasks] + (s/assert (s/coll-of ::callable) tasks) + (reduce (fn [acc f] + (let [task-name (:uxbox.tasks/name (meta f))] + (if task-name + (assoc acc task-name f) + (do + (log/warn "skiping task, no name provided in metadata" (pr-str f)) + acc)))) + {} + tasks)) + +(s/def ::callable (s/or :fn fn? :var var?)) +(s/def ::max-batch-size ::us/integer) +(s/def ::max-retries ::us/integer) + +(s/def ::verticle-tasks + (s/coll-of ::callable)) -(s/def ::tasks (s/coll-of (s/or :fn fn? :var var?))) -(s/def ::queue ::us/string) (s/def ::verticle-options - (s/keys :req-un [::tasks ::queue])) + (s/keys :opt-un [::queue ::max-batch-size])) (defn verticle - [{:keys [tasks queue] :as options}] + [tasks options] + (s/assert ::verticle-tasks tasks) (s/assert ::verticle-options options) - (let [handlers (reduce (fn [acc f] - (let [task-name (:uxbox.tasks/name (meta f))] - (if task-name - (assoc acc task-name f) - (do - (log/warn "skiping task, no name provided in metadata" (pr-str f)) - acc)))) - {} - tasks) - on-start #(on-start % queue handlers)] + (let [handlers (resolve-handlers tasks) + on-start #(on-start % handlers options)] (vc/verticle {:on-start on-start})))