diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 97171825c1..023783828d 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -349,8 +349,8 @@ :audit-log-archive (ig/ref :app.loggers.audit.archive-task/handler) :audit-log-gc (ig/ref :app.loggers.audit.gc-task/handler) - :object-update - (ig/ref :app.tasks.object-update/handler) + :delete-object + (ig/ref :app.tasks.delete-object/handler) :process-webhook-event (ig/ref ::webhooks/process-event-handler) :run-webhook @@ -380,7 +380,7 @@ :app.tasks.orphan-teams-gc/handler {::db/pool (ig/ref ::db/pool)} - :app.tasks.object-update/handler + :app.tasks.delete-object/handler {::db/pool (ig/ref ::db/pool)} :app.tasks.file-gc/handler diff --git a/backend/src/app/rpc/commands/files.clj b/backend/src/app/rpc/commands/files.clj index 601907e105..e165173f23 100644 --- a/backend/src/app/rpc/commands/files.clj +++ b/backend/src/app/rpc/commands/files.clj @@ -35,6 +35,7 @@ [app.util.pointer-map :as pmap] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str])) @@ -916,7 +917,8 @@ (db/update! conn :file {:deleted-at (dt/now)} {:id file-id} - {::db/return-keys [:id :name :is-shared :project-id :created-at :modified-at]})) + {::db/return-keys [:id :name :is-shared :deleted-at + :project-id :created-at :modified-at]})) (def ^:private schema:delete-file @@ -929,6 +931,13 @@ (check-edition-permissions! conn profile-id id) (let [file (mark-file-deleted! conn id)] + (wrk/submit! {::wrk/task :delete-object + ::wrk/delay (dt/duration "1m") + ::wrk/conn conn + :object :file + :deleted-at (:deleted-at file) + :id id}) + ;; NOTE: when a file is a shared library, then we proceed to load ;; the whole file, proceed with feature checking and properly execute ;; the absorb-library procedure diff --git a/backend/src/app/rpc/commands/files_thumbnails.clj b/backend/src/app/rpc/commands/files_thumbnails.clj index bd982ce171..d766acd3c5 100644 --- a/backend/src/app/rpc/commands/files_thumbnails.clj +++ b/backend/src/app/rpc/commands/files_thumbnails.clj @@ -271,7 +271,7 @@ (when (and (some? th1) (not= (:media-id th1) (:media-id th2))) - (sto/touch-object! storage (:media-id th1) :async true)) + (sto/touch-object! storage (:media-id th1))) th2)) diff --git a/backend/src/app/rpc/commands/projects.clj b/backend/src/app/rpc/commands/projects.clj index caa3fe7a0a..29cbeaf514 100644 --- a/backend/src/app/rpc/commands/projects.clj +++ b/backend/src/app/rpc/commands/projects.clj @@ -20,6 +20,7 @@ [app.rpc.quotes :as quotes] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s])) (s/def ::id ::us/uuid) @@ -262,10 +263,16 @@ {:deleted-at (dt/now)} {:id id :is-default false} {::db/return-keys true})] + + (wrk/submit! {::wrk/task :delete-object + ::wrk/delay (dt/duration "1m") + ::wrk/conn conn + :object :project + :deleted-at (:deleted-at project) + :id id}) + (rph/with-meta (rph/wrap) {::audit/props {:team-id (:team-id project) :name (:name project) :created-at (:created-at project) :modified-at (:modified-at project)}})))) - - diff --git a/backend/src/app/rpc/commands/teams.clj b/backend/src/app/rpc/commands/teams.clj index 7a21a12416..dabf6c8487 100644 --- a/backend/src/app/rpc/commands/teams.clj +++ b/backend/src/app/rpc/commands/teams.clj @@ -31,6 +31,7 @@ [app.tokens :as tokens] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str])) @@ -528,14 +529,23 @@ {::doc/added "1.17"} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id] :as params}] (db/with-atomic [conn pool] - (let [perms (get-permissions conn profile-id id)] + (let [perms (get-permissions conn profile-id id) + deleted-at (dt/now)] + (when-not (:is-owner perms) (ex/raise :type :validation :code :only-owner-can-delete-team)) (db/update! conn :team - {:deleted-at (dt/now)} + {:deleted-at deleted-at} {:id id :is-default false}) + + (wrk/submit! {::wrk/task :delete-object + ::wrk/delay (dt/duration "1m") + ::wrk/conn conn + :object :team + :deleted-at deleted-at + :id id}) nil))) diff --git a/backend/src/app/srepl/main.clj b/backend/src/app/srepl/main.clj index 48a15d8116..622dc840ec 100644 --- a/backend/src/app/srepl/main.clj +++ b/backend/src/app/srepl/main.clj @@ -192,7 +192,6 @@ ;; NOTIFICATIONS ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - (defn notify! [{:keys [::mbus/msgbus ::db/pool]} & {:keys [dest code message level] :or {code :generic level :info} @@ -474,6 +473,83 @@ :rollback rollback? :elapsed elapsed)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; RESTORE DELETED OBJECTS +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defn restore-deleted-team! + "Mark a team and all related objects as not deleted" + [team-id] + (let [team-id (h/parse-uuid team-id)] + (db/tx-run! main/system + (fn [{:keys [::db/conn]}] + (db/update! conn :team-font-variant + {:deleted-at nil} + {:team-id team-id}) + + (doseq [project (db/update! conn :project + {:deleted-at nil} + {:team-id team-id} + {::db/return-keys [:id] + ::db/many true})] + + (doseq [file (db/update! conn :file + {:deleted-at nil + :has-media-trimmed false} + {:project-id (:id project)} + {::db/return-keys [:id] + ::db/many true})] + + ;; Fragments are not handled here because they + ;; use the database cascade operation and they + ;; are not marked for deletion with objects-gc + ;; task + + (db/update! conn :file-media-object + {:deleted-at nil} + {:file-id (:id file)}) + + ;; Mark thumbnails to be deleted + (db/update! conn :file-thumbnail + {:deleted-at nil} + {:file-id (:id file)}) + + (db/update! conn :file-tagged-object-thumbnail + {:deleted-at nil} + {:file-id (:id file)}))))))) + + +(defn restore-deleted-project! + "Mark a project and all related objects as not deleted" + [project-id] + (let [project-id (h/parse-uuid project-id)] + (db/tx-run! main/system + (fn [{:keys [::db/conn]}] + (doseq [file (db/update! conn :file + {:deleted-at nil + :has-media-trimmed false} + {:project-id project-id} + {::db/return-keys [:id] + ::db/many true})] + + ;; Fragments are not handled here because they use + ;; the database cascade operation and they are not + ;; marked for deletion with objects-gc task + + (db/update! conn :file-media-object + {:deleted-at nil} + {:file-id (:id file)}) + + ;; Mark thumbnails to be deleted + (db/update! conn :file-thumbnail + {:deleted-at nil} + {:file-id (:id file)}) + + (db/update! conn :file-tagged-object-thumbnail + {:deleted-at nil} + {:file-id (:id file)})))))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; MISC ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index 070c53f3fb..c818b03fa1 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -16,7 +16,6 @@ [app.storage.impl :as impl] [app.storage.s3 :as ss3] [app.util.time :as dt] - [app.worker :as wrk] [clojure.spec.alpha :as s] [datoteka.fs :as fs] [integrant.core :as ig] @@ -171,28 +170,16 @@ (impl/put-object object content)) object))) -(def ^:private default-touch-delay - "A default delay for the asynchronous touch operation" - (dt/duration "5m")) - (defn touch-object! "Mark object as touched." - [{:keys [::db/pool-or-conn] :as storage} object-or-id & {:keys [async]}] + [{:keys [::db/pool-or-conn] :as storage} object-or-id] (us/assert! ::storage storage) (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id)] - (if async - (wrk/submit! ::wrk/conn pool-or-conn - ::wrk/task :object-update - ::wrk/delay default-touch-delay - :object :storage-object - :id id - :key :touched-at - :val (dt/now)) - (-> (db/update! pool-or-conn :storage-object - {:touched-at (dt/now)} - {:id id}) - (db/get-update-count) - (pos?))))) + (-> (db/update! pool-or-conn :storage-object + {:touched-at (dt/now)} + {:id id}) + (db/get-update-count) + (pos?)))) (defn get-object-data "Return an input stream instance of the object content." diff --git a/backend/src/app/tasks/delete_object.clj b/backend/src/app/tasks/delete_object.clj new file mode 100644 index 0000000000..f0a60d30a8 --- /dev/null +++ b/backend/src/app/tasks/delete_object.clj @@ -0,0 +1,69 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.tasks.delete-object + "A generic task for object deletion cascade handling" + (:require + [app.common.logging :as l] + [app.db :as db] + [clojure.spec.alpha :as s] + [integrant.core :as ig])) + +(defmulti delete-object + (fn [_ props] (:object props))) + +(defmethod delete-object :file + [{:keys [::db/conn]} {:keys [id deleted-at]}] + (l/trc :hint "marking for deletion" :rel "file" :id id) + ;; Mark file media objects to be deleted + (db/update! conn :file-media-object + {:deleted-at deleted-at} + {:file-id id}) + + ;; Mark thumbnails to be deleted + (db/update! conn :file-thumbnail + {:deleted-at deleted-at} + {:file-id id}) + + (db/update! conn :file-tagged-object-thumbnail + {:deleted-at deleted-at} + {:file-id id})) + +(defmethod delete-object :project + [{:keys [::db/conn] :as cfg} {:keys [id deleted-at]}] + (l/trc :hint "marking for deletion" :rel "project" :id id) + (doseq [file (db/update! conn :file + {:deleted-at deleted-at} + {:project-id id} + {::db/return-keys [:id :deleted-at] + ::db/many true})] + (delete-object cfg (assoc file :object :file)))) + +(defmethod delete-object :team + [{:keys [::db/conn] :as cfg} {:keys [id deleted-at]}] + (l/trc :hint "marking for deletion" :rel "team" :id id) + (db/update! conn :team-font-variant + {:deleted-at deleted-at} + {:team-id id}) + + (doseq [project (db/update! conn :project + {:deleted-at deleted-at} + {:team-id id} + {::db/return-keys [:id :deleted-at] + ::db/many true})] + (delete-object cfg (assoc project :object :project)))) + +(defmethod delete-object :default + [_cfg props] + (l/wrn :hint "not implementation found" :rel (:object props))) + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req [::db/pool])) + +(defmethod ig/init-key ::handler + [_ cfg] + (fn [{:keys [props] :as params}] + (db/tx-run! cfg delete-object props))) diff --git a/backend/src/app/tasks/object_update.clj b/backend/src/app/tasks/object_update.clj deleted file mode 100644 index cfe5fda445..0000000000 --- a/backend/src/app/tasks/object_update.clj +++ /dev/null @@ -1,32 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) KALEIDOS INC - -(ns app.tasks.object-update - "A task used for perform simple object properties update - in an asynchronous flow." - (:require - [app.common.data :as d] - [app.common.logging :as l] - [app.db :as db] - [clojure.spec.alpha :as s] - [integrant.core :as ig])) - -(defn- update-object - [{:keys [::db/conn] :as cfg} {:keys [id object key val] :as props}] - (l/trc :hint "update object prop" - :id (str id) - :object (d/name object) - :key (d/name key) - :val val) - (db/update! conn object {key val} {:id id} {::db/return-keys false})) - -(defmethod ig/pre-init-spec ::handler [_] - (s/keys :req [::db/pool])) - -(defmethod ig/init-key ::handler - [_ cfg] - (fn [{:keys [props] :as params}] - (db/tx-run! cfg update-object props))) diff --git a/backend/src/app/tasks/objects_gc.clj b/backend/src/app/tasks/objects_gc.clj index 3caed3271d..da9e1232ff 100644 --- a/backend/src/app/tasks/objects_gc.clj +++ b/backend/src/app/tasks/objects_gc.clj @@ -17,67 +17,18 @@ [clojure.spec.alpha :as s] [integrant.core :as ig])) -(declare ^:private delete-file-data-fragments!) -(declare ^:private delete-file-media-objects!) -(declare ^:private delete-file-object-thumbnails!) -(declare ^:private delete-file-thumbnails!) -(declare ^:private delete-files!) -(declare ^:private delete-fonts!) -(declare ^:private delete-profiles!) -(declare ^:private delete-projects!) -(declare ^:private delete-teams!) - -(defmethod ig/pre-init-spec ::handler [_] - (s/keys :req [::db/pool ::sto/storage])) - -(defmethod ig/prep-key ::handler - [_ cfg] - (assoc cfg ::min-age cf/deletion-delay)) - -(defmethod ig/init-key ::handler - [_ cfg] - (fn [params] - (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] - ;; Disable deletion protection for the current transaction - (db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"]) - (db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"]) - - (let [min-age (dt/duration (or (:min-age params) (::min-age cfg))) - cfg (-> cfg - (assoc ::min-age (db/interval min-age)) - (update ::sto/storage media/configure-assets-storage conn)) - - total (reduce + 0 - [(delete-profiles! cfg) - (delete-teams! cfg) - (delete-fonts! cfg) - (delete-projects! cfg) - (delete-files! cfg) - (delete-file-thumbnails! cfg) - (delete-file-object-thumbnails! cfg) - (delete-file-data-fragments! cfg) - (delete-file-media-objects! cfg)])] - - (l/info :hint "task finished" - :deleted total - :rollback? (boolean (:rollback? params))) - - (when (:rollback? params) - (db/rollback! conn)) - - {:processed total}))))) - (def ^:private sql:get-profiles "SELECT id, photo_id FROM profile WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-profiles! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-profiles min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-profiles min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id photo-id]}] (l/trc :hint "permanently delete" :rel "profile" :id (str id)) @@ -99,13 +50,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-teams! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - - (->> (db/cursor conn [sql:get-teams min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-teams min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id photo-id deleted-at]}] (l/trc :hint "permanently delete" :rel "team" @@ -118,15 +69,6 @@ ;; And finally, permanently delete the team. (db/delete! conn :team {:id id}) - ;; Mark for deletion in cascade - (db/update! conn :team-font-variant - {:deleted-at deleted-at} - {:team-id id}) - - (db/update! conn :project - {:deleted-at deleted-at} - {:team-id id}) - (inc total)) 0))) @@ -136,12 +78,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-fonts! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-fonts min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-fonts min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id team-id deleted-at] :as font}] (l/trc :hint "permanently delete" :rel "team-font-variant" @@ -167,12 +110,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-projects! - [{:keys [::db/conn ::min-age] :as cfg}] - (->> (db/cursor conn [sql:get-projects min-age]) + [{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] + (->> (db/cursor conn [sql:get-projects min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id team-id deleted-at]}] (l/trc :hint "permanently delete" :rel "project" @@ -183,11 +127,6 @@ ;; And finally, permanently delete the project. (db/delete! conn :project {:id id}) - ;; Mark files to be deleted - (db/update! conn :file - {:deleted-at deleted-at} - {:project-id id}) - (inc total)) 0))) @@ -197,12 +136,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-files! - [{:keys [::db/conn ::min-age] :as cfg}] - (->> (db/cursor conn [sql:get-files min-age]) + [{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] + (->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id deleted-at project-id]}] (l/trc :hint "permanently delete" :rel "file" @@ -210,26 +150,9 @@ :project-id (str project-id) :deleted-at (dt/format-instant deleted-at)) - ;; NOTE: fragments not handled here because they have - ;; cascade. - ;; And finally, permanently delete the file. (db/delete! conn :file {:id id}) - ;; Mark file media objects to be deleted - (db/update! conn :file-media-object - {:deleted-at deleted-at} - {:file-id id}) - - ;; Mark thumbnails to be deleted - (db/update! conn :file-thumbnail - {:deleted-at deleted-at} - {:file-id id}) - - (db/update! conn :file-tagged-object-thumbnail - {:deleted-at deleted-at} - {:file-id id}) - (inc total)) 0))) @@ -239,12 +162,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn delete-file-thumbnails! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-file-thumbnails min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-file-thumbnails min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [file-id revn media-id deleted-at]}] (l/trc :hint "permanently delete" :rel "file-thumbnail" @@ -267,12 +191,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn delete-file-object-thumbnails! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-file-object-thumbnails min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-file-object-thumbnails min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [file-id object-id media-id deleted-at]}] (l/trc :hint "permanently delete" :rel "file-tagged-object-thumbnail" @@ -295,12 +220,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-file-data-fragments! - [{:keys [::db/conn ::min-age] :as cfg}] - (->> (db/cursor conn [sql:get-file-data-fragments min-age]) + [{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] + (->> (db/cursor conn [sql:get-file-data-fragments min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [file-id id deleted-at]}] (l/trc :hint "permanently delete" :rel "file-data-fragment" @@ -319,12 +245,13 @@ WHERE deleted_at IS NOT NULL AND deleted_at < now() - ?::interval ORDER BY deleted_at ASC + LIMIT ? FOR UPDATE SKIP LOCKED") (defn- delete-file-media-objects! - [{:keys [::db/conn ::min-age ::sto/storage] :as cfg}] - (->> (db/cursor conn [sql:get-file-media-objects min-age]) + [{:keys [::db/conn ::min-age ::chunk-size ::sto/storage] :as cfg}] + (->> (db/cursor conn [sql:get-file-media-objects min-age chunk-size] {:chunk-size 1}) (reduce (fn [total {:keys [id file-id deleted-at] :as fmo}] (l/trc :hint "permanently delete" :rel "file-media-object" @@ -340,3 +267,53 @@ (inc total)) 0))) + +(def ^:private deletion-proc-vars + [#'delete-file-media-objects! + #'delete-file-data-fragments! + #'delete-file-object-thumbnails! + #'delete-file-thumbnails! + #'delete-files! + #'delete-projects! + #'delete-fonts! + #'delete-teams! + #'delete-profiles!]) + +(defn- execute-proc! + "A generic function that executes the specified proc iterativelly + until 0 results is returned" + [cfg proc-fn] + (loop [total 0] + (let [result (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] + (db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"]) + (proc-fn cfg)))] + (if (pos? result) + (recur (+ total result)) + total)))) + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req [::db/pool ::sto/storage])) + +(defmethod ig/prep-key ::handler + [_ cfg] + (assoc cfg + ::min-age cf/deletion-delay + ::chunk-size 10)) + +(defmethod ig/init-key ::handler + [_ cfg] + (fn [params] + (let [min-age (dt/duration (or (:min-age params) (::min-age cfg))) + cfg (-> cfg + (assoc ::min-age (db/interval min-age)) + (update ::sto/storage media/configure-assets-storage))] + + (loop [procs (map deref deletion-proc-vars) + total 0] + (if-let [proc-fn (first procs)] + (let [result (execute-proc! cfg proc-fn)] + (recur (rest procs) + (+ total result))) + (do + (l/inf :hint "task finished" :deleted total) + {:processed total})))))) diff --git a/backend/src/app/worker/runner.clj b/backend/src/app/worker/runner.clj index be3663365d..4082c4a3a4 100644 --- a/backend/src/app/worker/runner.clj +++ b/backend/src/app/worker/runner.clj @@ -35,8 +35,92 @@ [_ item] {:params item}) +(defn- get-task + [{:keys [::db/pool]} task-id] + (ex/try! + (some-> (db/get* pool :task {:id task-id}) + (decode-task-row)))) + +(defn- run-task + [{:keys [::wrk/registry ::id ::queue] :as cfg} task] + (try + (l/dbg :hint "start" + :name (:name task) + :task-id (str (:id task)) + :queue queue + :runner-id id + :retry (:retry-num task)) + (let [tpoint (dt/tpoint) + task-fn (get registry (:name task)) + result (if task-fn + (task-fn task) + {:status :completed :task task}) + elapsed (dt/format-duration (tpoint))] + + (when-not task-fn + (l/wrn :hint "no task handler found" :name (:name task))) + + (l/dbg :hint "end" + :name (:name task) + :task-id (str (:id task)) + :queue queue + :runner-id id + :retry (:retry-num task) + :elapsed elapsed) + + result) + + (catch InterruptedException cause + (throw cause)) + (catch Throwable cause + (let [edata (ex-data cause)] + (if (and (< (:retry-num task) + (:max-retries task)) + (= ::retry (:type edata))) + (cond-> {:status :retry :task task :error cause} + (dt/duration? (:delay edata)) + (assoc :delay (:delay edata)) + + (= ::noop (:strategy edata)) + (assoc :inc-by 0)) + (do + (l/err :hint "unhandled exception on task" + ::l/context (get-error-context cause task) + :cause cause) + (if (>= (:retry-num task) (:max-retries task)) + {:status :failed :task task :error cause} + {:status :retry :task task :error cause}))))))) + +(defn- run-task! + [{:keys [::rds/rconn ::id] :as cfg} task-id] + (loop [task (get-task cfg task-id)] + (cond + (ex/exception? task) + (if (or (db/connection-error? task) + (db/serialization-error? task)) + (do + (l/wrn :hint "connection error on retrieving task from database (retrying in some instants)" + :id id + :cause task) + (px/sleep (::rds/timeout rconn)) + (recur (get-task cfg task-id))) + (do + (l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)" + :id id + :cause task) + (px/sleep (::rds/timeout rconn)) + (recur (get-task cfg task-id)))) + + (nil? task) + (l/wrn :hint "no task found on the database" + :id id + :task-id task-id) + + :else + (run-task cfg task)))) + (defn- run-worker-loop! - [{:keys [::db/pool ::rds/rconn ::wrk/registry ::timeout ::queue ::id]}] + [{:keys [::db/pool ::rds/rconn ::timeout ::queue] :as cfg}] (letfn [(handle-task-retry [{:keys [task error inc-by delay] :or {inc-by 1 delay 1000}}] (let [explain (ex-message error) nretry (+ (:retry-num task) inc-by) @@ -82,88 +166,6 @@ :length (alength payload) :cause cause)))) - (handle-task [{:keys [name] :as task}] - (let [task-fn (get registry name)] - (if task-fn - (task-fn task) - (l/wrn :hint "no task handler found" :name name)) - {:status :completed :task task})) - - (handle-task-exception [cause task] - (let [edata (ex-data cause)] - (if (and (< (:retry-num task) - (:max-retries task)) - (= ::retry (:type edata))) - (cond-> {:status :retry :task task :error cause} - (dt/duration? (:delay edata)) - (assoc :delay (:delay edata)) - - (= ::noop (:strategy edata)) - (assoc :inc-by 0)) - (do - (l/err :hint "unhandled exception on task" - ::l/context (get-error-context cause task) - :cause cause) - (if (>= (:retry-num task) (:max-retries task)) - {:status :failed :task task :error cause} - {:status :retry :task task :error cause}))))) - - (get-task [task-id] - (ex/try! - (some-> (db/get* pool :task {:id task-id}) - (decode-task-row)))) - - (run-task [task-id] - (loop [task (get-task task-id)] - (cond - (ex/exception? task) - (if (or (db/connection-error? task) - (db/serialization-error? task)) - (do - (l/wrn :hint "connection error on retrieving task from database (retrying in some instants)" - :id id - :cause task) - (px/sleep (::rds/timeout rconn)) - (recur (get-task task-id))) - (do - (l/err :hint "unhandled exception on retrieving task from database (retrying in some instants)" - :id id - :cause task) - (px/sleep (::rds/timeout rconn)) - (recur (get-task task-id)))) - - (nil? task) - (l/wrn :hint "no task found on the database" - :id id - :task-id task-id) - - :else - (try - (l/dbg :hint "start" - :name (:name task) - :task-id (str task-id) - :queue queue - :runner-id id - :retry (:retry-num task)) - (let [tpoint (dt/tpoint) - result (handle-task task) - elapsed (dt/format-duration (tpoint))] - - (l/dbg :hint "end" - :name (:name task) - :task-id (str task-id) - :queue queue - :runner-id id - :retry (:retry-num task) - :elapsed elapsed) - - result) - - (catch InterruptedException cause - (throw cause)) - (catch Throwable cause - (handle-task-exception cause task)))))) - (process-result [{:keys [status] :as result}] (ex/try! (case status @@ -173,7 +175,7 @@ nil))) (run-task-loop [task-id] - (loop [result (run-task task-id)] + (loop [result (run-task! cfg task-id)] (when-let [cause (process-result result)] (if (or (db/connection-error? cause) (db/serialization-error? cause)) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 61b5f42bf2..12d76785ee 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -34,6 +34,7 @@ [app.util.blob :as blob] [app.util.services :as sv] [app.util.time :as dt] + [app.worker.runner] [clojure.java.io :as io] [clojure.spec.alpha :as s] [clojure.test :as t] @@ -77,47 +78,6 @@ :enable-feature-components-v2 :disable-file-validation]) -(def test-init-sql - ["alter table project_profile_rel set unlogged;\n" - "alter table file_profile_rel set unlogged;\n" - "alter table presence set unlogged;\n" - "alter table presence set unlogged;\n" - "alter table http_session set unlogged;\n" - "alter table team_profile_rel set unlogged;\n" - "alter table team_project_profile_rel set unlogged;\n" - "alter table comment_thread_status set unlogged;\n" - "alter table comment set unlogged;\n" - "alter table comment_thread set unlogged;\n" - "alter table profile_complaint_report set unlogged;\n" - "alter table file_change set unlogged;\n" - "alter table team_font_variant set unlogged;\n" - "alter table share_link set unlogged;\n" - "alter table usage_quote set unlogged;\n" - "alter table access_token set unlogged;\n" - "alter table profile set unlogged;\n" - "alter table file_library_rel set unlogged;\n" - "alter table file_thumbnail set unlogged;\n" - "alter table file_object_thumbnail set unlogged;\n" - "alter table file_tagged_object_thumbnail set unlogged;\n" - "alter table file_media_object set unlogged;\n" - "alter table file_data_fragment set unlogged;\n" - "alter table file set unlogged;\n" - "alter table project set unlogged;\n" - "alter table team_invitation set unlogged;\n" - "alter table webhook_delivery set unlogged;\n" - "alter table webhook set unlogged;\n" - "alter table team set unlogged;\n" - ;; For some reason, modifying the task realted tables is very very - ;; slow (5s); so we just don't alter them - ;; "alter table task set unlogged;\n" - ;; "alter table task_default set unlogged;\n" - ;; "alter table task_completed set unlogged;\n" - "alter table audit_log set unlogged ;\n" - "alter table storage_object set unlogged;\n" - "alter table server_error_report set unlogged;\n" - "alter table server_prop set unlogged;\n" - "alter table global_complaint_report set unlogged;\n"]) - (defn state-init [next] (with-redefs [app.config/flags (flags/parse flags/default default-flags) @@ -164,9 +124,6 @@ (try (binding [*system* system *pool* (:app.db/pool system)] - (db/with-atomic [conn *pool*] - (doseq [sql test-init-sql] - (db/exec! conn [sql]))) (next)) (finally (ig/halt! system)))))) @@ -181,8 +138,7 @@ (db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"]) (db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"]) (let [result (->> (db/exec! conn [sql]) - (map :table-name) - (remove #(= "task" %)))] + (map :table-name))] (doseq [table result] (db/exec! conn [(str "delete from " table ";")])))) @@ -425,6 +381,18 @@ (let [task-fn (get tasks (d/name name))] (task-fn params))))) +(def sql:pending-tasks + "select t.* from task as t + where t.status = 'new' + order by t.priority desc, t.scheduled_at") + +(defn run-pending-tasks! + [] + (db/tx-run! *system* (fn [{:keys [::db/conn] :as cfg}] + (let [tasks (->> (db/exec! conn [sql:pending-tasks]) + (map #'app.worker.runner/decode-task-row))] + (run! (partial #'app.worker.runner/run-task cfg) tasks))))) + ;; --- UTILS (defn print-error! diff --git a/backend/test/backend_tests/rpc_file_test.clj b/backend/test/backend_tests/rpc_file_test.clj index c50c582525..35d76231ff 100644 --- a/backend/test/backend_tests/rpc_file_test.clj +++ b/backend/test/backend_tests/rpc_file_test.clj @@ -1189,6 +1189,7 @@ (t/is (nil? error)) (t/is (map? result))) + ;; insert another thumbnail with different revn (let [data {::th/type :create-file-thumbnail ::rpc/profile-id (:id prof) :file-id (:id file) @@ -1207,8 +1208,6 @@ (t/is (= 2 (count rows))))) (t/testing "gc task" - ;; make the file eligible for GC waiting 300ms (configured - ;; timeout for testing) (let [res (th/run-task! :file-gc {:min-age 0})] (t/is (= 1 (:processed res)))) diff --git a/backend/test/backend_tests/rpc_file_thumbnails_test.clj b/backend/test/backend_tests/rpc_file_thumbnails_test.clj index 11ed4f352e..e5cd918b14 100644 --- a/backend/test/backend_tests/rpc_file_thumbnails_test.clj +++ b/backend/test/backend_tests/rpc_file_thumbnails_test.clj @@ -346,13 +346,5 @@ (assoc :size 312043)))) out (th/command! data)] (t/is (nil? (:error out))) - (t/is (map? (:result out)))) + (t/is (map? (:result out)))))) - (let [[row1 :as rows] - (->> (th/db-query :task {:name "object-update"}) - (map #(update % :props db/decode-transit-pgobject)))] - - ;; (app.common.pprint/pprint rows) - (t/is (= 1 (count rows))) - (t/is (> (inst-ms (dt/diff (:created-at row1) (:scheduled-at row1))) - (inst-ms (dt/duration "4m"))))))) diff --git a/backend/test/backend_tests/rpc_team_test.clj b/backend/test/backend_tests/rpc_team_test.clj index 65acef49d8..3bd6ac3b97 100644 --- a/backend/test/backend_tests/rpc_team_test.clj +++ b/backend/test/backend_tests/rpc_team_test.clj @@ -391,6 +391,8 @@ (t/is (= 1 (count result))) (t/is (= (:default-team-id profile1) (get-in result [0 :id]))))) + (th/run-pending-tasks!) + ;; run permanent deletion (should be noop) (let [result (th/run-task! :objects-gc {:min-age (dt/duration {:minutes 1})})] (t/is (= 0 (:processed result)))) @@ -457,6 +459,8 @@ #_(th/print-result! out) (t/is (nil? (:error out)))) + (th/run-pending-tasks!) + (let [rows (th/db-exec! ["select * from team where id = ?" (:id team)])] (t/is (= 1 (count rows))) (t/is (dt/instant? (:deleted-at (first rows)))))