diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index d85ece5bc..de31d7a93 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -13,6 +13,7 @@ [app.common.geom.point :as gpt] [app.common.spec :as us] [app.db.sql :as sql] + [app.metrics :as mtx] [app.util.json :as json] [app.util.migrations :as mg] [app.util.time :as dt] @@ -45,19 +46,21 @@ ;; Initialization ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +(declare instrument-jdbc!) + (s/def ::uri ::us/not-empty-string) (s/def ::name ::us/not-empty-string) (s/def ::min-pool-size ::us/integer) (s/def ::max-pool-size ::us/integer) (s/def ::migrations map?) -(s/def ::metrics map?) (defmethod ig/pre-init-spec ::pool [_] - (s/keys :req-un [::uri ::name ::min-pool-size ::max-pool-size ::migrations])) + (s/keys :req-un [::uri ::name ::min-pool-size ::max-pool-size ::migrations ::mtx/metrics])) (defmethod ig/init-key ::pool - [_ {:keys [migrations] :as cfg}] - (log/debugf "initialize connection pool %s with uri %s" (:name cfg) (:uri cfg)) + [_ {:keys [migrations metrics] :as cfg}] + (log/infof "initialize connection pool '%s' with uri '%s'" (:name cfg) (:uri cfg)) + (instrument-jdbc! (:registry metrics)) (let [pool (create-pool cfg)] (when (seq migrations) (with-open [conn ^AutoCloseable (open pool)] @@ -70,6 +73,16 @@ [_ pool] (.close ^HikariDataSource pool)) +(defn- instrument-jdbc! + [registry] + (mtx/instrument-vars! + [#'next.jdbc/execute-one! + #'next.jdbc/execute!] + {:registry registry + :type :counter + :name "database_query_count" + :help "An absolute counter of database queries."})) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; API & Impl ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 9ed384a8d..03483c38e 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -174,46 +174,56 @@ :app.worker/worker {:executor (ig/ref :app.worker/executor) :pool (ig/ref :app.db/pool) - :tasks (ig/ref :app.tasks/all)} + :tasks (ig/ref :app.tasks/registry)} :app.worker/scheduler {:executor (ig/ref :app.worker/executor) :pool (ig/ref :app.db/pool) + :tasks (ig/ref :app.tasks/registry) :schedule [{:id "file-media-gc" :cron #app/cron "0 0 0 */1 * ? *" ;; daily - :fn (ig/ref :app.tasks.file-media-gc/handler)} + :task :file-media-gc} {:id "file-xlog-gc" :cron #app/cron "0 0 */1 * * ?" ;; hourly - :fn (ig/ref :app.tasks.file-xlog-gc/handler)} + :task :file-xlog-gc} {:id "storage-deleted-gc" :cron #app/cron "0 0 1 */1 * ?" ;; daily (1 hour shift) - :fn (ig/ref :app.storage/gc-deleted-task)} + :task :storage-deleted-gc} {:id "storage-touched-gc" :cron #app/cron "0 0 2 */1 * ?" ;; daily (2 hour shift) - :fn (ig/ref :app.storage/gc-touched-task)} + :task :storage-touched-gc} {:id "storage-recheck" :cron #app/cron "0 0 */1 * * ?" ;; hourly - :fn (ig/ref :app.storage/recheck-task)} + :task :storage-recheck} {:id "tasks-gc" :cron #app/cron "0 0 0 */1 * ?" ;; daily - :fn (ig/ref :app.tasks.tasks-gc/handler)} + :task :tasks-gc} (when (:telemetry-enabled config) {:id "telemetry" :cron #app/cron "0 0 */6 * * ?" ;; every 6h :uri (:telemetry-uri config) - :fn (ig/ref :app.tasks.telemetry/handler)})]} + :task :telemetry})]} - :app.tasks/all - {"sendmail" (ig/ref :app.tasks.sendmail/handler) - "delete-object" (ig/ref :app.tasks.delete-object/handler) - "delete-profile" (ig/ref :app.tasks.delete-profile/handler)} + :app.tasks/registry + {:metrics (ig/ref :app.metrics/metrics) + :tasks + {:sendmail (ig/ref :app.tasks.sendmail/handler) + :delete-object (ig/ref :app.tasks.delete-object/handler) + :delete-profile (ig/ref :app.tasks.delete-profile/handler) + :file-media-gc (ig/ref :app.tasks.file-media-gc/handler) + :file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler) + :storage-deleted-gc (ig/ref :app.storage/gc-deleted-task) + :storage-touched-gc (ig/ref :app.storage/gc-touched-task) + :storage-recheck (ig/ref :app.storage/recheck-task) + :tasks-gc (ig/ref :app.tasks.tasks-gc/handler) + :telemetry (ig/ref :app.tasks.telemetry/handler)}} :app.tasks.sendmail/handler {:host (:smtp-host config) diff --git a/backend/src/app/metrics.clj b/backend/src/app/metrics.clj index a7e866068..e71283ca2 100644 --- a/backend/src/app/metrics.clj +++ b/backend/src/app/metrics.clj @@ -10,17 +10,15 @@ (ns app.metrics (:require [app.common.exceptions :as ex] - [app.util.time :as dt] - [app.worker] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] - [integrant.core :as ig] - [next.jdbc :as jdbc]) + [integrant.core :as ig]) (:import io.prometheus.client.CollectorRegistry io.prometheus.client.Counter io.prometheus.client.Gauge io.prometheus.client.Summary + io.prometheus.client.Histogram io.prometheus.client.exporter.common.TextFormat io.prometheus.client.hotspot.DefaultExports io.prometheus.client.jetty.JettyStatisticsCollector @@ -30,41 +28,12 @@ (declare instrument-vars!) (declare instrument) (declare create-registry) - +(declare create) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Entry Point ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defn- instrument-jdbc! - [registry] - (instrument-vars! - [#'next.jdbc/execute-one! - #'next.jdbc/execute!] - {:registry registry - :type :counter - :name "database_query_counter" - :help "An absolute counter of database queries."})) - -(defn- instrument-workers! - [registry] - (instrument-vars! - [#'app.worker/run-task] - {:registry registry - :type :summary - :name "worker_task_checkout_millis" - :help "Latency measured between scheduld_at and execution time." - :wrap (fn [rootf mobj] - (let [mdata (meta rootf) - origf (::original mdata rootf)] - (with-meta - (fn [tasks item] - (let [now (inst-ms (dt/now)) - sat (inst-ms (:scheduled-at item))] - (mobj :observe (- now sat)) - (origf tasks item))) - {::original origf})))})) - (defn- handler [registry _request] (let [samples (.metricFamilySamples ^CollectorRegistry registry) @@ -73,13 +42,24 @@ {:headers {"content-type" TextFormat/CONTENT_TYPE_004} :body (.toString writer)})) +(s/def ::definitions + (s/map-of keyword? map?)) + +(defmethod ig/pre-init-spec ::metrics [_] + (s/keys :opt-un [::definitions])) + (defmethod ig/init-key ::metrics - [_ _cfg] + [_ {:keys [definitions] :as cfg}] (log/infof "Initializing prometheus registry and instrumentation.") - (let [registry (create-registry)] - (instrument-workers! registry) - (instrument-jdbc! registry) + (let [registry (create-registry) + definitions (reduce-kv (fn [res k v] + (->> (assoc v :registry registry) + (create) + (assoc res k))) + {} + definitions)] {:handler (partial handler registry) + :definitions definitions :registry registry})) (s/def ::handler fn?) @@ -87,7 +67,6 @@ (s/def ::metrics (s/keys :req-un [::registry ::handler])) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Implementation ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -126,7 +105,7 @@ (invoke [_ cmd labels] (.. ^Counter instance - (labels labels) + (labels (into-array String labels)) (inc)))))) (defn make-gauge @@ -150,19 +129,27 @@ :dec (.dec ^Gauge instance))) (invoke [_ cmd labels] - (case cmd - :inc (.. ^Gauge instance (labels labels) (inc)) - :dec (.. ^Gauge instance (labels labels) (dec))))))) + (let [labels (into-array String [labels])] + (case cmd + :inc (.. ^Gauge instance (labels labels) (inc)) + :dec (.. ^Gauge instance (labels labels) (dec)))))))) + +(def default-quantiles + [[0.75 0.02] + [0.99 0.001]]) (defn make-summary - [{:keys [name help registry reg labels max-age] :or {max-age 3600} :as props}] + [{:keys [name help registry reg labels max-age quantiles buckets] + :or {max-age 3600 buckets 6 quantiles default-quantiles} :as props}] (let [registry (or registry reg) instance (doto (Summary/build) (.name name) - (.help help) - (.maxAgeSeconds max-age) - (.quantile 0.75 0.02) - (.quantile 0.99 0.001)) + (.help help)) + _ (when (seq quantiles) + (.maxAgeSeconds ^Summary instance max-age) + (.ageBuckets ^Summary instance buckets)) + _ (doseq [[q e] quantiles] + (.quantile ^Summary instance q e)) _ (when (seq labels) (.labelNames instance (into-array String labels))) instance (.register instance registry)] @@ -176,7 +163,34 @@ (invoke [_ cmd val labels] (.. ^Summary instance - (labels labels) + (labels (into-array String labels)) + (observe val)))))) + +(def default-histogram-buckets + [1 5 10 25 50 75 100 250 500 750 1000 2500 5000 7500]) + +(defn make-histogram + [{:keys [name help registry reg labels buckets] + :or {buckets default-histogram-buckets}}] + (let [registry (or registry reg) + instance (doto (Histogram/build) + (.name name) + (.help help) + (.buckets (into-array Double/TYPE buckets))) + _ (when (seq labels) + (.labelNames instance (into-array String labels))) + instance (.register instance registry)] + (reify + clojure.lang.IDeref + (deref [_] instance) + + clojure.lang.IFn + (invoke [_ cmd val] + (.observe ^Histogram instance val)) + + (invoke [_ cmd val labels] + (.. ^Histogram instance + (labels (into-array String labels)) (observe val)))))) (defn create @@ -184,7 +198,8 @@ (case type :counter (make-counter props) :gauge (make-gauge props) - :summary (make-summary props))) + :summary (make-summary props) + :histogram (make-histogram props))) (defn wrap-counter ([rootf mobj] @@ -204,7 +219,6 @@ (assoc mdata ::original origf)))) ([rootf mobj labels] (let [mdata (meta rootf) - labels (into-array String labels) origf (::original mdata rootf)] (with-meta (fn @@ -241,7 +255,6 @@ ([rootf mobj labels] (let [mdata (meta rootf) - labels (into-array String labels) origf (::original mdata rootf)] (with-meta (fn @@ -284,6 +297,9 @@ (instance? Summary @obj) ((or wrap wrap-summary) f obj) + (instance? Histogram @obj) + ((or wrap wrap-summary) f obj) + :else (ex/raise :type :not-implemented)))) diff --git a/backend/src/app/tasks.clj b/backend/src/app/tasks.clj index 2a3eca68d..9ac9a3a8b 100644 --- a/backend/src/app/tasks.clj +++ b/backend/src/app/tasks.clj @@ -5,17 +5,19 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020 UXBOX Labs SL +;; Copyright (c) 2020-2021 UXBOX Labs SL (ns app.tasks (:require [app.common.spec :as us] [app.common.uuid :as uuid] [app.db :as db] - ;; [app.metrics :as mtx] + [app.metrics :as mtx] [app.util.time :as dt] + [app.worker] [clojure.spec.alpha :as s] - [clojure.tools.logging :as log])) + [clojure.tools.logging :as log] + [integrant.core :as ig])) (s/def ::name ::us/string) (s/def ::delay @@ -41,11 +43,68 @@ interval (db/interval duration) props (db/tjson props) id (uuid/next)] - (log/infof "Submit task '%s' to be executed in '%s'." name (str duration)) + (log/debugf "submit task '%s' to be executed in '%s'" name (str duration)) (db/exec-one! conn [sql:insert-new-task id name props queue priority max-retries interval]) id)) -;; (mtx/instrument-with-counter! -;; {:var #'submit! -;; :id "tasks__submit_counter" -;; :help "Absolute task submit counter."}) +(defn- instrument! + [registry] + (mtx/instrument-vars! + [#'submit!] + {:registry registry + :type :counter + :labels ["name"] + :name "tasks_submit_counter" + :help "An absolute counter of task submissions." + :wrap (fn [rootf mobj] + (let [mdata (meta rootf) + origf (::original mdata rootf)] + (with-meta + (fn [conn params] + (let [tname (:name params)] + (mobj :inc [tname]) + (origf conn params))) + {::original origf})))}) + + (mtx/instrument-vars! + [#'app.worker/run-task] + {:registry registry + :type :summary + :quantiles [] + :name "tasks_checkout_timing" + :help "Latency measured between scheduld_at and execution time." + :wrap (fn [rootf mobj] + (let [mdata (meta rootf) + origf (::original mdata rootf)] + (with-meta + (fn [tasks item] + (let [now (inst-ms (dt/now)) + sat (inst-ms (:scheduled-at item))] + (mobj :observe (- now sat)) + (origf tasks item))) + {::original origf})))})) + +;; --- STATE INIT: REGISTRY + +(s/def ::tasks + (s/map-of keyword? fn?)) + +(defmethod ig/pre-init-spec ::registry [_] + (s/keys :req-un [::mtx/metrics ::tasks])) + +(defmethod ig/init-key ::registry + [_ {:keys [metrics tasks]}] + (instrument! (:registry metrics)) + (let [mobj (mtx/create + {:registry (:registry metrics) + :type :summary + :labels ["name"] + :quantiles [] + :name "tasks_timing" + :help "Background task execution timing."})] + (reduce-kv (fn [res k v] + (let [tname (name k)] + (log/debugf "registring task '%s'" tname) + (assoc res tname (mtx/wrap-summary v mobj [tname])))) + {} + tasks))) diff --git a/backend/src/app/tasks/delete_object.clj b/backend/src/app/tasks/delete_object.clj index 602c7182c..78fd47007 100644 --- a/backend/src/app/tasks/delete_object.clj +++ b/backend/src/app/tasks/delete_object.clj @@ -12,36 +12,26 @@ (:require [app.common.spec :as us] [app.db :as db] - [app.metrics :as mtx] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] [integrant.core :as ig])) -(declare handler) (declare handle-deletion) (defmethod ig/pre-init-spec ::handler [_] - (s/keys :req-un [::db/pool ::mtx/metrics])) + (s/keys :req-un [::db/pool])) (defmethod ig/init-key ::handler - [_ {:keys [metrics] :as cfg}] - (let [handler #(handler cfg %)] - (->> {:registry (:registry metrics) - :type :summary - :name "task_delete_object_timing" - :help "delete object task timing"} - (mtx/instrument handler)))) + [_ {:keys [pool] :as cfg}] + (fn [{:keys [props] :as task}] + (us/verify ::props props) + (db/with-atomic [conn pool] + (handle-deletion conn props)))) (s/def ::type ::us/keyword) (s/def ::id ::us/uuid) (s/def ::props (s/keys :req-un [::id ::type])) -(defn- handler - [{:keys [pool]} {:keys [props] :as task}] - (us/verify ::props props) - (db/with-atomic [conn pool] - (handle-deletion conn props))) - (defmulti handle-deletion (fn [_ props] (:type props))) diff --git a/backend/src/app/tasks/delete_profile.clj b/backend/src/app/tasks/delete_profile.clj index f2b2030a6..923ccf814 100644 --- a/backend/src/app/tasks/delete_profile.clj +++ b/backend/src/app/tasks/delete_profile.clj @@ -13,27 +13,16 @@ [app.common.spec :as us] [app.db :as db] [app.db.sql :as sql] - [app.metrics :as mtx] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] [integrant.core :as ig])) (declare delete-profile-data) -(declare handler) ;; --- INIT (defmethod ig/pre-init-spec ::handler [_] - (s/keys :req-un [::db/pool ::mtx/metrics])) - -(defmethod ig/init-key ::handler - [_ {:keys [metrics] :as cfg}] - (let [handler #(handler cfg %)] - (->> {:registry (:registry metrics) - :type :summary - :name "task_delete_profile_timing" - :help "delete profile task timing"} - (mtx/instrument handler)))) + (s/keys :req-un [::db/pool])) ;; This task is responsible to permanently delete a profile with all ;; the dependent data. As step (1) we delete all owned teams of the @@ -48,16 +37,17 @@ (s/def ::profile-id ::us/uuid) (s/def ::props (s/keys :req-un [::profile-id])) -(defn handler - [{:keys [pool]} {:keys [props] :as task}] - (us/verify ::props props) - (db/with-atomic [conn pool] - (let [id (:profile-id props) - profile (db/exec-one! conn (sql/select :profile {:id id} {:for-update true}))] - (if (or (:is-demo profile) - (:deleted-at profile)) - (delete-profile-data conn id) - (log/warnf "Profile %s does not match constraints for deletion" id))))) +(defmethod ig/init-key ::handler + [_ {:keys [pool] :as cfg}] + (fn [{:keys [props] :as task}] + (us/verify ::props props) + (db/with-atomic [conn pool] + (let [id (:profile-id props) + profile (db/exec-one! conn (sql/select :profile {:id id} {:for-update true}))] + (if (or (:is-demo profile) + (:deleted-at profile)) + (delete-profile-data conn id) + (log/warnf "profile '%s' does not match constraints for deletion" id)))))) ;; --- IMPL diff --git a/backend/src/app/tasks/file_media_gc.clj b/backend/src/app/tasks/file_media_gc.clj index f13f244c0..eebd434b5 100644 --- a/backend/src/app/tasks/file_media_gc.clj +++ b/backend/src/app/tasks/file_media_gc.clj @@ -5,7 +5,7 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020 UXBOX Labs SL +;; Copyright (c) 2020-2021 UXBOX Labs SL (ns app.tasks.file-media-gc "A maintenance task that is responsible to purge the unused media @@ -14,44 +14,34 @@ (:require [app.common.pages.migrations :as pmg] [app.db :as db] - [app.metrics :as mtx] [app.util.blob :as blob] [app.util.time :as dt] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] [integrant.core :as ig])) -(declare handler) (declare process-file) (declare retrieve-candidates) (s/def ::max-age ::dt/duration) (defmethod ig/pre-init-spec ::handler [_] - (s/keys :req-un [::db/pool ::mtx/metrics ::max-age])) + (s/keys :req-un [::db/pool ::max-age])) (defmethod ig/init-key ::handler - [_ {:keys [metrics] :as cfg}] - (let [handler #(handler cfg %)] - (->> {:registry (:registry metrics) - :type :summary - :name "task_file_media_gc_timing" - :help "file media garbage collection task timing"} - (mtx/instrument handler)))) - -(defn- handler - [{:keys [pool] :as cfg} _] - (db/with-atomic [conn pool] - (let [cfg (assoc cfg :conn conn)] - (loop [n 0] - (let [files (retrieve-candidates cfg)] - (if (seq files) - (do - (run! (partial process-file cfg) files) - (recur (+ n (count files)))) - (do - (log/infof "finalized with total of %s processed files" n) - {:processed n}))))))) + [_ {:keys [pool] :as cfg}] + (fn [_] + (db/with-atomic [conn pool] + (let [cfg (assoc cfg :conn conn)] + (loop [n 0] + (let [files (retrieve-candidates cfg)] + (if (seq files) + (do + (run! (partial process-file cfg) files) + (recur (+ n (count files)))) + (do + (log/debugf "finalized with total of %s processed files" n) + {:processed n})))))))) (def ^:private sql:retrieve-candidates-chunk diff --git a/backend/src/app/tasks/file_xlog_gc.clj b/backend/src/app/tasks/file_xlog_gc.clj index 4b90200c3..d333f2ac5 100644 --- a/backend/src/app/tasks/file_xlog_gc.clj +++ b/backend/src/app/tasks/file_xlog_gc.clj @@ -5,45 +5,36 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020 UXBOX Labs SL +;; Copyright (c) 2020-2021 UXBOX Labs SL (ns app.tasks.file-xlog-gc "A maintenance task that performs a garbage collection of the file change (transaction) log." (:require [app.db :as db] - [app.metrics :as mtx] [app.util.time :as dt] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] [integrant.core :as ig])) -(declare handler) +(declare sql:delete-files-xlog) (s/def ::max-age ::dt/duration) (defmethod ig/pre-init-spec ::handler [_] - (s/keys :req-un [::db/pool ::mtx/metrics ::max-age])) + (s/keys :req-un [::db/pool ::max-age])) (defmethod ig/init-key ::handler - [_ {:keys [metrics] :as cfg}] - (let [handler #(handler cfg %)] - (->> {:registry (:registry metrics) - :type :summary - :name "task_file_xlog_gc_timing" - :help "file changes garbage collection task timing"} - (mtx/instrument handler)))) + [_ {:keys [pool max-age] :as cfg}] + (fn [_] + (db/with-atomic [conn pool] + (let [interval (db/interval max-age) + result (db/exec-one! conn [sql:delete-files-xlog interval]) + result (:next.jdbc/update-count result)] + (log/debugf "removed %s rows from file-change table" result) + result)))) (def ^:private sql:delete-files-xlog "delete from file_change where created_at < now() - ?::interval") - -(defn- handler - [{:keys [pool max-age]} _] - (db/with-atomic [conn pool] - (let [interval (db/interval max-age) - result (db/exec-one! conn [sql:delete-files-xlog interval]) - result (:next.jdbc/update-count result)] - (log/infof "removed %s rows from file_change table" result) - nil))) diff --git a/backend/src/app/tasks/sendmail.clj b/backend/src/app/tasks/sendmail.clj index 78315a2b7..0619b75a2 100644 --- a/backend/src/app/tasks/sendmail.clj +++ b/backend/src/app/tasks/sendmail.clj @@ -10,13 +10,12 @@ (ns app.tasks.sendmail (:require [app.config :as cfg] - [app.metrics :as mtx] [app.util.emails :as emails] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] [integrant.core :as ig])) -(declare handler) +(declare send-console!) (s/def ::username ::cfg/smtp-username) (s/def ::password ::cfg/smtp-password) @@ -29,7 +28,7 @@ (s/def ::enabled ::cfg/smtp-enabled) (defmethod ig/pre-init-spec ::handler [_] - (s/keys :req-un [::enabled ::mtx/metrics] + (s/keys :req-un [::enabled] :opt-un [::username ::password ::tls @@ -40,13 +39,11 @@ ::default-reply-to])) (defmethod ig/init-key ::handler - [_ {:keys [metrics] :as cfg}] - (let [handler #(handler cfg %)] - (->> {:registry (:registry metrics) - :type :summary - :name "task_sendmail_timing" - :help "sendmail task timing"} - (mtx/instrument handler)))) + [_ cfg] + (fn [{:keys [props] :as task}] + (if (:enabled cfg) + (emails/send! cfg props) + (send-console! cfg props)))) (defn- send-console! [cfg email] @@ -59,9 +56,3 @@ (println (.toString baos)) (println "******** end email "(:id email) "**********"))] (log/info out)))) - -(defn handler - [cfg {:keys [props] :as task}] - (if (:enabled cfg) - (emails/send! cfg props) - (send-console! cfg props))) diff --git a/backend/src/app/tasks/tasks_gc.clj b/backend/src/app/tasks/tasks_gc.clj index 975bfea8c..3ff4e8db0 100644 --- a/backend/src/app/tasks/tasks_gc.clj +++ b/backend/src/app/tasks/tasks_gc.clj @@ -5,46 +5,36 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020 UXBOX Labs SL +;; Copyright (c) 2020-2021 UXBOX Labs SL (ns app.tasks.tasks-gc "A maintenance task that performs a cleanup of already executed tasks from the database table." (:require [app.db :as db] - [app.metrics :as mtx] [app.util.time :as dt] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] [integrant.core :as ig])) -(declare handler) +(declare sql:delete-completed-tasks) (s/def ::max-age ::dt/duration) (defmethod ig/pre-init-spec ::handler [_] - (s/keys :req-un [::db/pool ::mtx/metrics ::max-age])) + (s/keys :req-un [::db/pool ::max-age])) (defmethod ig/init-key ::handler - [_ {:keys [metrics] :as cfg}] - (let [handler #(handler cfg %)] - (->> {:registry (:registry metrics) - :type :summary - :name "task_tasks_gc_timing" - :help "tasks garbage collection task timing"} - (mtx/instrument handler)))) + [_ {:keys [pool max-age] :as cfg}] + (fn [_] + (db/with-atomic [conn pool] + (let [interval (db/interval max-age) + result (db/exec-one! conn [sql:delete-completed-tasks interval]) + result (:next.jdbc/update-count result)] + (log/debugf "removed %s rows from tasks-completed table" result) + result)))) (def ^:private sql:delete-completed-tasks "delete from task_completed where scheduled_at < now() - ?::interval") - -(defn- handler - [{:keys [pool max-age]} _] - (db/with-atomic [conn pool] - (let [interval (db/interval max-age) - result (db/exec-one! conn [sql:delete-completed-tasks interval]) - result (:next.jdbc/update-count result)] - (log/infof "removed %s rows from tasks_completed table" result) - nil))) - diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index 9b7cdc478..fe078ba02 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -10,6 +10,7 @@ (ns app.worker "Async tasks abstraction (impl)." (:require + [app.common.exceptions :as ex] [app.common.spec :as us] [app.common.uuid :as uuid] [app.db :as db] @@ -19,6 +20,7 @@ [clojure.core.async :as a] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] + [cuerdas.core :as str] [integrant.core :as ig] [promesa.exec :as px]) (:import @@ -72,7 +74,7 @@ (s/def ::queue ::us/string) (s/def ::parallelism ::us/integer) (s/def ::batch-size ::us/integer) -(s/def ::tasks (s/map-of string? ::us/fn)) +(s/def ::tasks (s/map-of string? fn?)) (s/def ::poll-interval ::dt/duration) (defmethod ig/pre-init-spec ::worker [_] @@ -289,21 +291,31 @@ (s/def ::id ::us/string) (s/def ::cron dt/cron?) (s/def ::props (s/nilable map?)) +(s/def ::task keyword?) (s/def ::scheduled-task-spec - (s/keys :req-un [::id ::cron ::fn] + (s/keys :req-un [::id ::cron ::task] :opt-un [::props])) -(s/def ::schedule - (s/coll-of (s/nilable ::scheduled-task-spec))) +(s/def ::schedule (s/coll-of (s/nilable ::scheduled-task-spec))) (defmethod ig/pre-init-spec ::scheduler [_] - (s/keys :req-un [::executor ::db/pool ::schedule])) + (s/keys :req-un [::executor ::db/pool ::schedule ::tasks])) (defmethod ig/init-key ::scheduler - [_ {:keys [schedule] :as cfg}] + [_ {:keys [schedule tasks] :as cfg}] (let [scheduler (Executors/newScheduledThreadPool (int 1)) - schedule (filter some? schedule) + schedule (->> schedule + (filter some?) + (map (fn [{:keys [task] :as item}] + (let [f (get tasks (name task))] + (when-not f + (ex/raise :type :internal + :code :task-not-found + :hint (str/fmt "task %s not configured" task))) + (-> item + (dissoc :task) + (assoc :fn f)))))) cfg (assoc cfg :scheduler scheduler :schedule schedule)] @@ -351,27 +363,16 @@ (letfn [(run-task [conn] (try (when (db/exec-one! conn [sql:lock-scheduled-task id]) - (log/info "Executing scheduled task" id) + (log/debugf "executing scheduled task '%s'" id) ((:fn task) task)) - (catch Exception e + (catch Throwable e e))) - (handle-task* [conn] - (let [result (run-task conn)] - (if (instance? Throwable result) - (do - (log/warnf result "unhandled exception on scheduled task '%s'" id) - (db/insert! conn :scheduled-task-history - {:id (uuid/next) - :task-id id - :is-error true - :reason (exception->string result)})) - (db/insert! conn :scheduled-task-history - {:id (uuid/next) - :task-id id})))) (handle-task [] (db/with-atomic [conn pool] - (handle-task* conn)))] + (let [result (run-task conn)] + (when (ex/exception? result) + (log/errorf result "unhandled exception on scheduled task '%s'" id)))))] (try (px/run! executor handle-task) diff --git a/common/app/common/exceptions.cljc b/common/app/common/exceptions.cljc index 389178255..96782de95 100644 --- a/common/app/common/exceptions.cljc +++ b/common/app/common/exceptions.cljc @@ -52,3 +52,7 @@ (defn ex-info? [v] (instance? #?(:clj clojure.lang.ExceptionInfo :cljs cljs.core.ExceptionInfo) v)) + +(defn exception? + [v] + (instance? #?(:clj java.lang.Throwable :cljs js/Error) v))