Improve tasks metrics.

This commit is contained in:
Andrey Antukh 2021-02-22 12:46:15 +01:00 committed by Andrés Moya
parent a63f28a2e5
commit fbe2e2a285
12 changed files with 264 additions and 219 deletions

View file

@ -13,6 +13,7 @@
[app.common.geom.point :as gpt] [app.common.geom.point :as gpt]
[app.common.spec :as us] [app.common.spec :as us]
[app.db.sql :as sql] [app.db.sql :as sql]
[app.metrics :as mtx]
[app.util.json :as json] [app.util.json :as json]
[app.util.migrations :as mg] [app.util.migrations :as mg]
[app.util.time :as dt] [app.util.time :as dt]
@ -45,19 +46,21 @@
;; Initialization ;; Initialization
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(declare instrument-jdbc!)
(s/def ::uri ::us/not-empty-string) (s/def ::uri ::us/not-empty-string)
(s/def ::name ::us/not-empty-string) (s/def ::name ::us/not-empty-string)
(s/def ::min-pool-size ::us/integer) (s/def ::min-pool-size ::us/integer)
(s/def ::max-pool-size ::us/integer) (s/def ::max-pool-size ::us/integer)
(s/def ::migrations map?) (s/def ::migrations map?)
(s/def ::metrics map?)
(defmethod ig/pre-init-spec ::pool [_] (defmethod ig/pre-init-spec ::pool [_]
(s/keys :req-un [::uri ::name ::min-pool-size ::max-pool-size ::migrations])) (s/keys :req-un [::uri ::name ::min-pool-size ::max-pool-size ::migrations ::mtx/metrics]))
(defmethod ig/init-key ::pool (defmethod ig/init-key ::pool
[_ {:keys [migrations] :as cfg}] [_ {:keys [migrations metrics] :as cfg}]
(log/debugf "initialize connection pool %s with uri %s" (:name cfg) (:uri cfg)) (log/infof "initialize connection pool '%s' with uri '%s'" (:name cfg) (:uri cfg))
(instrument-jdbc! (:registry metrics))
(let [pool (create-pool cfg)] (let [pool (create-pool cfg)]
(when (seq migrations) (when (seq migrations)
(with-open [conn ^AutoCloseable (open pool)] (with-open [conn ^AutoCloseable (open pool)]
@ -70,6 +73,16 @@
[_ pool] [_ pool]
(.close ^HikariDataSource pool)) (.close ^HikariDataSource pool))
(defn- instrument-jdbc!
[registry]
(mtx/instrument-vars!
[#'next.jdbc/execute-one!
#'next.jdbc/execute!]
{:registry registry
:type :counter
:name "database_query_count"
:help "An absolute counter of database queries."}))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; API & Impl ;; API & Impl
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View file

@ -174,46 +174,56 @@
:app.worker/worker :app.worker/worker
{:executor (ig/ref :app.worker/executor) {:executor (ig/ref :app.worker/executor)
:pool (ig/ref :app.db/pool) :pool (ig/ref :app.db/pool)
:tasks (ig/ref :app.tasks/all)} :tasks (ig/ref :app.tasks/registry)}
:app.worker/scheduler :app.worker/scheduler
{:executor (ig/ref :app.worker/executor) {:executor (ig/ref :app.worker/executor)
:pool (ig/ref :app.db/pool) :pool (ig/ref :app.db/pool)
:tasks (ig/ref :app.tasks/registry)
:schedule :schedule
[{:id "file-media-gc" [{:id "file-media-gc"
:cron #app/cron "0 0 0 */1 * ? *" ;; daily :cron #app/cron "0 0 0 */1 * ? *" ;; daily
:fn (ig/ref :app.tasks.file-media-gc/handler)} :task :file-media-gc}
{:id "file-xlog-gc" {:id "file-xlog-gc"
:cron #app/cron "0 0 */1 * * ?" ;; hourly :cron #app/cron "0 0 */1 * * ?" ;; hourly
:fn (ig/ref :app.tasks.file-xlog-gc/handler)} :task :file-xlog-gc}
{:id "storage-deleted-gc" {:id "storage-deleted-gc"
:cron #app/cron "0 0 1 */1 * ?" ;; daily (1 hour shift) :cron #app/cron "0 0 1 */1 * ?" ;; daily (1 hour shift)
:fn (ig/ref :app.storage/gc-deleted-task)} :task :storage-deleted-gc}
{:id "storage-touched-gc" {:id "storage-touched-gc"
:cron #app/cron "0 0 2 */1 * ?" ;; daily (2 hour shift) :cron #app/cron "0 0 2 */1 * ?" ;; daily (2 hour shift)
:fn (ig/ref :app.storage/gc-touched-task)} :task :storage-touched-gc}
{:id "storage-recheck" {:id "storage-recheck"
:cron #app/cron "0 0 */1 * * ?" ;; hourly :cron #app/cron "0 0 */1 * * ?" ;; hourly
:fn (ig/ref :app.storage/recheck-task)} :task :storage-recheck}
{:id "tasks-gc" {:id "tasks-gc"
:cron #app/cron "0 0 0 */1 * ?" ;; daily :cron #app/cron "0 0 0 */1 * ?" ;; daily
:fn (ig/ref :app.tasks.tasks-gc/handler)} :task :tasks-gc}
(when (:telemetry-enabled config) (when (:telemetry-enabled config)
{:id "telemetry" {:id "telemetry"
:cron #app/cron "0 0 */6 * * ?" ;; every 6h :cron #app/cron "0 0 */6 * * ?" ;; every 6h
:uri (:telemetry-uri config) :uri (:telemetry-uri config)
:fn (ig/ref :app.tasks.telemetry/handler)})]} :task :telemetry})]}
:app.tasks/all :app.tasks/registry
{"sendmail" (ig/ref :app.tasks.sendmail/handler) {:metrics (ig/ref :app.metrics/metrics)
"delete-object" (ig/ref :app.tasks.delete-object/handler) :tasks
"delete-profile" (ig/ref :app.tasks.delete-profile/handler)} {:sendmail (ig/ref :app.tasks.sendmail/handler)
:delete-object (ig/ref :app.tasks.delete-object/handler)
:delete-profile (ig/ref :app.tasks.delete-profile/handler)
:file-media-gc (ig/ref :app.tasks.file-media-gc/handler)
:file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler)
:storage-deleted-gc (ig/ref :app.storage/gc-deleted-task)
:storage-touched-gc (ig/ref :app.storage/gc-touched-task)
:storage-recheck (ig/ref :app.storage/recheck-task)
:tasks-gc (ig/ref :app.tasks.tasks-gc/handler)
:telemetry (ig/ref :app.tasks.telemetry/handler)}}
:app.tasks.sendmail/handler :app.tasks.sendmail/handler
{:host (:smtp-host config) {:host (:smtp-host config)

View file

@ -10,17 +10,15 @@
(ns app.metrics (ns app.metrics
(:require (:require
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.util.time :as dt]
[app.worker]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[integrant.core :as ig] [integrant.core :as ig])
[next.jdbc :as jdbc])
(:import (:import
io.prometheus.client.CollectorRegistry io.prometheus.client.CollectorRegistry
io.prometheus.client.Counter io.prometheus.client.Counter
io.prometheus.client.Gauge io.prometheus.client.Gauge
io.prometheus.client.Summary io.prometheus.client.Summary
io.prometheus.client.Histogram
io.prometheus.client.exporter.common.TextFormat io.prometheus.client.exporter.common.TextFormat
io.prometheus.client.hotspot.DefaultExports io.prometheus.client.hotspot.DefaultExports
io.prometheus.client.jetty.JettyStatisticsCollector io.prometheus.client.jetty.JettyStatisticsCollector
@ -30,41 +28,12 @@
(declare instrument-vars!) (declare instrument-vars!)
(declare instrument) (declare instrument)
(declare create-registry) (declare create-registry)
(declare create)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Entry Point ;; Entry Point
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- instrument-jdbc!
[registry]
(instrument-vars!
[#'next.jdbc/execute-one!
#'next.jdbc/execute!]
{:registry registry
:type :counter
:name "database_query_counter"
:help "An absolute counter of database queries."}))
(defn- instrument-workers!
[registry]
(instrument-vars!
[#'app.worker/run-task]
{:registry registry
:type :summary
:name "worker_task_checkout_millis"
:help "Latency measured between scheduld_at and execution time."
:wrap (fn [rootf mobj]
(let [mdata (meta rootf)
origf (::original mdata rootf)]
(with-meta
(fn [tasks item]
(let [now (inst-ms (dt/now))
sat (inst-ms (:scheduled-at item))]
(mobj :observe (- now sat))
(origf tasks item)))
{::original origf})))}))
(defn- handler (defn- handler
[registry _request] [registry _request]
(let [samples (.metricFamilySamples ^CollectorRegistry registry) (let [samples (.metricFamilySamples ^CollectorRegistry registry)
@ -73,13 +42,24 @@
{:headers {"content-type" TextFormat/CONTENT_TYPE_004} {:headers {"content-type" TextFormat/CONTENT_TYPE_004}
:body (.toString writer)})) :body (.toString writer)}))
(s/def ::definitions
(s/map-of keyword? map?))
(defmethod ig/pre-init-spec ::metrics [_]
(s/keys :opt-un [::definitions]))
(defmethod ig/init-key ::metrics (defmethod ig/init-key ::metrics
[_ _cfg] [_ {:keys [definitions] :as cfg}]
(log/infof "Initializing prometheus registry and instrumentation.") (log/infof "Initializing prometheus registry and instrumentation.")
(let [registry (create-registry)] (let [registry (create-registry)
(instrument-workers! registry) definitions (reduce-kv (fn [res k v]
(instrument-jdbc! registry) (->> (assoc v :registry registry)
(create)
(assoc res k)))
{}
definitions)]
{:handler (partial handler registry) {:handler (partial handler registry)
:definitions definitions
:registry registry})) :registry registry}))
(s/def ::handler fn?) (s/def ::handler fn?)
@ -87,7 +67,6 @@
(s/def ::metrics (s/def ::metrics
(s/keys :req-un [::registry ::handler])) (s/keys :req-un [::registry ::handler]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Implementation ;; Implementation
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -126,7 +105,7 @@
(invoke [_ cmd labels] (invoke [_ cmd labels]
(.. ^Counter instance (.. ^Counter instance
(labels labels) (labels (into-array String labels))
(inc)))))) (inc))))))
(defn make-gauge (defn make-gauge
@ -150,19 +129,27 @@
:dec (.dec ^Gauge instance))) :dec (.dec ^Gauge instance)))
(invoke [_ cmd labels] (invoke [_ cmd labels]
(let [labels (into-array String [labels])]
(case cmd (case cmd
:inc (.. ^Gauge instance (labels labels) (inc)) :inc (.. ^Gauge instance (labels labels) (inc))
:dec (.. ^Gauge instance (labels labels) (dec))))))) :dec (.. ^Gauge instance (labels labels) (dec))))))))
(def default-quantiles
[[0.75 0.02]
[0.99 0.001]])
(defn make-summary (defn make-summary
[{:keys [name help registry reg labels max-age] :or {max-age 3600} :as props}] [{:keys [name help registry reg labels max-age quantiles buckets]
:or {max-age 3600 buckets 6 quantiles default-quantiles} :as props}]
(let [registry (or registry reg) (let [registry (or registry reg)
instance (doto (Summary/build) instance (doto (Summary/build)
(.name name) (.name name)
(.help help) (.help help))
(.maxAgeSeconds max-age) _ (when (seq quantiles)
(.quantile 0.75 0.02) (.maxAgeSeconds ^Summary instance max-age)
(.quantile 0.99 0.001)) (.ageBuckets ^Summary instance buckets))
_ (doseq [[q e] quantiles]
(.quantile ^Summary instance q e))
_ (when (seq labels) _ (when (seq labels)
(.labelNames instance (into-array String labels))) (.labelNames instance (into-array String labels)))
instance (.register instance registry)] instance (.register instance registry)]
@ -176,7 +163,34 @@
(invoke [_ cmd val labels] (invoke [_ cmd val labels]
(.. ^Summary instance (.. ^Summary instance
(labels labels) (labels (into-array String labels))
(observe val))))))
(def default-histogram-buckets
[1 5 10 25 50 75 100 250 500 750 1000 2500 5000 7500])
(defn make-histogram
[{:keys [name help registry reg labels buckets]
:or {buckets default-histogram-buckets}}]
(let [registry (or registry reg)
instance (doto (Histogram/build)
(.name name)
(.help help)
(.buckets (into-array Double/TYPE buckets)))
_ (when (seq labels)
(.labelNames instance (into-array String labels)))
instance (.register instance registry)]
(reify
clojure.lang.IDeref
(deref [_] instance)
clojure.lang.IFn
(invoke [_ cmd val]
(.observe ^Histogram instance val))
(invoke [_ cmd val labels]
(.. ^Histogram instance
(labels (into-array String labels))
(observe val)))))) (observe val))))))
(defn create (defn create
@ -184,7 +198,8 @@
(case type (case type
:counter (make-counter props) :counter (make-counter props)
:gauge (make-gauge props) :gauge (make-gauge props)
:summary (make-summary props))) :summary (make-summary props)
:histogram (make-histogram props)))
(defn wrap-counter (defn wrap-counter
([rootf mobj] ([rootf mobj]
@ -204,7 +219,6 @@
(assoc mdata ::original origf)))) (assoc mdata ::original origf))))
([rootf mobj labels] ([rootf mobj labels]
(let [mdata (meta rootf) (let [mdata (meta rootf)
labels (into-array String labels)
origf (::original mdata rootf)] origf (::original mdata rootf)]
(with-meta (with-meta
(fn (fn
@ -241,7 +255,6 @@
([rootf mobj labels] ([rootf mobj labels]
(let [mdata (meta rootf) (let [mdata (meta rootf)
labels (into-array String labels)
origf (::original mdata rootf)] origf (::original mdata rootf)]
(with-meta (with-meta
(fn (fn
@ -284,6 +297,9 @@
(instance? Summary @obj) (instance? Summary @obj)
((or wrap wrap-summary) f obj) ((or wrap wrap-summary) f obj)
(instance? Histogram @obj)
((or wrap wrap-summary) f obj)
:else :else
(ex/raise :type :not-implemented)))) (ex/raise :type :not-implemented))))

View file

@ -5,17 +5,19 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0. ;; defined by the Mozilla Public License, v. 2.0.
;; ;;
;; Copyright (c) 2020 UXBOX Labs SL ;; Copyright (c) 2020-2021 UXBOX Labs SL
(ns app.tasks (ns app.tasks
(:require (:require
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.db :as db] [app.db :as db]
;; [app.metrics :as mtx] [app.metrics :as mtx]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log])) [clojure.tools.logging :as log]
[integrant.core :as ig]))
(s/def ::name ::us/string) (s/def ::name ::us/string)
(s/def ::delay (s/def ::delay
@ -41,11 +43,68 @@
interval (db/interval duration) interval (db/interval duration)
props (db/tjson props) props (db/tjson props)
id (uuid/next)] id (uuid/next)]
(log/infof "Submit task '%s' to be executed in '%s'." name (str duration)) (log/debugf "submit task '%s' to be executed in '%s'" name (str duration))
(db/exec-one! conn [sql:insert-new-task id name props queue priority max-retries interval]) (db/exec-one! conn [sql:insert-new-task id name props queue priority max-retries interval])
id)) id))
;; (mtx/instrument-with-counter! (defn- instrument!
;; {:var #'submit! [registry]
;; :id "tasks__submit_counter" (mtx/instrument-vars!
;; :help "Absolute task submit counter."}) [#'submit!]
{:registry registry
:type :counter
:labels ["name"]
:name "tasks_submit_counter"
:help "An absolute counter of task submissions."
:wrap (fn [rootf mobj]
(let [mdata (meta rootf)
origf (::original mdata rootf)]
(with-meta
(fn [conn params]
(let [tname (:name params)]
(mobj :inc [tname])
(origf conn params)))
{::original origf})))})
(mtx/instrument-vars!
[#'app.worker/run-task]
{:registry registry
:type :summary
:quantiles []
:name "tasks_checkout_timing"
:help "Latency measured between scheduld_at and execution time."
:wrap (fn [rootf mobj]
(let [mdata (meta rootf)
origf (::original mdata rootf)]
(with-meta
(fn [tasks item]
(let [now (inst-ms (dt/now))
sat (inst-ms (:scheduled-at item))]
(mobj :observe (- now sat))
(origf tasks item)))
{::original origf})))}))
;; --- STATE INIT: REGISTRY
(s/def ::tasks
(s/map-of keyword? fn?))
(defmethod ig/pre-init-spec ::registry [_]
(s/keys :req-un [::mtx/metrics ::tasks]))
(defmethod ig/init-key ::registry
[_ {:keys [metrics tasks]}]
(instrument! (:registry metrics))
(let [mobj (mtx/create
{:registry (:registry metrics)
:type :summary
:labels ["name"]
:quantiles []
:name "tasks_timing"
:help "Background task execution timing."})]
(reduce-kv (fn [res k v]
(let [tname (name k)]
(log/debugf "registring task '%s'" tname)
(assoc res tname (mtx/wrap-summary v mobj [tname]))))
{}
tasks)))

View file

@ -12,36 +12,26 @@
(:require (:require
[app.common.spec :as us] [app.common.spec :as us]
[app.db :as db] [app.db :as db]
[app.metrics :as mtx]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[integrant.core :as ig])) [integrant.core :as ig]))
(declare handler)
(declare handle-deletion) (declare handle-deletion)
(defmethod ig/pre-init-spec ::handler [_] (defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::mtx/metrics])) (s/keys :req-un [::db/pool]))
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ {:keys [metrics] :as cfg}] [_ {:keys [pool] :as cfg}]
(let [handler #(handler cfg %)] (fn [{:keys [props] :as task}]
(->> {:registry (:registry metrics) (us/verify ::props props)
:type :summary (db/with-atomic [conn pool]
:name "task_delete_object_timing" (handle-deletion conn props))))
:help "delete object task timing"}
(mtx/instrument handler))))
(s/def ::type ::us/keyword) (s/def ::type ::us/keyword)
(s/def ::id ::us/uuid) (s/def ::id ::us/uuid)
(s/def ::props (s/keys :req-un [::id ::type])) (s/def ::props (s/keys :req-un [::id ::type]))
(defn- handler
[{:keys [pool]} {:keys [props] :as task}]
(us/verify ::props props)
(db/with-atomic [conn pool]
(handle-deletion conn props)))
(defmulti handle-deletion (defmulti handle-deletion
(fn [_ props] (:type props))) (fn [_ props] (:type props)))

View file

@ -13,27 +13,16 @@
[app.common.spec :as us] [app.common.spec :as us]
[app.db :as db] [app.db :as db]
[app.db.sql :as sql] [app.db.sql :as sql]
[app.metrics :as mtx]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[integrant.core :as ig])) [integrant.core :as ig]))
(declare delete-profile-data) (declare delete-profile-data)
(declare handler)
;; --- INIT ;; --- INIT
(defmethod ig/pre-init-spec ::handler [_] (defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::mtx/metrics])) (s/keys :req-un [::db/pool]))
(defmethod ig/init-key ::handler
[_ {:keys [metrics] :as cfg}]
(let [handler #(handler cfg %)]
(->> {:registry (:registry metrics)
:type :summary
:name "task_delete_profile_timing"
:help "delete profile task timing"}
(mtx/instrument handler))))
;; This task is responsible to permanently delete a profile with all ;; This task is responsible to permanently delete a profile with all
;; the dependent data. As step (1) we delete all owned teams of the ;; the dependent data. As step (1) we delete all owned teams of the
@ -48,8 +37,9 @@
(s/def ::profile-id ::us/uuid) (s/def ::profile-id ::us/uuid)
(s/def ::props (s/keys :req-un [::profile-id])) (s/def ::props (s/keys :req-un [::profile-id]))
(defn handler (defmethod ig/init-key ::handler
[{:keys [pool]} {:keys [props] :as task}] [_ {:keys [pool] :as cfg}]
(fn [{:keys [props] :as task}]
(us/verify ::props props) (us/verify ::props props)
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(let [id (:profile-id props) (let [id (:profile-id props)
@ -57,7 +47,7 @@
(if (or (:is-demo profile) (if (or (:is-demo profile)
(:deleted-at profile)) (:deleted-at profile))
(delete-profile-data conn id) (delete-profile-data conn id)
(log/warnf "Profile %s does not match constraints for deletion" id))))) (log/warnf "profile '%s' does not match constraints for deletion" id))))))
;; --- IMPL ;; --- IMPL

View file

@ -5,7 +5,7 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0. ;; defined by the Mozilla Public License, v. 2.0.
;; ;;
;; Copyright (c) 2020 UXBOX Labs SL ;; Copyright (c) 2020-2021 UXBOX Labs SL
(ns app.tasks.file-media-gc (ns app.tasks.file-media-gc
"A maintenance task that is responsible to purge the unused media "A maintenance task that is responsible to purge the unused media
@ -14,33 +14,23 @@
(:require (:require
[app.common.pages.migrations :as pmg] [app.common.pages.migrations :as pmg]
[app.db :as db] [app.db :as db]
[app.metrics :as mtx]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[integrant.core :as ig])) [integrant.core :as ig]))
(declare handler)
(declare process-file) (declare process-file)
(declare retrieve-candidates) (declare retrieve-candidates)
(s/def ::max-age ::dt/duration) (s/def ::max-age ::dt/duration)
(defmethod ig/pre-init-spec ::handler [_] (defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::mtx/metrics ::max-age])) (s/keys :req-un [::db/pool ::max-age]))
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ {:keys [metrics] :as cfg}] [_ {:keys [pool] :as cfg}]
(let [handler #(handler cfg %)] (fn [_]
(->> {:registry (:registry metrics)
:type :summary
:name "task_file_media_gc_timing"
:help "file media garbage collection task timing"}
(mtx/instrument handler))))
(defn- handler
[{:keys [pool] :as cfg} _]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(let [cfg (assoc cfg :conn conn)] (let [cfg (assoc cfg :conn conn)]
(loop [n 0] (loop [n 0]
@ -50,8 +40,8 @@
(run! (partial process-file cfg) files) (run! (partial process-file cfg) files)
(recur (+ n (count files)))) (recur (+ n (count files))))
(do (do
(log/infof "finalized with total of %s processed files" n) (log/debugf "finalized with total of %s processed files" n)
{:processed n}))))))) {:processed n}))))))))
(def ^:private (def ^:private
sql:retrieve-candidates-chunk sql:retrieve-candidates-chunk

View file

@ -5,45 +5,36 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0. ;; defined by the Mozilla Public License, v. 2.0.
;; ;;
;; Copyright (c) 2020 UXBOX Labs SL ;; Copyright (c) 2020-2021 UXBOX Labs SL
(ns app.tasks.file-xlog-gc (ns app.tasks.file-xlog-gc
"A maintenance task that performs a garbage collection of the file "A maintenance task that performs a garbage collection of the file
change (transaction) log." change (transaction) log."
(:require (:require
[app.db :as db] [app.db :as db]
[app.metrics :as mtx]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[integrant.core :as ig])) [integrant.core :as ig]))
(declare handler) (declare sql:delete-files-xlog)
(s/def ::max-age ::dt/duration) (s/def ::max-age ::dt/duration)
(defmethod ig/pre-init-spec ::handler [_] (defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::mtx/metrics ::max-age])) (s/keys :req-un [::db/pool ::max-age]))
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ {:keys [metrics] :as cfg}] [_ {:keys [pool max-age] :as cfg}]
(let [handler #(handler cfg %)] (fn [_]
(->> {:registry (:registry metrics) (db/with-atomic [conn pool]
:type :summary (let [interval (db/interval max-age)
:name "task_file_xlog_gc_timing" result (db/exec-one! conn [sql:delete-files-xlog interval])
:help "file changes garbage collection task timing"} result (:next.jdbc/update-count result)]
(mtx/instrument handler)))) (log/debugf "removed %s rows from file-change table" result)
result))))
(def ^:private (def ^:private
sql:delete-files-xlog sql:delete-files-xlog
"delete from file_change "delete from file_change
where created_at < now() - ?::interval") where created_at < now() - ?::interval")
(defn- handler
[{:keys [pool max-age]} _]
(db/with-atomic [conn pool]
(let [interval (db/interval max-age)
result (db/exec-one! conn [sql:delete-files-xlog interval])
result (:next.jdbc/update-count result)]
(log/infof "removed %s rows from file_change table" result)
nil)))

View file

@ -10,13 +10,12 @@
(ns app.tasks.sendmail (ns app.tasks.sendmail
(:require (:require
[app.config :as cfg] [app.config :as cfg]
[app.metrics :as mtx]
[app.util.emails :as emails] [app.util.emails :as emails]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[integrant.core :as ig])) [integrant.core :as ig]))
(declare handler) (declare send-console!)
(s/def ::username ::cfg/smtp-username) (s/def ::username ::cfg/smtp-username)
(s/def ::password ::cfg/smtp-password) (s/def ::password ::cfg/smtp-password)
@ -29,7 +28,7 @@
(s/def ::enabled ::cfg/smtp-enabled) (s/def ::enabled ::cfg/smtp-enabled)
(defmethod ig/pre-init-spec ::handler [_] (defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::enabled ::mtx/metrics] (s/keys :req-un [::enabled]
:opt-un [::username :opt-un [::username
::password ::password
::tls ::tls
@ -40,13 +39,11 @@
::default-reply-to])) ::default-reply-to]))
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ {:keys [metrics] :as cfg}] [_ cfg]
(let [handler #(handler cfg %)] (fn [{:keys [props] :as task}]
(->> {:registry (:registry metrics) (if (:enabled cfg)
:type :summary (emails/send! cfg props)
:name "task_sendmail_timing" (send-console! cfg props))))
:help "sendmail task timing"}
(mtx/instrument handler))))
(defn- send-console! (defn- send-console!
[cfg email] [cfg email]
@ -59,9 +56,3 @@
(println (.toString baos)) (println (.toString baos))
(println "******** end email "(:id email) "**********"))] (println "******** end email "(:id email) "**********"))]
(log/info out)))) (log/info out))))
(defn handler
[cfg {:keys [props] :as task}]
(if (:enabled cfg)
(emails/send! cfg props)
(send-console! cfg props)))

View file

@ -5,46 +5,36 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0. ;; defined by the Mozilla Public License, v. 2.0.
;; ;;
;; Copyright (c) 2020 UXBOX Labs SL ;; Copyright (c) 2020-2021 UXBOX Labs SL
(ns app.tasks.tasks-gc (ns app.tasks.tasks-gc
"A maintenance task that performs a cleanup of already executed tasks "A maintenance task that performs a cleanup of already executed tasks
from the database table." from the database table."
(:require (:require
[app.db :as db] [app.db :as db]
[app.metrics :as mtx]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[integrant.core :as ig])) [integrant.core :as ig]))
(declare handler) (declare sql:delete-completed-tasks)
(s/def ::max-age ::dt/duration) (s/def ::max-age ::dt/duration)
(defmethod ig/pre-init-spec ::handler [_] (defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::mtx/metrics ::max-age])) (s/keys :req-un [::db/pool ::max-age]))
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ {:keys [metrics] :as cfg}] [_ {:keys [pool max-age] :as cfg}]
(let [handler #(handler cfg %)] (fn [_]
(->> {:registry (:registry metrics) (db/with-atomic [conn pool]
:type :summary (let [interval (db/interval max-age)
:name "task_tasks_gc_timing" result (db/exec-one! conn [sql:delete-completed-tasks interval])
:help "tasks garbage collection task timing"} result (:next.jdbc/update-count result)]
(mtx/instrument handler)))) (log/debugf "removed %s rows from tasks-completed table" result)
result))))
(def ^:private (def ^:private
sql:delete-completed-tasks sql:delete-completed-tasks
"delete from task_completed "delete from task_completed
where scheduled_at < now() - ?::interval") where scheduled_at < now() - ?::interval")
(defn- handler
[{:keys [pool max-age]} _]
(db/with-atomic [conn pool]
(let [interval (db/interval max-age)
result (db/exec-one! conn [sql:delete-completed-tasks interval])
result (:next.jdbc/update-count result)]
(log/infof "removed %s rows from tasks_completed table" result)
nil)))

View file

@ -10,6 +10,7 @@
(ns app.worker (ns app.worker
"Async tasks abstraction (impl)." "Async tasks abstraction (impl)."
(:require (:require
[app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.db :as db] [app.db :as db]
@ -19,6 +20,7 @@
[clojure.core.async :as a] [clojure.core.async :as a]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[cuerdas.core :as str]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.exec :as px]) [promesa.exec :as px])
(:import (:import
@ -72,7 +74,7 @@
(s/def ::queue ::us/string) (s/def ::queue ::us/string)
(s/def ::parallelism ::us/integer) (s/def ::parallelism ::us/integer)
(s/def ::batch-size ::us/integer) (s/def ::batch-size ::us/integer)
(s/def ::tasks (s/map-of string? ::us/fn)) (s/def ::tasks (s/map-of string? fn?))
(s/def ::poll-interval ::dt/duration) (s/def ::poll-interval ::dt/duration)
(defmethod ig/pre-init-spec ::worker [_] (defmethod ig/pre-init-spec ::worker [_]
@ -289,21 +291,31 @@
(s/def ::id ::us/string) (s/def ::id ::us/string)
(s/def ::cron dt/cron?) (s/def ::cron dt/cron?)
(s/def ::props (s/nilable map?)) (s/def ::props (s/nilable map?))
(s/def ::task keyword?)
(s/def ::scheduled-task-spec (s/def ::scheduled-task-spec
(s/keys :req-un [::id ::cron ::fn] (s/keys :req-un [::id ::cron ::task]
:opt-un [::props])) :opt-un [::props]))
(s/def ::schedule (s/def ::schedule (s/coll-of (s/nilable ::scheduled-task-spec)))
(s/coll-of (s/nilable ::scheduled-task-spec)))
(defmethod ig/pre-init-spec ::scheduler [_] (defmethod ig/pre-init-spec ::scheduler [_]
(s/keys :req-un [::executor ::db/pool ::schedule])) (s/keys :req-un [::executor ::db/pool ::schedule ::tasks]))
(defmethod ig/init-key ::scheduler (defmethod ig/init-key ::scheduler
[_ {:keys [schedule] :as cfg}] [_ {:keys [schedule tasks] :as cfg}]
(let [scheduler (Executors/newScheduledThreadPool (int 1)) (let [scheduler (Executors/newScheduledThreadPool (int 1))
schedule (filter some? schedule) schedule (->> schedule
(filter some?)
(map (fn [{:keys [task] :as item}]
(let [f (get tasks (name task))]
(when-not f
(ex/raise :type :internal
:code :task-not-found
:hint (str/fmt "task %s not configured" task)))
(-> item
(dissoc :task)
(assoc :fn f))))))
cfg (assoc cfg cfg (assoc cfg
:scheduler scheduler :scheduler scheduler
:schedule schedule)] :schedule schedule)]
@ -351,27 +363,16 @@
(letfn [(run-task [conn] (letfn [(run-task [conn]
(try (try
(when (db/exec-one! conn [sql:lock-scheduled-task id]) (when (db/exec-one! conn [sql:lock-scheduled-task id])
(log/info "Executing scheduled task" id) (log/debugf "executing scheduled task '%s'" id)
((:fn task) task)) ((:fn task) task))
(catch Exception e (catch Throwable e
e))) e)))
(handle-task* [conn]
(let [result (run-task conn)]
(if (instance? Throwable result)
(do
(log/warnf result "unhandled exception on scheduled task '%s'" id)
(db/insert! conn :scheduled-task-history
{:id (uuid/next)
:task-id id
:is-error true
:reason (exception->string result)}))
(db/insert! conn :scheduled-task-history
{:id (uuid/next)
:task-id id}))))
(handle-task [] (handle-task []
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(handle-task* conn)))] (let [result (run-task conn)]
(when (ex/exception? result)
(log/errorf result "unhandled exception on scheduled task '%s'" id)))))]
(try (try
(px/run! executor handle-task) (px/run! executor handle-task)

View file

@ -52,3 +52,7 @@
(defn ex-info? (defn ex-info?
[v] [v]
(instance? #?(:clj clojure.lang.ExceptionInfo :cljs cljs.core.ExceptionInfo) v)) (instance? #?(:clj clojure.lang.ExceptionInfo :cljs cljs.core.ExceptionInfo) v))
(defn exception?
[v]
(instance? #?(:clj java.lang.Throwable :cljs js/Error) v))