diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index dda506b11..f659d93ee 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -268,6 +268,9 @@ {:name "0086-add-webhook-delivery-table" :fn (mg/resource "app/migrations/sql/0086-add-webhook-delivery-table.sql")} + + {:name "0087-mod-task-table" + :fn (mg/resource "app/migrations/sql/0087-mod-task-table.sql")} ]) diff --git a/backend/src/app/migrations/sql/0087-mod-task-table.sql b/backend/src/app/migrations/sql/0087-mod-task-table.sql new file mode 100644 index 000000000..75379eca6 --- /dev/null +++ b/backend/src/app/migrations/sql/0087-mod-task-table.sql @@ -0,0 +1,9 @@ +ALTER TABLE task + ADD COLUMN label text NULL; + +ALTER TABLE task + ALTER COLUMN label SET STORAGE external; + +CREATE INDEX task__label__idx + ON task (label, name, queue) + WHERE status = 'new'; diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index b4305a3b9..9adaa7ee1 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -97,7 +97,7 @@ (l/info :hint "registry initialized" :tasks (count tasks)) (reduce-kv (fn [registry k v] (let [tname (name k)] - (l/debug :hint "register task" :name tname) + (l/trace :hint "register task" :name tname) (assoc registry tname (wrap-task-handler metrics tname v)))) {} tasks)) @@ -214,10 +214,10 @@ (db/create-array conn "uuid" ids)]] (db/exec-one! conn sql) - (l/debug :hist "dispatcher: push tasks to redis" + (l/debug :hist "dispatcher: queue tasks" :queue queue :tasks (count ids) - :queued res))) + :total-queued res))) (run-batch! [rconn] (db/with-atomic [conn pool] @@ -445,7 +445,7 @@ :else (try - (l/trace :hint "worker: executing task" + (l/debug :hint "worker: executing task" :worker-id worker-id :task-id (:id task) :task-name (:name task) @@ -649,39 +649,57 @@ options)))) (def ^:private sql:insert-new-task - "insert into task (id, name, props, queue, priority, max_retries, scheduled_at) - values (?, ?, ?, ?, ?, ?, now() + ?) + "insert into task (id, name, props, queue, label, priority, max_retries, scheduled_at) + values (?, ?, ?, ?, ?, ?, ?, now() + ?) returning id") +(def ^:private + sql:remove-not-started-tasks + "delete from task + where name=? and queue=? and label=? and status = 'new' and scheduled_at > now()") + +(s/def ::label string?) (s/def ::task (s/or :kw keyword? :str string?)) (s/def ::queue (s/or :kw keyword? :str string?)) -(s/def ::delay (s/or :int ::us/integer :duration dt/duration?)) +(s/def ::delay (s/or :int integer? :duration dt/duration?)) (s/def ::conn (s/or :pool ::db/pool :connection some?)) -(s/def ::priority ::us/integer) -(s/def ::max-retries ::us/integer) +(s/def ::priority integer?) +(s/def ::max-retries integer?) +(s/def ::dedupe boolean?) (s/def ::submit-options - (s/keys :req [::task ::conn] - :opt [::delay ::queue ::priority ::max-retries])) + (s/and + (s/keys :req [::task ::conn] + :opt [::label ::delay ::queue ::priority ::max-retries ::dedupe]) + (fn [{:keys [::dedupe ::label] :or {label ""}}] + (if dedupe + (not= label "") + true)))) (defn submit! - [& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn] - :or {delay 0 queue :default priority 100 max-retries 3} + [& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn ::dedupe ::label] + :or {delay 0 queue :default priority 100 max-retries 3 label ""} :as options}] - (us/verify ::submit-options options) + (us/verify! ::submit-options options) (let [duration (dt/duration delay) interval (db/interval duration) props (-> options extract-props db/tjson) id (uuid/next) tenant (cf/get :tenant) task (d/name task) - queue (str/ffmt "%:%" tenant (d/name queue))] + queue (str/ffmt "%:%" tenant (d/name queue)) + deleted (when dedupe + (-> (db/exec-one! conn [sql:remove-not-started-tasks task queue label]) + :next.jdbc/update-count))] (l/debug :hint "submit task" :name task :queue queue + :label label + :dedupe (boolean dedupe) + :deleted (or deleted 0) :in (dt/format-duration duration)) - (db/exec-one! conn [sql:insert-new-task id task props - queue priority max-retries interval]) + (db/exec-one! conn [sql:insert-new-task id task props queue + label priority max-retries interval]) id))