diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 97171825c..80f0651bf 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -351,6 +351,8 @@ :object-update (ig/ref :app.tasks.object-update/handler) + :delete-object + (ig/ref :app.tasks.delete-object/handler) :process-webhook-event (ig/ref ::webhooks/process-event-handler) :run-webhook @@ -383,6 +385,9 @@ :app.tasks.object-update/handler {::db/pool (ig/ref ::db/pool)} + :app.tasks.delete-object/handler + {::db/pool (ig/ref ::db/pool)} + :app.tasks.file-gc/handler {::db/pool (ig/ref ::db/pool) ::sto/storage (ig/ref ::sto/storage)} diff --git a/backend/src/app/rpc/commands/files.clj b/backend/src/app/rpc/commands/files.clj index 601907e10..e165173f2 100644 --- a/backend/src/app/rpc/commands/files.clj +++ b/backend/src/app/rpc/commands/files.clj @@ -35,6 +35,7 @@ [app.util.pointer-map :as pmap] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str])) @@ -916,7 +917,8 @@ (db/update! conn :file {:deleted-at (dt/now)} {:id file-id} - {::db/return-keys [:id :name :is-shared :project-id :created-at :modified-at]})) + {::db/return-keys [:id :name :is-shared :deleted-at + :project-id :created-at :modified-at]})) (def ^:private schema:delete-file @@ -929,6 +931,13 @@ (check-edition-permissions! conn profile-id id) (let [file (mark-file-deleted! conn id)] + (wrk/submit! {::wrk/task :delete-object + ::wrk/delay (dt/duration "1m") + ::wrk/conn conn + :object :file + :deleted-at (:deleted-at file) + :id id}) + ;; NOTE: when a file is a shared library, then we proceed to load ;; the whole file, proceed with feature checking and properly execute ;; the absorb-library procedure diff --git a/backend/src/app/rpc/commands/projects.clj b/backend/src/app/rpc/commands/projects.clj index caa3fe7a0..29cbeaf51 100644 --- a/backend/src/app/rpc/commands/projects.clj +++ b/backend/src/app/rpc/commands/projects.clj @@ -20,6 +20,7 @@ [app.rpc.quotes :as quotes] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s])) (s/def ::id ::us/uuid) @@ -262,10 +263,16 @@ {:deleted-at (dt/now)} {:id id :is-default false} {::db/return-keys true})] + + (wrk/submit! {::wrk/task :delete-object + ::wrk/delay (dt/duration "1m") + ::wrk/conn conn + :object :project + :deleted-at (:deleted-at project) + :id id}) + (rph/with-meta (rph/wrap) {::audit/props {:team-id (:team-id project) :name (:name project) :created-at (:created-at project) :modified-at (:modified-at project)}})))) - - diff --git a/backend/src/app/rpc/commands/teams.clj b/backend/src/app/rpc/commands/teams.clj index 7a21a1241..dabf6c848 100644 --- a/backend/src/app/rpc/commands/teams.clj +++ b/backend/src/app/rpc/commands/teams.clj @@ -31,6 +31,7 @@ [app.tokens :as tokens] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str])) @@ -528,14 +529,23 @@ {::doc/added "1.17"} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id] :as params}] (db/with-atomic [conn pool] - (let [perms (get-permissions conn profile-id id)] + (let [perms (get-permissions conn profile-id id) + deleted-at (dt/now)] + (when-not (:is-owner perms) (ex/raise :type :validation :code :only-owner-can-delete-team)) (db/update! conn :team - {:deleted-at (dt/now)} + {:deleted-at deleted-at} {:id id :is-default false}) + + (wrk/submit! {::wrk/task :delete-object + ::wrk/delay (dt/duration "1m") + ::wrk/conn conn + :object :team + :deleted-at deleted-at + :id id}) nil))) diff --git a/backend/src/app/srepl/main.clj b/backend/src/app/srepl/main.clj index 48a15d811..622dc840e 100644 --- a/backend/src/app/srepl/main.clj +++ b/backend/src/app/srepl/main.clj @@ -192,7 +192,6 @@ ;; NOTIFICATIONS ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - (defn notify! [{:keys [::mbus/msgbus ::db/pool]} & {:keys [dest code message level] :or {code :generic level :info} @@ -474,6 +473,83 @@ :rollback rollback? :elapsed elapsed)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; RESTORE DELETED OBJECTS +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defn restore-deleted-team! + "Mark a team and all related objects as not deleted" + [team-id] + (let [team-id (h/parse-uuid team-id)] + (db/tx-run! main/system + (fn [{:keys [::db/conn]}] + (db/update! conn :team-font-variant + {:deleted-at nil} + {:team-id team-id}) + + (doseq [project (db/update! conn :project + {:deleted-at nil} + {:team-id team-id} + {::db/return-keys [:id] + ::db/many true})] + + (doseq [file (db/update! conn :file + {:deleted-at nil + :has-media-trimmed false} + {:project-id (:id project)} + {::db/return-keys [:id] + ::db/many true})] + + ;; Fragments are not handled here because they + ;; use the database cascade operation and they + ;; are not marked for deletion with objects-gc + ;; task + + (db/update! conn :file-media-object + {:deleted-at nil} + {:file-id (:id file)}) + + ;; Mark thumbnails to be deleted + (db/update! conn :file-thumbnail + {:deleted-at nil} + {:file-id (:id file)}) + + (db/update! conn :file-tagged-object-thumbnail + {:deleted-at nil} + {:file-id (:id file)}))))))) + + +(defn restore-deleted-project! + "Mark a project and all related objects as not deleted" + [project-id] + (let [project-id (h/parse-uuid project-id)] + (db/tx-run! main/system + (fn [{:keys [::db/conn]}] + (doseq [file (db/update! conn :file + {:deleted-at nil + :has-media-trimmed false} + {:project-id project-id} + {::db/return-keys [:id] + ::db/many true})] + + ;; Fragments are not handled here because they use + ;; the database cascade operation and they are not + ;; marked for deletion with objects-gc task + + (db/update! conn :file-media-object + {:deleted-at nil} + {:file-id (:id file)}) + + ;; Mark thumbnails to be deleted + (db/update! conn :file-thumbnail + {:deleted-at nil} + {:file-id (:id file)}) + + (db/update! conn :file-tagged-object-thumbnail + {:deleted-at nil} + {:file-id (:id file)})))))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; MISC ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/backend/src/app/tasks/delete_object.clj b/backend/src/app/tasks/delete_object.clj new file mode 100644 index 000000000..f0a60d30a --- /dev/null +++ b/backend/src/app/tasks/delete_object.clj @@ -0,0 +1,69 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.tasks.delete-object + "A generic task for object deletion cascade handling" + (:require + [app.common.logging :as l] + [app.db :as db] + [clojure.spec.alpha :as s] + [integrant.core :as ig])) + +(defmulti delete-object + (fn [_ props] (:object props))) + +(defmethod delete-object :file + [{:keys [::db/conn]} {:keys [id deleted-at]}] + (l/trc :hint "marking for deletion" :rel "file" :id id) + ;; Mark file media objects to be deleted + (db/update! conn :file-media-object + {:deleted-at deleted-at} + {:file-id id}) + + ;; Mark thumbnails to be deleted + (db/update! conn :file-thumbnail + {:deleted-at deleted-at} + {:file-id id}) + + (db/update! conn :file-tagged-object-thumbnail + {:deleted-at deleted-at} + {:file-id id})) + +(defmethod delete-object :project + [{:keys [::db/conn] :as cfg} {:keys [id deleted-at]}] + (l/trc :hint "marking for deletion" :rel "project" :id id) + (doseq [file (db/update! conn :file + {:deleted-at deleted-at} + {:project-id id} + {::db/return-keys [:id :deleted-at] + ::db/many true})] + (delete-object cfg (assoc file :object :file)))) + +(defmethod delete-object :team + [{:keys [::db/conn] :as cfg} {:keys [id deleted-at]}] + (l/trc :hint "marking for deletion" :rel "team" :id id) + (db/update! conn :team-font-variant + {:deleted-at deleted-at} + {:team-id id}) + + (doseq [project (db/update! conn :project + {:deleted-at deleted-at} + {:team-id id} + {::db/return-keys [:id :deleted-at] + ::db/many true})] + (delete-object cfg (assoc project :object :project)))) + +(defmethod delete-object :default + [_cfg props] + (l/wrn :hint "not implementation found" :rel (:object props))) + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req [::db/pool])) + +(defmethod ig/init-key ::handler + [_ cfg] + (fn [{:keys [props] :as params}] + (db/tx-run! cfg delete-object props))) diff --git a/backend/src/app/tasks/objects_gc.clj b/backend/src/app/tasks/objects_gc.clj index 3caed3271..da9e1232f 100644 --- a/backend/src/app/tasks/objects_gc.clj +++ b/backend/src/app/tasks/objects_gc.clj @@ -17,67 +17,18 @@ [clojure.spec.alpha :as s] [integrant.core :as ig])) -(declare ^:private delete-file-data-fragments!) -(declare ^:private delete-file-media-objects!) -(declare ^:private delete-file-object-thumbnails!) -(declare ^:private delete-file-thumbnails!) -(declare ^:private delete-files!) -(declare ^:private delete-fonts!) -(declare ^:private delete-profiles!) -(declare ^:private delete-projects!) -(declare ^:private delete-teams!) - -(defmethod ig/pre-init-spec ::handler [_] - (s/keys :req [::db/pool ::sto/storage])) - -(defmethod ig/prep-key ::handler - [_ cfg] - (assoc cfg ::min-age cf/deletion-delay)) - -(defmethod ig/init-key ::handler - [_ cfg] - (fn [params] - (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] - ;; Disable deletion protection for the current transaction - (db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"]) - (db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"]) - - (let [min-age (dt/duration (or (:min-age params) (::min-age cfg))) - cfg (-> cfg - (assoc ::min-age (db/interval min-age)) - (update ::sto/storage media/configure-assets-storage conn)) - - total (reduce + 0 - [(delete-profiles! cfg) - (delete-teams! cfg) - (delete-fonts! cfg) - (delete-projects! cfg) - (delete-files! cfg) - (delete-file-thumbnails! cfg) - (delete-file-object-thumbnails! cfg) - (delete-file-data-fragments! cfg) - (delete-file-media-objects! cfg)])] - - (l/info :hint "task finished" - :deleted total - :rollback? (boolean (:rollback? params))) - - (when (:rollback? params) - (db/rollback! conn)) - - {:processed total}))))) - (def ^:private sql:get-profiles "SELECT id, photo_id FROM profile WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-profiles! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-profiles min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-profiles min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id photo-id]}] (l/trc :hint "permanently delete" :rel "profile" :id (str id)) @@ -99,13 +50,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-teams! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - - (->> (db/cursor conn [sql:get-teams min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-teams min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id photo-id deleted-at]}] (l/trc :hint "permanently delete" :rel "team" @@ -118,15 +69,6 @@ ;; And finally, permanently delete the team. (db/delete! conn :team {:id id}) - ;; Mark for deletion in cascade - (db/update! conn :team-font-variant - {:deleted-at deleted-at} - {:team-id id}) - - (db/update! conn :project - {:deleted-at deleted-at} - {:team-id id}) - (inc total)) 0))) @@ -136,12 +78,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-fonts! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-fonts min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-fonts min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id team-id deleted-at] :as font}] (l/trc :hint "permanently delete" :rel "team-font-variant" @@ -167,12 +110,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-projects! - [{:keys [::db/conn ::min-age] :as cfg}] - (->> (db/cursor conn [sql:get-projects min-age]) + [{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] + (->> (db/cursor conn [sql:get-projects min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id team-id deleted-at]}] (l/trc :hint "permanently delete" :rel "project" @@ -183,11 +127,6 @@ ;; And finally, permanently delete the project. (db/delete! conn :project {:id id}) - ;; Mark files to be deleted - (db/update! conn :file - {:deleted-at deleted-at} - {:project-id id}) - (inc total)) 0))) @@ -197,12 +136,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-files! - [{:keys [::db/conn ::min-age] :as cfg}] - (->> (db/cursor conn [sql:get-files min-age]) + [{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] + (->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id deleted-at project-id]}] (l/trc :hint "permanently delete" :rel "file" @@ -210,26 +150,9 @@ :project-id (str project-id) :deleted-at (dt/format-instant deleted-at)) - ;; NOTE: fragments not handled here because they have - ;; cascade. - ;; And finally, permanently delete the file. (db/delete! conn :file {:id id}) - ;; Mark file media objects to be deleted - (db/update! conn :file-media-object - {:deleted-at deleted-at} - {:file-id id}) - - ;; Mark thumbnails to be deleted - (db/update! conn :file-thumbnail - {:deleted-at deleted-at} - {:file-id id}) - - (db/update! conn :file-tagged-object-thumbnail - {:deleted-at deleted-at} - {:file-id id}) - (inc total)) 0))) @@ -239,12 +162,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn delete-file-thumbnails! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-file-thumbnails min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-file-thumbnails min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [file-id revn media-id deleted-at]}] (l/trc :hint "permanently delete" :rel "file-thumbnail" @@ -267,12 +191,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn delete-file-object-thumbnails! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-file-object-thumbnails min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-file-object-thumbnails min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [file-id object-id media-id deleted-at]}] (l/trc :hint "permanently delete" :rel "file-tagged-object-thumbnail" @@ -295,12 +220,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-file-data-fragments! - [{:keys [::db/conn ::min-age] :as cfg}] - (->> (db/cursor conn [sql:get-file-data-fragments min-age]) + [{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] + (->> (db/cursor conn [sql:get-file-data-fragments min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [file-id id deleted-at]}] (l/trc :hint "permanently delete" :rel "file-data-fragment" @@ -319,12 +245,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-file-media-objects! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-file-media-objects min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-file-media-objects min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id file-id deleted-at] :as fmo}] (l/trc :hint "permanently delete" :rel "file-media-object" @@ -340,3 +267,53 @@ (inc total)) 0))) + +(def ^:private deletion-proc-vars + [#'delete-file-media-objects! + #'delete-file-data-fragments! + #'delete-file-object-thumbnails! + #'delete-file-thumbnails! + #'delete-files! + #'delete-projects! + #'delete-fonts! + #'delete-teams! + #'delete-profiles!]) + +(defn- execute-proc! + "A generic function that executes the specified proc iterativelly + until 0 results is returned" + [cfg proc-fn] + (loop [total 0] + (let [result (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] + (db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"]) + (proc-fn cfg)))] + (if (pos? result) + (recur (+ total result)) + total)))) + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req [::db/pool ::sto/storage])) + +(defmethod ig/prep-key ::handler + [_ cfg] + (assoc cfg + ::min-age cf/deletion-delay + ::chunk-size 10)) + +(defmethod ig/init-key ::handler + [_ cfg] + (fn [params] + (let [min-age (dt/duration (or (:min-age params) (::min-age cfg))) + cfg (-> cfg + (assoc ::min-age (db/interval min-age)) + (update ::sto/storage media/configure-assets-storage))] + + (loop [procs (map deref deletion-proc-vars) + total 0] + (if-let [proc-fn (first procs)] + (let [result (execute-proc! cfg proc-fn)] + (recur (rest procs) + (+ total result))) + (do + (l/inf :hint "task finished" :deleted total) + {:processed total})))))) diff --git a/backend/src/app/worker/runner.clj b/backend/src/app/worker/runner.clj index be3663365..4082c4a3a 100644 --- a/backend/src/app/worker/runner.clj +++ b/backend/src/app/worker/runner.clj @@ -35,8 +35,92 @@ [_ item] {:params item}) +(defn- get-task + [{:keys [::db/pool]} task-id] + (ex/try! + (some-> (db/get* pool :task {:id task-id}) + (decode-task-row)))) + +(defn- run-task + [{:keys [::wrk/registry ::id ::queue] :as cfg} task] + (try + (l/dbg :hint "start" + :name (:name task) + :task-id (str (:id task)) + :queue queue + :runner-id id + :retry (:retry-num task)) + (let [tpoint (dt/tpoint) + task-fn (get registry (:name task)) + result (if task-fn + (task-fn task) + {:status :completed :task task}) + elapsed (dt/format-duration (tpoint))] + + (when-not task-fn + (l/wrn :hint "no task handler found" :name (:name task))) + + (l/dbg :hint "end" + :name (:name task) + :task-id (str (:id task)) + :queue queue + :runner-id id + :retry (:retry-num task) + :elapsed elapsed) + + result) + + (catch InterruptedException cause + (throw cause)) + (catch Throwable cause + (let [edata (ex-data cause)] + (if (and (< (:retry-num task) + (:max-retries task)) + (= ::retry (:type edata))) + (cond-> {:status :retry :task task :error cause} + (dt/duration? (:delay edata)) + (assoc :delay (:delay edata)) + + (= ::noop (:strategy edata)) + (assoc :inc-by 0)) + (do + (l/err :hint "unhandled exception on task" + ::l/context (get-error-context cause task) + :cause cause) + (if (>= (:retry-num task) (:max-retries task)) + {:status :failed :task task :error cause} + {:status :retry :task task :error cause}))))))) + +(defn- run-task! + [{:keys [::rds/rconn ::id] :as cfg} task-id] + (loop [task (get-task cfg task-id)] + (cond + (ex/exception? task) + (if (or (db/connection-error? task) + (db/serialization-error? task)) + (do + (l/wrn :hint "connection error on retrieving task from database (retrying in some instants)" + :id id + :cause task) + (px/sleep (::rds/timeout rconn)) + (recur (get-task cfg task-id))) + (do + (l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)" + :id id + :cause task) + (px/sleep (::rds/timeout rconn)) + (recur (get-task cfg task-id)))) + + (nil? task) + (l/wrn :hint "no task found on the database" + :id id + :task-id task-id) + + :else + (run-task cfg task)))) + (defn- run-worker-loop! - [{:keys [::db/pool ::rds/rconn ::wrk/registry ::timeout ::queue ::id]}] + [{:keys [::db/pool ::rds/rconn ::timeout ::queue] :as cfg}] (letfn [(handle-task-retry [{:keys [task error inc-by delay] :or {inc-by 1 delay 1000}}] (let [explain (ex-message error) nretry (+ (:retry-num task) inc-by) @@ -82,88 +166,6 @@ :length (alength payload) :cause cause)))) - (handle-task [{:keys [name] :as task}] - (let [task-fn (get registry name)] - (if task-fn - (task-fn task) - (l/wrn :hint "no task handler found" :name name)) - {:status :completed :task task})) - - (handle-task-exception [cause task] - (let [edata (ex-data cause)] - (if (and (< (:retry-num task) - (:max-retries task)) - (= ::retry (:type edata))) - (cond-> {:status :retry :task task :error cause} - (dt/duration? (:delay edata)) - (assoc :delay (:delay edata)) - - (= ::noop (:strategy edata)) - (assoc :inc-by 0)) - (do - (l/err :hint "unhandled exception on task" - ::l/context (get-error-context cause task) - :cause cause) - (if (>= (:retry-num task) (:max-retries task)) - {:status :failed :task task :error cause} - {:status :retry :task task :error cause}))))) - - (get-task [task-id] - (ex/try! - (some-> (db/get* pool :task {:id task-id}) - (decode-task-row)))) - - (run-task [task-id] - (loop [task (get-task task-id)] - (cond - (ex/exception? task) - (if (or (db/connection-error? task) - (db/serialization-error? task)) - (do - (l/wrn :hint "connection error on retrieving task from database (retrying in some instants)" - :id id - :cause task) - (px/sleep (::rds/timeout rconn)) - (recur (get-task task-id))) - (do - (l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)" - :id id - :cause task) - (px/sleep (::rds/timeout rconn)) - (recur (get-task task-id)))) - - (nil? task) - (l/wrn :hint "no task found on the database" - :id id - :task-id task-id) - - :else - (try - (l/dbg :hint "start" - :name (:name task) - :task-id (str task-id) - :queue queue - :runner-id id - :retry (:retry-num task)) - (let [tpoint (dt/tpoint) - result (handle-task task) - elapsed (dt/format-duration (tpoint))] - - (l/dbg :hint "end" - :name (:name task) - :task-id (str task-id) - :queue queue - :runner-id id - :retry (:retry-num task) - :elapsed elapsed) - - result) - - (catch InterruptedException cause - (throw cause)) - (catch Throwable cause - (handle-task-exception cause task)))))) - (process-result [{:keys [status] :as result}] (ex/try! (case status @@ -173,7 +175,7 @@ nil))) (run-task-loop [task-id] - (loop [result (run-task task-id)] + (loop [result (run-task! cfg task-id)] (when-let [cause (process-result result)] (if (or (db/connection-error? cause) (db/serialization-error? cause)) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 61b5f42bf..a83cec1b6 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -20,8 +20,8 @@ [app.config :as cf] [app.db :as db] [app.main :as main] - [app.media] [app.media :as-alias mtx] + [app.media] [app.migrations] [app.msgbus :as-alias mbus] [app.rpc :as-alias rpc] @@ -34,6 +34,7 @@ [app.util.blob :as blob] [app.util.services :as sv] [app.util.time :as dt] + [app.worker.runner] [clojure.java.io :as io] [clojure.spec.alpha :as s] [clojure.test :as t] @@ -425,6 +426,18 @@ (let [task-fn (get tasks (d/name name))] (task-fn params))))) +(def sql:pending-tasks + "select t.* from task as t + where t.status = 'new' + order by t.priority desc, t.scheduled_at") + +(defn run-pending-tasks! + [] + (db/tx-run! *system* (fn [{:keys [::db/conn] :as cfg}] + (let [tasks (->> (db/exec! conn [sql:pending-tasks]) + (map #'app.worker.runner/decode-task-row))] + (run! (partial #'app.worker.runner/run-task cfg) tasks))))) + ;; --- UTILS (defn print-error! diff --git a/backend/test/backend_tests/rpc_file_test.clj b/backend/test/backend_tests/rpc_file_test.clj index c50c58252..35d76231f 100644 --- a/backend/test/backend_tests/rpc_file_test.clj +++ b/backend/test/backend_tests/rpc_file_test.clj @@ -1189,6 +1189,7 @@ (t/is (nil? error)) (t/is (map? result))) + ;; insert another thumbnail with different revn (let [data {::th/type :create-file-thumbnail ::rpc/profile-id (:id prof) :file-id (:id file) @@ -1207,8 +1208,6 @@ (t/is (= 2 (count rows))))) (t/testing "gc task" - ;; make the file eligible for GC waiting 300ms (configured - ;; timeout for testing) (let [res (th/run-task! :file-gc {:min-age 0})] (t/is (= 1 (:processed res)))) diff --git a/backend/test/backend_tests/rpc_team_test.clj b/backend/test/backend_tests/rpc_team_test.clj index 65acef49d..3bd6ac3b9 100644 --- a/backend/test/backend_tests/rpc_team_test.clj +++ b/backend/test/backend_tests/rpc_team_test.clj @@ -391,6 +391,8 @@ (t/is (= 1 (count result))) (t/is (= (:default-team-id profile1) (get-in result [0 :id]))))) + (th/run-pending-tasks!) + ;; run permanent deletion (should be noop) (let [result (th/run-task! :objects-gc {:min-age (dt/duration {:minutes 1})})] (t/is (= 0 (:processed result)))) @@ -457,6 +459,8 @@ #_(th/print-result! out) (t/is (nil? (:error out)))) + (th/run-pending-tasks!) + (let [rows (th/db-exec! ["select * from team where id = ?" (:id team)])] (t/is (= 1 (count rows))) (t/is (dt/instant? (:deleted-at (first rows)))))