diff --git a/backend/src/uxbox/db.clj b/backend/src/uxbox/db.clj index 538356d7f..32363755a 100644 --- a/backend/src/uxbox/db.clj +++ b/backend/src/uxbox/db.clj @@ -6,6 +6,7 @@ (ns uxbox.db (:require + [clojure.data.json :as json] [clojure.string :as str] [clojure.tools.logging :as log] [lambdaisland.uri :refer [uri]] @@ -20,6 +21,7 @@ [uxbox.config :as cfg] [uxbox.util.data :as data]) (:import + org.postgresql.util.PGobject com.zaxxer.hikari.HikariConfig com.zaxxer.hikari.HikariDataSource)) @@ -112,3 +114,16 @@ (get-by-params ds table {:id id} nil)) ([ds table id opts] (get-by-params ds table {:id id} opts))) + +(defn pgobject? + [v] + (instance? PGobject v)) + +(defn decode-pgobject + [^PGobject obj] + (let [typ (.getType obj) + val (.getValue obj)] + (if (or (= typ "json") + (= typ "jsonb")) + (json/read-str val) + val))) diff --git a/backend/src/uxbox/emails.clj b/backend/src/uxbox/emails.clj index 7926ebb42..2f7aeac72 100644 --- a/backend/src/uxbox/emails.clj +++ b/backend/src/uxbox/emails.clj @@ -41,9 +41,9 @@ :reply-to (:sendmail-reply-to cfg/config)} data (merge defaults context) email (email-factory data)] - (tasks/schedule! conn {:name "sendmail" - :delay 0 - :props email})))) + (tasks/submit! conn {:name "sendmail" + :delay 0 + :props email})))) ;; --- Emails diff --git a/backend/src/uxbox/services/mutations/colors.clj b/backend/src/uxbox/services/mutations/colors.clj index 394ecd525..82428854f 100644 --- a/backend/src/uxbox/services/mutations/colors.clj +++ b/backend/src/uxbox/services/mutations/colors.clj @@ -104,9 +104,9 @@ (teams/check-edition-permissions! conn profile-id (:team-id lib)) ;; Schedule object deletion - (tasks/schedule! conn {:name "delete-object" - :delay cfg/default-deletion-delay - :props {:id id :type :color-library}}) + (tasks/submit! conn {:name "delete-object" + :delay cfg/default-deletion-delay + :props {:id id :type :color-library}}) (db/update! conn :color-library {:deleted-at (dt/now)} @@ -188,9 +188,9 @@ (teams/check-edition-permissions! conn profile-id (:team-id clr)) ;; Schedule object deletion - (tasks/schedule! conn {:name "delete-object" - :delay cfg/default-deletion-delay - :props {:id id :type :color}}) + (tasks/submit! conn {:name "delete-object" + :delay cfg/default-deletion-delay + :props {:id id :type :color}}) (db/update! conn :color {:deleted-at (dt/now)} diff --git a/backend/src/uxbox/services/mutations/demo.clj b/backend/src/uxbox/services/mutations/demo.clj index ea0a5ae56..aa319fa1f 100644 --- a/backend/src/uxbox/services/mutations/demo.clj +++ b/backend/src/uxbox/services/mutations/demo.clj @@ -38,8 +38,8 @@ :password password}) ;; Schedule deletion of the demo profile - (tasks/schedule! conn {:name "delete-profile" - :delay cfg/default-deletion-delay - :props {:profile-id id}}) + (tasks/submit! conn {:name "delete-profile" + :delay cfg/default-deletion-delay + :props {:profile-id id}}) {:email email :password password}))) diff --git a/backend/src/uxbox/services/mutations/files.clj b/backend/src/uxbox/services/mutations/files.clj index 4dffcb94d..c1723078a 100644 --- a/backend/src/uxbox/services/mutations/files.clj +++ b/backend/src/uxbox/services/mutations/files.clj @@ -113,9 +113,9 @@ (files/check-edition-permissions! conn profile-id id) ;; Schedule object deletion - (tasks/schedule! conn {:name "delete-object" - :delay cfg/default-deletion-delay - :props {:id id :type :file}}) + (tasks/submit! conn {:name "delete-object" + :delay cfg/default-deletion-delay + :props {:id id :type :file}}) (mark-file-deleted conn params))) diff --git a/backend/src/uxbox/services/mutations/icons.clj b/backend/src/uxbox/services/mutations/icons.clj index d8bad54a7..14e8c55f7 100644 --- a/backend/src/uxbox/services/mutations/icons.clj +++ b/backend/src/uxbox/services/mutations/icons.clj @@ -111,9 +111,9 @@ (teams/check-edition-permissions! conn profile-id (:team-id lib)) ;; Schedule object deletion - (tasks/schedule! conn {:name "delete-object" - :delay cfg/default-deletion-delay - :props {:id id :type :icon-library}}) + (tasks/submit! conn {:name "delete-object" + :delay cfg/default-deletion-delay + :props {:id id :type :icon-library}}) (db/update! conn :icon-library {:deleted-at (dt/now)} @@ -196,9 +196,9 @@ (teams/check-edition-permissions! conn profile-id (:team-id icn)) ;; Schedule object deletion - (tasks/schedule! conn {:name "delete-object" - :delay cfg/default-deletion-delay - :props {:id id :type :icon}}) + (tasks/submit! conn {:name "delete-object" + :delay cfg/default-deletion-delay + :props {:id id :type :icon}}) (db/update! conn :icon {:deleted-at (dt/now)} diff --git a/backend/src/uxbox/services/mutations/images.clj b/backend/src/uxbox/services/mutations/images.clj index 905fc8a23..69ee47e52 100644 --- a/backend/src/uxbox/services/mutations/images.clj +++ b/backend/src/uxbox/services/mutations/images.clj @@ -96,9 +96,9 @@ (teams/check-edition-permissions! conn profile-id (:team-id lib)) ;; Schedule object deletion - (tasks/schedule! conn {:name "delete-object" - :delay cfg/default-deletion-delay - :props {:id id :type :image-library}}) + (tasks/submit! conn {:name "delete-object" + :delay cfg/default-deletion-delay + :props {:id id :type :image-library}}) (db/update! conn :image-library {:deleted-at (dt/now)} @@ -226,9 +226,9 @@ (teams/check-edition-permissions! conn profile-id (:team-id img)) ;; Schedule object deletion - (tasks/schedule! conn {:name "delete-object" - :delay cfg/default-deletion-delay - :props {:id id :type :image}}) + (tasks/submit! conn {:name "delete-object" + :delay cfg/default-deletion-delay + :props {:id id :type :image}}) (db/update! conn :image {:deleted-at (dt/now)} diff --git a/backend/src/uxbox/services/mutations/pages.clj b/backend/src/uxbox/services/mutations/pages.clj index 8c570938e..54cd6fa91 100644 --- a/backend/src/uxbox/services/mutations/pages.clj +++ b/backend/src/uxbox/services/mutations/pages.clj @@ -242,9 +242,9 @@ (files/check-edition-permissions! conn profile-id (:file-id page)) ;; Schedule object deletion - (tasks/schedule! conn {:name "delete-object" - :delay cfg/default-deletion-delay - :props {:id id :type :page}}) + (tasks/submit! conn {:name "delete-object" + :delay cfg/default-deletion-delay + :props {:id id :type :page}}) (db/update! conn :page {:deleted-at (dt/now)} diff --git a/backend/src/uxbox/services/mutations/profile.clj b/backend/src/uxbox/services/mutations/profile.clj index e3e39cc3f..e880bb77a 100644 --- a/backend/src/uxbox/services/mutations/profile.clj +++ b/backend/src/uxbox/services/mutations/profile.clj @@ -155,8 +155,8 @@ ;; Schedule deletion of old photo (when (and (string? (:photo profile)) (not (str/blank? (:photo profile)))) - (tasks/schedule! conn {:name "remove-media" - :props {:path (:photo profile)}})) + (tasks/submit! conn {:name "remove-media" + :props {:path (:photo profile)}})) ;; Save new photo (update-profile-photo conn profile-id photo)))) @@ -363,9 +363,9 @@ (check-teams-ownership! conn profile-id) ;; Schedule a complete deletion of profile - (tasks/schedule! conn {:name "delete-profile" - :delay (dt/duration {:hours 48}) - :props {:profile-id profile-id}}) + (tasks/submit! conn {:name "delete-profile" + :delay (dt/duration {:hours 48}) + :props {:profile-id profile-id}}) (db/update! conn :profile {:deleted-at (dt/now)} diff --git a/backend/src/uxbox/services/mutations/projects.clj b/backend/src/uxbox/services/mutations/projects.clj index b23552aaa..382acefb1 100644 --- a/backend/src/uxbox/services/mutations/projects.clj +++ b/backend/src/uxbox/services/mutations/projects.clj @@ -124,9 +124,9 @@ (check-edition-permissions! conn profile-id id) ;; Schedule object deletion - (tasks/schedule! conn {:name "delete-object" - :delay cfg/default-deletion-delay - :props {:id id :type :project}}) + (tasks/submit! conn {:name "delete-object" + :delay cfg/default-deletion-delay + :props {:id id :type :project}}) (mark-project-deleted conn params))) diff --git a/backend/src/uxbox/tasks.clj b/backend/src/uxbox/tasks.clj index 4108a59d0..71c5457ed 100644 --- a/backend/src/uxbox/tasks.clj +++ b/backend/src/uxbox/tasks.clj @@ -17,11 +17,21 @@ [uxbox.config :as cfg] [uxbox.db :as db] [uxbox.tasks.sendmail] + [uxbox.tasks.gc] [uxbox.tasks.remove-media] [uxbox.tasks.delete-profile] [uxbox.tasks.delete-object] [uxbox.tasks.impl :as impl] - [uxbox.util.time :as dt])) + [uxbox.util.time :as dt]) + (:import + java.util.concurrent.ScheduledExecutorService + java.util.concurrent.Executors)) + +;; --- Scheduler Executor Initialization + +(defstate scheduler + :start (Executors/newScheduledThreadPool (int 1)) + :stop (.shutdownNow ^ScheduledExecutorService scheduler)) ;; --- State initialization @@ -36,33 +46,25 @@ "remove-media" #'uxbox.tasks.remove-media/handler "sendmail" #'uxbox.tasks.sendmail/handler}) +(def ^:private schedule + [{:id "remove-deleted-media" + :cron (dt/cron "1 1 */1 * * ? *") + :fn #'uxbox.tasks.gc/remove-media}]) + (defstate worker - :start (impl/start-worker! {:tasks tasks}) + :start (impl/start-worker! {:tasks tasks + :xtor scheduler}) + :stop (impl/stop! worker)) + +(defstate scheduler-worker + :start (impl/start-scheduler-worker! {:schedule schedule + :xtor scheduler}) :stop (impl/stop! worker)) ;; --- Public API -(defn schedule! - ([opts] (schedule! db/pool opts)) +(defn submit! + ([opts] (submit! db/pool opts)) ([conn opts] (s/assert ::impl/task-options opts) - (impl/schedule! conn opts))) - -;; (defstate scheduler -;; :start (impl/start-scheduler! tasks) -;; :stop (impl/stop! tasks-worker)) - -;; :start (as-> (impl/worker-verticle {:tasks tasks}) $$ -;; (vc/deploy! system $$ {:instances 1}) -;; (deref $$))) - -;; (def ^:private schedule -;; [{:id "every 1 hour" -;; :cron (dt/cron "1 1 */1 * * ? *") -;; :fn #'uxbox.tasks.gc/handler -;; :props {:foo 1}}]) - -;; (defstate scheduler -;; :start (as-> (impl/scheduler-verticle {:schedule schedule}) $$ -;; (vc/deploy! system $$ {:instances 1 :worker true}) -;; (deref $$))) + (impl/submit! conn opts))) diff --git a/backend/src/uxbox/tasks/gc.clj b/backend/src/uxbox/tasks/gc.clj index c4db5c9d9..690ef922d 100644 --- a/backend/src/uxbox/tasks/gc.clj +++ b/backend/src/uxbox/tasks/gc.clj @@ -9,8 +9,8 @@ (ns uxbox.tasks.gc (:require - [clojure.tools.logging :as log] [clojure.spec.alpha :as s] + [clojure.tools.logging :as log] [cuerdas.core :as str] [postal.core :as postal] [promesa.core :as p] @@ -18,36 +18,47 @@ [uxbox.common.spec :as us] [uxbox.config :as cfg] [uxbox.db :as db] - [uxbox.util.blob :as blob])) + [uxbox.media :as media] + [uxbox.util.blob :as blob] + [uxbox.util.storage :as ust])) -;; TODO: delete media referenced in pendint_to_delete table +(def ^:private sql:delete-items + "with items_part as ( + select i.id + from pending_to_delete as i + order by i.created_at + limit ? + for update skip locked + ) + delete from pending_to_delete + where id in (select id from items_part) + returning *") -;; (def ^:private sql:delete-item -;; "with items_part as ( -;; select i.id -;; from pending_to_delete as i -;; order by i.created_at -;; limit 1 -;; for update skip locked -;; ) -;; delete from pending_to_delete -;; where id in (select id from items_part) -;; returning *") +(defn- impl-remove-media + [result] + (run! (fn [item] + (let [path1 (get item "path") + path2 (get item "thumb_path")] + (ust/delete! media/media-storage path1) + (ust/delete! media/media-storage path2))) + result)) -;; (defn- remove-items -;; [] -;; (vu/loop [] -;; (db/with-atomic [conn db/pool] -;; (-> (db/query-one conn sql:delete-item) -;; (p/then decode-row) -;; (p/then (vu/wrap-blocking remove-media)) -;; (p/then (fn [item] -;; (when (not (empty? items)) -;; (p/recur)))))))) +(defn- decode-row + [{:keys [data] :as row}] + (cond-> row + (db/pgobject? data) (assoc :data (db/decode-pgobject data)))) + +(defn- get-items + [conn] + (->> (db/exec! conn [sql:delete-items 10]) + (map decode-row) + (map :data))) + +(defn remove-media + [{:keys [props] :as task}] + (db/with-atomic [conn db/pool] + (loop [result (get-items conn)] + (when-not (empty? result) + (impl-remove-media result) + (recur (get-items conn)))))) -;; (defn- remove-media -;; [{:keys -;; (doseq [item files] -;; (ust/delete! media/media-storage (:path item)) -;; (ust/delete! media/media-storage (:thumb-path item))) -;; files) diff --git a/backend/src/uxbox/tasks/impl.clj b/backend/src/uxbox/tasks/impl.clj index 249a8bf64..121797e6f 100644 --- a/backend/src/uxbox/tasks/impl.clj +++ b/backend/src/uxbox/tasks/impl.clj @@ -13,6 +13,7 @@ [clojure.core.async :as a] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] + [promesa.exec :as px] [uxbox.common.spec :as us] [uxbox.common.uuid :as uuid] [uxbox.config :as cfg] @@ -20,14 +21,12 @@ [uxbox.util.blob :as blob] [uxbox.util.time :as dt]) (:import + java.util.concurrent.ScheduledExecutorService + java.util.concurrent.Executors java.time.Duration java.time.Instant java.util.Date)) -(defrecord Worker [stop] - java.lang.AutoCloseable - (close [_] (a/close! stop))) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Tasks ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -37,7 +36,8 @@ (with-out-str (.printStackTrace err (java.io.PrintWriter. *out*)))) -(def ^:private sql:mark-as-retry +(def ^:private + sql:mark-as-retry "update task set scheduled_at = clock_timestamp() + '5 seconds'::interval, error = ?, @@ -45,48 +45,32 @@ retry_num = retry_num + 1 where id = ?") -(defn- reschedule +(defn- mark-as-retry [conn task error] (let [explain (ex-message error) sqlv [sql:mark-as-retry explain (:id task)]] (db/exec-one! conn sqlv) nil)) -(def ^:private sql:mark-as-failed - "update task - set scheduled_at = clock_timestamp() + '5 seconds'::interval, - error = ?, - status = 'failed' - where id = ?;") - (defn- mark-as-failed [conn task error] - (let [explain (ex-message error) - sqlv [sql:mark-as-failed explain (:id task)]] - (db/exec-one! conn sqlv) + (let [explain (ex-message error)] + (db/update! conn :task + {:error explain + :status "failed"} + {:id (:id task)}) nil)) -(def ^:private sql:mark-as-completed - "update task - set completed_at = clock_timestamp(), - status = 'completed' - where id = ?") - (defn- mark-as-completed [conn task] - (db/exec-one! conn [sql:mark-as-completed (:id task)]) + (db/update! conn :task + {:completed-at (dt/now) + :status "completed"} + {:id (:id task)}) nil) -(defn- handle-task - [tasks {:keys [name] :as item}] - (let [task-fn (get tasks name)] - (if task-fn - (task-fn item) - (do - (log/warn "no task handler found for" (pr-str name)) - nil)))) - -(def ^:private sql:select-next-task +(def ^:private + sql:select-next-task "select * from task as t where t.scheduled_at <= now() and t.queue = ? @@ -108,6 +92,15 @@ (with-out-str (.printStackTrace ^Throwable err (java.io.PrintWriter. *out*))))) +(defn- handle-task + [tasks {:keys [name] :as item}] + (let [task-fn (get tasks name)] + (if task-fn + (task-fn item) + (do + (log/warn "no task handler found for" (pr-str name)) + nil)))) + (defn- event-loop-fn [{:keys [tasks] :as options}] (let [queue (:queue options "default") @@ -125,156 +118,140 @@ (log-task-error item e) (if (>= (:retry-num item) max-retries) (mark-as-failed conn item e) - (reschedule conn item e))))))))) + (mark-as-retry conn item e))))))))) -(defn- start-worker-eventloop! - [options] - (let [stop (::stop options) - mbs (:max-batch-size options 10)] - (a/go-loop [] - (let [timeout (a/timeout 5000) - [val port] (a/alts! [stop timeout])] - (when (= port timeout) - (a/ mbs cnt)) - (recur (inc 1) - (event-loop-fn options)))))) - (recur)))))) - -(defn- duration->pginterval - [^Duration d] - (->> (/ (.toMillis d) 1000.0) - (format "%s seconds"))) - -(defn start-worker! - [options] - (let [stop (a/chan)] - (start-worker-eventloop! (assoc options ::stop stop)) - (->Worker stop))) - -(defn stop! - [worker] - (.close ^java.lang.AutoCloseable worker)) +(defn- execute-worker-task + [{:keys [::stop ::xtor poll-interval] + :or {poll-interval 5000} + :as opts}] + (try + (when-not @stop + (let [res (event-loop-fn opts)] + (if (= res ::handled) + (px/schedule! xtor 0 (partial execute-worker-task opts)) + (px/schedule! xtor poll-interval (partial execute-worker-task opts))))) + (catch Throwable e + (log/error "unexpected exception:" e) + (px/schedule! xtor poll-interval (partial execute-worker-task opts))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Scheduled Tasks ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; (def ^:privatr sql:upsert-scheduled-task -;; "insert into scheduled_task (id, cron_expr) -;; values ($1, $2) -;; on conflict (id) -;; do update set cron_expr=$2") +(def ^:private + sql:upsert-scheduled-task + "insert into scheduled_task (id, cron_expr) + values (?, ?) + on conflict (id) + do update set cron_expr=?") -;; (defn- synchronize-schedule-item -;; [conn {:keys [id cron]}] -;; (-> (db/query-one conn [sql:upsert-scheduled-task id (str cron)]) -;; (p/then' (constantly nil)))) +(defn- synchronize-schedule-item + [conn {:keys [id cron] :as item}] + (let [cron (str cron)] + (db/exec-one! conn [sql:upsert-scheduled-task id cron cron]))) -;; (defn- synchronize-schedule -;; [schedule] -;; (db/with-atomic [conn db/pool] -;; (p/run! (partial synchronize-schedule-item conn) schedule))) +(defn- synchronize-schedule! + [schedule] + (db/with-atomic [conn db/pool] + (run! (partial synchronize-schedule-item conn) schedule))) -;; (def ^:private sql:lock-scheduled-task -;; "select id from scheduled_task where id=$1 for update skip locked") +(def ^:private sql:lock-scheduled-task + "select id from scheduled_task where id=? for update skip locked") -;; (declare schedule-task) +(declare schedule-task!) -;; (defn- log-scheduled-task-error -;; [item err] -;; (log/error "Unhandled exception on scheduled task '" (:id item) "' \n" -;; (with-out-str -;; (.printStackTrace ^Throwable err (java.io.PrintWriter. *out*))))) +(defn- log-scheduled-task-error + [item err] + (log/error "Unhandled exception on scheduled task '" (:id item) "' \n" + (with-out-str + (.printStackTrace ^Throwable err (java.io.PrintWriter. *out*))))) -;; (defn- execute-scheduled-task -;; [{:keys [id cron] :as stask}] -;; (db/with-atomic [conn db/pool] -;; ;; First we try to lock the task in the database, if locking us -;; ;; successful, then we execute the scheduled task; if locking is -;; ;; not possible (because other instance is already locked id) we -;; ;; just skip it and schedule to be executed in the next slot. -;; (-> (db/query-one conn [sql:lock-scheduled-task id]) -;; (p/then (fn [result] -;; (when result -;; (-> (p/do! ((:fn stask) stask)) -;; (p/catch (fn [e] -;; (log-scheduled-task-error stask e) -;; nil)))))) -;; (p/finally (fn [v e] -;; (-> (vu/current-context) -;; (schedule-task stask))))))) -;; (defn ms-until-valid -;; [cron] -;; (s/assert dt/cron? cron) -;; (let [^Instant now (dt/now) -;; ^Instant next (dt/next-valid-instant-from cron now) -;; ^Duration duration (Duration/between now next)] -;; (.toMillis duration))) +(defn- execute-scheduled-task + [{:keys [id cron ::xtor] :as task}] + (try + (db/with-atomic [conn db/pool] + ;; First we try to lock the task in the database, if locking is + ;; successful, then we execute the scheduled task; if locking is + ;; not possible (because other instance is already locked id) we + ;; just skip it and schedule to be executed in the next slot. + (when (db/exec-one! conn [sql:lock-scheduled-task id]) + (log/info "Executing scheduled task" id) + ((:fn task) task))) -;; (defn- schedule-task -;; [ctx {:keys [cron] :as stask}] -;; (let [ms (ms-until-valid cron)] -;; (vt/schedule! ctx (assoc stask -;; :ctx ctx -;; ::vt/once true -;; ::vt/delay ms -;; ::vt/fn execute-scheduled-task)))) + (catch Throwable e + (log-scheduled-task-error task e)) + (finally + (schedule-task! xtor task)))) -;; (defn- on-scheduler-start -;; [ctx {:keys [schedule] :as options}] -;; (-> (synchronize-schedule schedule) -;; (p/then' (fn [_] -;; (run! #(schedule-task ctx %) schedule))))) +(defn ms-until-valid + [cron] + (s/assert dt/cron? cron) + (let [^Instant now (dt/now) + ^Instant next (dt/next-valid-instant-from cron now)] + (inst-ms (dt/duration-between now next)))) + +(defn- schedule-task! + [xtor {:keys [cron] :as task}] + (let [ms (ms-until-valid cron) + task (assoc task ::xtor xtor)] + (px/schedule! xtor ms (partial execute-scheduled-task task)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Public API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; --- Worker Verticle +(s/def ::id string?) +(s/def ::name string?) +(s/def ::cron dt/cron?) +(s/def ::fn (s/or :var var? :fn fn?)) +(s/def ::props (s/nilable map?)) +(s/def ::xtor #(instance? ScheduledExecutorService %)) -;; (s/def ::callable (s/or :fn fn? :var var?)) -;; (s/def ::max-batch-size ::us/integer) -;; (s/def ::max-retries ::us/integer) -;; (s/def ::tasks (s/map-of string? ::callable)) +(s/def ::scheduled-task + (s/keys :req-un [::id ::cron ::fn] + :opt-un [::props])) -;; (s/def ::worker-verticle-options -;; (s/keys :req-un [::tasks] -;; :opt-un [::queue ::max-batch-size])) +(s/def ::tasks (s/map-of string? ::fn)) +(s/def ::schedule (s/coll-of ::scheduled-task)) -;; (defn worker-verticle -;; [options] -;; (s/assert ::worker-verticle-options options) -;; (let [on-start #(on-worker-start % options)] -;; (vc/verticle {:on-start on-start}))) +(defn start-scheduler-worker! + [{:keys [schedule xtor] :as opts}] + (us/assert ::xtor xtor) + (us/assert ::schedule schedule) + (let [stop (atom false)] + (synchronize-schedule! schedule) + (run! (partial schedule-task! xtor) schedule) + (reify + java.lang.AutoCloseable + (close [_] + (reset! stop true))))) -;; --- Scheduler Verticle +(defn start-worker! + [{:keys [tasks xtor poll-interval] + :or {poll-interval 5000} + :as opts}] + (us/assert ::tasks tasks) + (us/assert ::xtor xtor) + (us/assert number? poll-interval) + (let [stop (atom false) + opts (assoc opts + ::xtor xtor + ::stop stop)] + (px/schedule! xtor poll-interval (partial execute-worker-task opts)) + (reify + java.lang.AutoCloseable + (close [_] + (reset! stop true))))) -;; (s/def ::id string?) -;; (s/def ::cron dt/cron?) -;; (s/def ::fn ::callable) -;; (s/def ::props (s/nilable map?)) +(defn stop! + [worker] + (.close ^java.lang.AutoCloseable worker)) -;; (s/def ::scheduled-task -;; (s/keys :req-un [::id ::cron ::fn] -;; :opt-un [::props])) -;; (s/def ::schedule (s/coll-of ::scheduled-task)) -;; (s/def ::scheduler-verticle-options -;; (s/keys :opt-un [::schedule])) -;; (defn scheduler-verticle -;; [options] -;; (s/assert ::scheduler-verticle-options options) -;; (let [on-start #(on-scheduler-start % options)] -;; (vc/verticle {:on-start on-start}))) -;; --- Schedule API +;; --- Submit API (s/def ::name ::us/string) (s/def ::delay @@ -290,7 +267,12 @@ values (?, ?, ?, ?, clock_timestamp()+cast(?::text as interval)) returning id") -(defn schedule! +(defn- duration->pginterval + [^Duration d] + (->> (/ (.toMillis d) 1000.0) + (format "%s seconds"))) + +(defn submit! [conn {:keys [name delay props queue key] :or {delay 0 props {} queue "default"} :as options}] @@ -299,9 +281,7 @@ pginterval (duration->pginterval duration) props (blob/encode props) id (uuid/next)] - (log/info "Schedule task" name - ;; "with props" (pr-str props) - "to be executed in" (str duration)) + (log/info "Submit task" name "to be executed in" (str duration)) (db/exec-one! conn [sql:insert-new-task id name props queue pginterval]) id)) diff --git a/docker/devenv/docker-compose.yaml b/docker/devenv/docker-compose.yaml index e773d4bc4..cfbcc2985 100644 --- a/docker/devenv/docker-compose.yaml +++ b/docker/devenv/docker-compose.yaml @@ -38,7 +38,6 @@ services: - 9090:9090 environment: - - CLOJURE_OPTS=-J-XX:-OmitStackTraceInFastThrow - UXBOX_DATABASE_URI=postgresql://postgres/uxbox - UXBOX_DATABASE_USERNAME=uxbox - UXBOX_DATABASE_PASSWORD=uxbox