diff --git a/backend/src/app/tasks.clj b/backend/src/app/tasks.clj index 6a4fe8ef7..02b0164d4 100644 --- a/backend/src/app/tasks.clj +++ b/backend/src/app/tasks.clj @@ -9,14 +9,14 @@ (ns app.tasks (:require - [cuerdas.core :as str] - [clojure.spec.alpha :as s] - [clojure.tools.logging :as log] [app.common.spec :as us] [app.common.uuid :as uuid] + [app.config :as cfg] [app.db :as db] + [app.metrics :as mtx] [app.util.time :as dt] - [app.metrics :as mtx])) + [clojure.spec.alpha :as s] + [clojure.tools.logging :as log])) (s/def ::name ::us/string) (s/def ::delay @@ -43,7 +43,7 @@ interval (db/interval duration) props (db/tjson props) id (uuid/next)] - (log/info (str/format "Submit task '%s' to be executed in '%s'." name (str duration))) + (log/infof "Submit task '%s' to be executed in '%s'." name (str duration)) (db/exec-one! conn [sql:insert-new-task id name props queue priority max-retries interval]) id))) diff --git a/backend/src/app/tasks/gc.clj b/backend/src/app/tasks/gc.clj deleted file mode 100644 index e342f662a..000000000 --- a/backend/src/app/tasks/gc.clj +++ /dev/null @@ -1,76 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; This Source Code Form is "Incompatible With Secondary Licenses", as -;; defined by the Mozilla Public License, v. 2.0. -;; -;; Copyright (c) 2020 UXBOX Labs SL - -(ns app.tasks.gc - (:require - [clojure.spec.alpha :as s] - [clojure.tools.logging :as log] - [cuerdas.core :as str] - [postal.core :as postal] - [promesa.core :as p] - [app.common.exceptions :as ex] - [app.common.spec :as us] - [app.config :as cfg] - [app.db :as db] - [app.tasks :as tasks] - [app.media-storage :as mst] - [app.util.blob :as blob] - [app.util.storage :as ust])) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Task: Remove deleted media -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -;; The main purpose of this task is analize the `pending_to_delete` -;; table. This table stores the references to the physical files on -;; the file system thanks to `handle_delete()` trigger. - -;; Example: -;; (1) You delete an media-object. (2) This media object is marked as -;; deleted. (3) A task (`delete-object`) is scheduled for permanent -;; delete the object. - If that object stores media, the database -;; will execute the `handle_delete()` trigger which will place -;; filesystem paths into the `pendint_to_delete` table. (4) This -;; task (`remove-deleted-media`) permanently delete the file from the -;; filesystem when is executed (by scheduler). - -(def ^:private - sql:retrieve-peding-to-delete - "with items_part as ( - select i.id - from pending_to_delete as i - order by i.created_at - limit ? - for update skip locked - ) - delete from pending_to_delete - where id in (select id from items_part) - returning *") - -(defn remove-deleted-media - [{:keys [props] :as task}] - (letfn [(decode-row [{:keys [data] :as row}] - (cond-> row - (db/pgobject? data) (assoc :data (db/decode-pgobject data)))) - (retrieve-items [conn] - (->> (db/exec! conn [sql:retrieve-peding-to-delete 10]) - (map decode-row) - (map :data))) - (remove-media [rows] - (run! (fn [item] - (let [path (get item "path")] - (ust/delete! mst/media-storage path))) - rows))] - (loop [] - (let [rows (retrieve-items db/pool)] - (when-not (empty? rows) - (remove-media rows) - (recur)))))) - - diff --git a/backend/src/app/tasks/impl.clj b/backend/src/app/tasks/impl.clj deleted file mode 100644 index c0c8c4e09..000000000 --- a/backend/src/app/tasks/impl.clj +++ /dev/null @@ -1,304 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; This Source Code Form is "Incompatible With Secondary Licenses", as -;; defined by the Mozilla Public License, v. 2.0. -;; -;; Copyright (c) 2020 Andrey Antukh - -(ns app.tasks.impl - "Async tasks implementation." - (:require - [cuerdas.core :as str] - [clojure.core.async :as a] - [clojure.spec.alpha :as s] - [clojure.tools.logging :as log] - [promesa.exec :as px] - [app.common.spec :as us] - [app.common.uuid :as uuid] - [app.config :as cfg] - [app.db :as db] - [app.util.async :as aa] - [app.util.blob :as blob] - [app.util.time :as dt]) - (:import - java.util.concurrent.ScheduledExecutorService - java.util.concurrent.Executors - java.time.Duration - java.time.Instant - java.util.Date)) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Tasks -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(defn- string-strack-trace - [^Throwable err] - (with-out-str - (.printStackTrace err (java.io.PrintWriter. *out*)))) - -(def ^:private - sql:mark-as-retry - "update task - set scheduled_at = clock_timestamp() + '5 seconds'::interval, - modified_at = clock_timestamp(), - error = ?, - status = 'retry', - retry_num = retry_num + 1 - where id = ?") - -(defn- mark-as-retry - [conn task error] - (let [explain (ex-message error) - sqlv [sql:mark-as-retry explain (:id task)]] - (db/exec-one! conn sqlv) - nil)) - -(defn- mark-as-failed - [conn task error] - (let [explain (ex-message error)] - (db/update! conn :task - {:error explain - :modified-at (dt/now) - :status "failed"} - {:id (:id task)}) - nil)) - -(defn- mark-as-completed - [conn task] - (let [now (dt/now)] - (db/update! conn :task - {:completed-at now - :modified-at now - :status "completed"} - {:id (:id task)}) - nil)) - - -(def ^:private - sql:select-next-task - "select * from task as t - where t.scheduled_at <= now() - and t.queue = ? - and (t.status = 'new' or t.status = 'retry') - order by t.priority desc, t.scheduled_at - limit 1 - for update skip locked") - -(defn- decode-task-row - [{:keys [props] :as row}] - (when row - (cond-> row - (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props))))) - - -(defn- log-task-error - [item err] - (log/error (str/format "Unhandled exception on task '%s' (retry: %s)\n" (:name item) (:retry-num item)) - (str/format "Props: %s\n" (pr-str (:props item))) - (with-out-str - (.printStackTrace ^Throwable err (java.io.PrintWriter. *out*))))) - -(defn- handle-task - [tasks {:keys [name] :as item}] - (let [task-fn (get tasks name)] - (if task-fn - (task-fn item) - (do - (log/warn "no task handler found for" (pr-str name)) - nil)))) - -(defn- run-task - [{:keys [tasks conn]} item] - (try - (log/debug (str/format "Started task '%s/%s'." (:name item) (:id item))) - (handle-task tasks item) - (log/debug (str/format "Finished task '%s/%s'." (:name item) (:id item))) - (mark-as-completed conn item) - (catch Exception e - (log-task-error item e) - (if (>= (:retry-num item) (:max-retries item)) - (mark-as-failed conn item e) - (mark-as-retry conn item e))))) - -(defn- event-loop-fn - [{:keys [tasks] :as opts}] - (aa/thread-try - (db/with-atomic [conn db/pool] - (let [queue (:queue opts "default") - item (-> (db/exec-one! conn [sql:select-next-task queue]) - (decode-task-row)) - opts (assoc opts :conn conn)] - - (cond - (nil? item) - ::empty - - (or (= "new" (:status item)) - (= "retry" (:status item))) - (do - (run-task opts item) - ::handled) - - :else - (do - (log/warn "Unexpected condition on worker event loop:" (pr-str item)) - ::handled)))))) - -(s/def ::poll-interval ::us/integer) -(s/def ::fn (s/or :var var? :fn fn?)) -(s/def ::tasks (s/map-of string? ::fn)) -(s/def ::start-worker-params - (s/keys :req-un [::tasks] - :opt-un [::poll-interval])) - -(defn start-worker! - [{:keys [poll-interval] - :or {poll-interval 5000} - :as opts}] - (us/assert ::start-worker-params opts) - (log/info (str/format "Starting worker '%s' on queue '%s'." - (:name opts "anonymous") - (:queue opts "default"))) - (let [cch (a/chan 1)] - (a/go-loop [] - (let [[val port] (a/alts! [cch (event-loop-fn opts)] :priority true)] - (cond - ;; Terminate the loop if close channel is closed or - ;; event-loop-fn returns nil. - (or (= port cch) (nil? val)) - (log/info (str/format "Stop condition found. Shutdown worker: '%s'" - (:name opts "anonymous"))) - - (db/pool-closed? db/pool) - (do - (log/info "Worker eventloop is aborted because pool is closed.") - (a/close! cch)) - - (and (instance? java.sql.SQLException val) - (contains? #{"08003" "08006" "08001" "08004"} (.getSQLState val))) - (do - (log/error "Connection error, trying resume in some instants.") - (a/ row + (db/pgobject? data) (assoc :data (db/decode-pgobject data)))) + (retrieve-items [conn] + (->> (db/exec! conn [sql:retrieve-peding-to-delete 10]) + (map decode-row) + (map :data))) + (remove-media [rows] + (run! (fn [item] + (let [path (get item "path")] + (ust/delete! mst/media-storage path))) + rows))] + (loop [] + (let [rows (retrieve-items db/pool)] + (when-not (empty? rows) + (remove-media rows) + (recur)))))) + + diff --git a/backend/src/app/util/time.clj b/backend/src/app/util/time.clj index bdf3d0c11..bfe99127f 100644 --- a/backend/src/app/util/time.clj +++ b/backend/src/app/util/time.clj @@ -53,15 +53,15 @@ (defn duration [ms-or-obj] (cond + (string? ms-or-obj) + (Duration/parse (str "PT" ms-or-obj)) + (duration? ms-or-obj) ms-or-obj (integer? ms-or-obj) (Duration/ofMillis ms-or-obj) - (string? ms-or-obj) - (Duration/parse ms-or-obj) - :else (obj->duration ms-or-obj))) @@ -71,7 +71,7 @@ (defn parse-duration [s] - (Duration/parse (str "PT" s))) + (Duration/parse s)) (extend-protocol clojure.core/Inst java.time.Duration @@ -79,7 +79,7 @@ (defmethod print-method Duration [mv ^java.io.Writer writer] - (.write writer (str "#app/duration \"" (.toString ^Duration mv) "\""))) + (.write writer (str "#app/duration \"" (subs (str mv) 2) "\""))) (defmethod print-dup Duration [o w] (print-method o w)) diff --git a/backend/src/app/util/workers.clj b/backend/src/app/util/workers.clj deleted file mode 100644 index 1f271366f..000000000 --- a/backend/src/app/util/workers.clj +++ /dev/null @@ -1,74 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) 2016 Andrey Antukh - -(ns app.util.workers - "A distributed asynchronous tasks queue implementation on top - of PostgreSQL reliable advirsory locking mechanism." - #_(:require - [suricatta.core :as sc] - [app.db :as db])) - -;; (defn- poll-for-task -;; [conn queue] -;; (let [sql (sql/acquire-task {:queue queue})] -;; (sc/fetch-one conn sql))) - -;; (defn- mark-task-done -;; [conn {:keys [id]}] -;; (let [sql (sql/mark-task-done {:id id})] -;; (sc/execute conn sql))) - -;; (defn- mark-task-failed -;; [conn {:keys [id]} error] -;; (let [sql (sql/mark-task-done {:id id :error (.getMessage error)})] -;; (sc/execute conn sql))) - -;; (defn- watch-unit -;; [conn queue callback] -;; (let [task (poll-for-task conn queue)] -;; (if (nil? task) -;; (Thread/sleep 1000) -;; (try -;; (sc/atomic conn -;; (callback conn task) -;; (mark-task-done conn task)) -;; (catch Exception e -;; (mark-task-failed conn task e)))))) - -;; (defn- watch-loop -;; "Watch tasks on the specified queue and executes a -;; callback for each task is received. -;; NOTE: This function blocks the current thread." -;; [queue callback] -;; (try -;; (loop [] -;; (with-open [conn (db/connection)] -;; (sc/atomic conn (watch-unit conn queue callback))) -;; (recur)) -;; (catch InterruptedException e -;; ;; just ignoring -;; ))) - -;; (defn watch! -;; [queue callback] -;; (let [runnable #(watch-loop queue callback) -;; thread (Thread. ^Runnable runnable)] -;; (.setDaemon thread true) -;; (.start thread) -;; (reify -;; java.lang.AutoCloseable -;; (close [_] -;; (.interrupt thread) -;; (.join thread 2000)) - -;; clojure.lang.IDeref -;; (deref [_] -;; (.join thread)) - -;; clojure.lang.IBlockingDeref -;; (deref [_ ms default] -;; (.join thread ms) -;; default)))) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index f0d90af36..4c0084f10 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -15,8 +15,8 @@ [app.db :as db] [app.tasks.delete-object] [app.tasks.delete-profile] - [app.tasks.gc] [app.tasks.remove-media] + [app.tasks.maintenance] [app.tasks.sendmail] [app.tasks.trim-file] [app.util.async :as aa] @@ -53,18 +53,28 @@ (def ^:private schedule [{:id "remove-deleted-media" - :cron (dt/cron "0 0 0 */1 * ? *") ;; daily - :fn #'app.tasks.gc/remove-deleted-media} - {:id "trim-file" - :cron (dt/cron "0 0 0 */1 * ? *") ;; daily - :fn #'app.tasks.trim-file/handler} - ]) + :cron #app/cron "0 0 0 */1 * ? *" ;; daily + :fn #'app.tasks.remove-media/trim-media-storage} + {:id "trim-file" + :cron #app/cron "0 0 0 */1 * ? *" ;; daily + :fn #'app.tasks.trim-file/handler} + + {:id "maintenance/delete-executed-tasks" + :cron #app/cron "0 0 */1 * * ?" ;; hourly + :fn #'app.tasks.maintenance/delete-executed-tasks + :props {:max-age #app/duration "48h"}} + + {:id "maintenance/delete-old-files-xlog" + :cron #app/cron "0 0 */1 * * ?" ;; hourly + :fn #'app.tasks.maintenance/delete-old-files-xlog + :props {:max-age #app/duration "8h"}} + ]) (defstate executor :start (thread-pool {:idle-timeout 10000 - :min-threads 0 - :max-threads 256}) + :min-threads 0 + :max-threads 256}) :stop (stop! executor)) (defstate worker @@ -77,12 +87,12 @@ (defstate scheduler-worker :start (start-scheduler-worker! {:schedule schedule - :executor executor}) + :executor executor}) :stop (stop! scheduler-worker)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Worker Impl +;; Tasks Worker Impl ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (def ^:private