diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 32b01ca86..95a221b3a 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -108,7 +108,9 @@ (s/def ::default-executor-parallelism ::us/integer) (s/def ::scheduled-executor-parallelism ::us/integer) -(s/def ::worker-parallelism ::us/integer) + +(s/def ::worker-default-parallelism ::us/integer) +(s/def ::worker-webhook-parallelism ::us/integer) (s/def ::authenticated-cookie-domain ::us/string) (s/def ::authenticated-cookie-name ::us/string) @@ -222,7 +224,8 @@ ::error-report-webhook ::default-executor-parallelism ::scheduled-executor-parallelism - ::worker-parallelism + ::worker-default-parallelism + ::worker-webhook-parallelism ::file-change-snapshot-every ::file-change-snapshot-timeout ::user-feedback-destination diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 22d9de047..1c36c9d03 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -494,20 +494,28 @@ {:cron #app/cron "30 */5 * * * ?" ;; every 5m :task :audit-log-gc})]} - ::wrk/scheduler + ::wrk/dispatcher {::rds/redis (ig/ref ::rds/redis) ::mtx/metrics (ig/ref ::mtx/metrics) ::db/pool (ig/ref ::db/pool)} - ::wrk/worker - {::wrk/parallelism (cf/get ::worker-parallelism 1) - ;; FIXME: read queues from configuration - ::wrk/queue "default" + [::default ::wrk/worker] + {::wrk/parallelism (cf/get ::worker-default-parallelism 1) + ::wrk/queue :default + ::rds/redis (ig/ref ::rds/redis) + ::wrk/registry (ig/ref ::wrk/registry) + ::mtx/metrics (ig/ref ::mtx/metrics) + ::db/pool (ig/ref ::db/pool)} + + [::webhook ::wrk/worker] + {::wrk/parallelism (cf/get ::worker-webhook-parallelism 1) + ::wrk/queue :webhooks ::rds/redis (ig/ref ::rds/redis) ::wrk/registry (ig/ref ::wrk/registry) ::mtx/metrics (ig/ref ::mtx/metrics) ::db/pool (ig/ref ::db/pool)}}) + (def system nil) (defn start diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index 607ba1809..b4305a3b9 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -14,6 +14,7 @@ [app.common.spec :as us] [app.common.transit :as t] [app.common.uuid :as uuid] + [app.config :as cf] [app.db :as db] [app.metrics :as mtx] [app.redis :as rds] @@ -174,63 +175,62 @@ (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)))) -(s/def ::queue ::us/string) (s/def ::wait-duration ::dt/duration) -(defmethod ig/pre-init-spec ::scheduler [_] +(defmethod ig/pre-init-spec ::dispatcher [_] (s/keys :req [::mtx/metrics ::db/pool ::rds/redis] :opt [::wait-duration ::batch-size])) -(defmethod ig/prep-key ::scheduler +(defmethod ig/prep-key ::dispatcher [_ cfg] - (merge {::batch-size 1 - ::wait-duration (dt/duration "2s")} + (merge {::batch-size 100 + ::wait-duration (dt/duration "5s")} (d/without-nils cfg))) (def ^:private sql:select-next-tasks - "select * from task as t + "select id, queue from task as t where t.scheduled_at <= now() and (t.status = 'new' or t.status = 'retry') + and queue ~~* ?::text order by t.priority desc, t.scheduled_at limit ? for update skip locked") -(defn- format-queue - [queue] - (str/ffmt "penpot-tasks-queue:%" queue)) - -(defmethod ig/init-key ::scheduler +(defmethod ig/init-key ::dispatcher [_ {:keys [::db/pool ::rds/redis ::batch-size] :as cfg}] - (letfn [(get-tasks-batch [conn] - (->> (db/exec! conn [sql:select-next-tasks batch-size]) - (map decode-task-row) - (seq))) + (letfn [(get-tasks [conn] + (let [prefix (str (cf/get :tenant) ":%")] + (seq (db/exec! conn [sql:select-next-tasks prefix batch-size])))) - (queue-task [conn rconn {:keys [id queue] :as task}] - (db/update! conn :task {:status "ready"} {:id id}) - (let [queue (format-queue queue) - payload (t/encode id) - result (rds/rpush! rconn queue payload)] - (l/debug :hist "scheduler: task pushed to redis" - :task-id id - :key queue - :queued result))) + (push-tasks! [conn rconn [queue tasks]] + (let [ids (mapv :id tasks) + key (str/ffmt "taskq:%" queue) + res (rds/rpush! rconn key (mapv t/encode ids)) + sql [(str "update task set status = 'scheduled'" + " where id = ANY(?)") + (db/create-array conn "uuid" ids)]] - (run-batch [rconn] + (db/exec-one! conn sql) + (l/debug :hist "dispatcher: push tasks to redis" + :queue queue + :tasks (count ids) + :queued res))) + + (run-batch! [rconn] (db/with-atomic [conn pool] - (when-let [tasks (get-tasks-batch conn)] - (run! (partial queue-task conn rconn) tasks) - true))) - ] + (when-let [tasks (get-tasks conn)] + (->> (group-by :queue tasks) + (run! (partial push-tasks! conn rconn))) + true)))] (if (db/read-only? pool) - (l/warn :hint "scheduler: not started (db is read-only)") + (l/warn :hint "dispatcher: not started (db is read-only)") (px/thread - {:name "penpot/scheduler"} - (l/info :hint "scheduler: started") + {:name "penpot/worker-dispatcher"} + (l/info :hint "dispatcher: started") (try (dm/with-open [rconn (rds/connect redis)] (loop [] @@ -238,7 +238,7 @@ (throw (InterruptedException. "interrumpted"))) (try - (when-not (run-batch rconn) + (when-not (run-batch! rconn) (px/sleep (::wait-duration cfg))) (catch InterruptedException cause (throw cause)) @@ -246,29 +246,29 @@ (cond (rds/exception? cause) (do - (l/warn :hint "scheduler: redis exception (will retry in an instant)" :cause cause) + (l/warn :hint "dispatcher: redis exception (will retry in an instant)" :cause cause) (px/sleep (::rds/timeout rconn))) (db/sql-exception? cause) (do - (l/warn :hint "scheduler: database exception (will retry in an instant)" :cause cause) + (l/warn :hint "dispatcher: database exception (will retry in an instant)" :cause cause) (px/sleep (::rds/timeout rconn))) :else (do - (l/error :hint "scheduler: unhandled exception (will retry in an instant)" :cause cause) + (l/error :hint "dispatcher: unhandled exception (will retry in an instant)" :cause cause) (px/sleep (::rds/timeout rconn)))))) (recur))) (catch InterruptedException _ - (l/debug :hint "scheduler: interrupted")) + (l/debug :hint "dispatcher: interrupted")) (catch Throwable cause - (l/error :hint "scheduler: unexpected exception" :cause cause)) + (l/error :hint "dispatcher: unexpected exception" :cause cause)) (finally - (l/info :hint "scheduler: terminated"))))))) + (l/info :hint "dispatcher: terminated"))))))) -(defmethod ig/halt-key! ::scheduler +(defmethod ig/halt-key! ::dispatcher [_ thread] (some-> thread px/interrupt!)) @@ -288,36 +288,38 @@ ::queue ::registry])) -;; FIXME: define queue as set (defmethod ig/prep-key ::worker [_ cfg] - (merge {::queue "default" ::parallelism 1} + (merge {::parallelism 1} (d/without-nils cfg))) (defmethod ig/init-key ::worker [_ {:keys [::db/pool ::queue ::parallelism] :as cfg}] - (if (db/read-only? pool) - (l/warn :hint "workers: not started (db is read-only)" :queue queue) - (doall - (->> (range parallelism) - (map #(assoc cfg ::worker-id %)) - (map start-worker!))))) + (let [queue (d/name queue) + cfg (assoc cfg ::queue queue)] + (if (db/read-only? pool) + (l/warn :hint "worker: not started (db is read-only)" :queue queue :parallelism parallelism) + (doall + (->> (range parallelism) + (map #(assoc cfg ::worker-id %)) + (map start-worker!)))))) (defmethod ig/halt-key! ::worker [_ threads] (run! px/interrupt! threads)) (defn- start-worker! - [{:keys [::rds/redis ::worker-id] :as cfg}] + [{:keys [::rds/redis ::worker-id ::queue] :as cfg}] (px/thread {:name (format "penpot/worker/%s" worker-id)} - (l/info :hint "worker: started" :worker-id worker-id) + (l/info :hint "worker: started" :worker-id worker-id :queue queue) (try (dm/with-open [rconn (rds/connect redis)] - (let [cfg (-> cfg - (update ::queue format-queue) - (assoc ::rds/rconn rconn) - (assoc ::timeout (dt/duration "5s")))] + (let [tenant (cf/get :tenant "main") + cfg (-> cfg + (assoc ::queue (str/ffmt "taskq:%:%" tenant queue)) + (assoc ::rds/rconn rconn) + (assoc ::timeout (dt/duration "5s")))] (loop [] (when (px/interrupted?) (throw (InterruptedException. "interrupted"))) @@ -327,13 +329,17 @@ (catch InterruptedException _ (l/debug :hint "worker: interrupted" - :worker-id worker-id)) + :worker-id worker-id + :queue queue)) (catch Throwable cause (l/error :hint "worker: unexpected exception" :worker-id worker-id + :queue queue :cause cause)) (finally - (l/info :hint "worker: terminated" :worker-id worker-id))))) + (l/info :hint "worker: terminated" + :worker-id worker-id + :queue queue))))) (defn- run-worker-loop! [{:keys [::db/pool ::rds/rconn ::timeout ::queue ::registry ::worker-id]}] @@ -631,9 +637,26 @@ ;; SUBMIT API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::task keyword?) +(defn- extract-props + [options] + (let [cns (namespace ::sample)] + (persistent! + (reduce-kv (fn [res k v] + (cond-> res + (not= (namespace k) cns) + (assoc! k v))) + (transient {}) + options)))) + +(def ^:private sql:insert-new-task + "insert into task (id, name, props, queue, priority, max_retries, scheduled_at) + values (?, ?, ?, ?, ?, ?, now() + ?) + returning id") + +(s/def ::task (s/or :kw keyword? :str string?)) +(s/def ::queue (s/or :kw keyword? :str string?)) (s/def ::delay (s/or :int ::us/integer :duration dt/duration?)) -(s/def ::conn some?) +(s/def ::conn (s/or :pool ::db/pool :connection some?)) (s/def ::priority ::us/integer) (s/def ::max-retries ::us/integer) @@ -641,36 +664,24 @@ (s/keys :req [::task ::conn] :opt [::delay ::queue ::priority ::max-retries])) -(defn- extract-props - [options] - (persistent! - (reduce-kv (fn [res k v] - (cond-> res - (not (qualified-keyword? k)) - (assoc! k v))) - (transient {}) - options))) - -(def ^:private sql:insert-new-task - "insert into task (id, name, props, queue, priority, max_retries, scheduled_at) - values (?, ?, ?, ?, ?, ?, now() + ?) - returning id") - (defn submit! [& {:keys [::task ::delay ::queue ::priority ::max-retries ::conn] - :or {delay 0 queue "default" priority 100 max-retries 3} + :or {delay 0 queue :default priority 100 max-retries 3} :as options}] (us/verify ::submit-options options) (let [duration (dt/duration delay) interval (db/interval duration) props (-> options extract-props db/tjson) - id (uuid/next)] + id (uuid/next) + tenant (cf/get :tenant) + task (d/name task) + queue (str/ffmt "%:%" tenant (d/name queue))] (l/debug :hint "submit task" - :name (d/name task) + :name task :queue queue :in (dt/format-duration duration)) - (db/exec-one! conn [sql:insert-new-task id (d/name task) props + (db/exec-one! conn [sql:insert-new-task id task props queue priority max-retries interval]) id))