diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index ee58a21b5b..07946e59a3 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -344,6 +344,7 @@ {:sendmail (ig/ref ::email/handler) :objects-gc (ig/ref :app.tasks.objects-gc/handler) :file-gc (ig/ref :app.tasks.file-gc/handler) + :file-gc-scheduler (ig/ref :app.tasks.file-gc-scheduler/handler) :file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler) :tasks-gc (ig/ref :app.tasks.tasks-gc/handler) :telemetry (ig/ref :app.tasks.telemetry/handler) @@ -394,6 +395,9 @@ {::db/pool (ig/ref ::db/pool) ::sto/storage (ig/ref ::sto/storage)} + :app.tasks.file-gc-scheduler/handler + {::db/pool (ig/ref ::db/pool)} + :app.tasks.file-xlog-gc/handler {::db/pool (ig/ref ::db/pool)} @@ -485,7 +489,7 @@ :task :tasks-gc} {:cron #app/cron "0 0 2 * * ?" ;; daily - :task :file-gc} + :task :file-gc-scheduler} {:cron #app/cron "0 30 */3,23 * * ?" :task :telemetry} diff --git a/backend/src/app/tasks/file_gc.clj b/backend/src/app/tasks/file_gc.clj index 79f5ff8b97..e007485f5c 100644 --- a/backend/src/app/tasks/file_gc.clj +++ b/backend/src/app/tasks/file_gc.clj @@ -10,6 +10,7 @@ file is eligible to be garbage collected after some period of inactivity (the default threshold is 72h)." (:require + [app.common.data :as d] [app.binfile.common :as bfc] [app.common.files.migrations :as fmg] [app.common.files.validate :as cfv] @@ -30,69 +31,9 @@ [clojure.spec.alpha :as s] [integrant.core :as ig])) -(declare ^:private clean-file!) - -(defn- decode-file - [cfg {:keys [id] :as file}] - (binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)] - (-> file - (update :features db/decode-pgarray #{}) - (update :data blob/decode) - (update :data feat.fdata/process-pointers deref) - (update :data feat.fdata/process-objects (partial into {})) - (update :data assoc :id id) - (fmg/migrate-file)))) - -(defn- update-file! - [{:keys [::db/conn] :as cfg} {:keys [id] :as file}] - (let [file (if (contains? (:features file) "fdata/objects-map") - (feat.fdata/enable-objects-map file) - file) - - file (if (contains? (:features file) "fdata/pointer-map") - (binding [pmap/*tracked* (pmap/create-tracked)] - (let [file (feat.fdata/enable-pointer-map file)] - (feat.fdata/persist-pointers! cfg id) - file)) - file) - - file (-> file - (update :features db/encode-pgarray conn "text") - (update :data blob/encode))] - - (db/update! conn :file - {:has-media-trimmed true - :features (:features file) - :version (:version file) - :data (:data file)} - {:id id} - {::db/return-keys true}))) - -(def ^:private - sql:get-candidates - "SELECT f.id, - f.data, - f.revn, - f.version, - f.features, - f.modified_at - FROM file AS f - WHERE f.has_media_trimmed IS false - AND f.modified_at < now() - ?::interval - AND f.deleted_at IS NULL - ORDER BY f.modified_at DESC - FOR UPDATE - SKIP LOCKED") - -(defn- get-candidates - [{:keys [::db/conn ::min-age ::file-id]}] - (if (uuid? file-id) - (do - (l/warn :hint "explicit file id passed on params" :file-id (str file-id)) - (db/query conn :file {:id file-id})) - - (let [min-age (db/interval min-age)] - (db/cursor conn [sql:get-candidates min-age] {:chunk-size 1})))) +(declare ^:private get-file) +(declare ^:private decode-file) +(declare ^:private persist-file!) (def ^:private sql:mark-file-media-object-deleted "UPDATE file_media_object @@ -172,7 +113,6 @@ file)) - (def ^:private sql:get-files-for-library "SELECT f.id, f.data, f.modified_at, f.features, f.version FROM file AS f @@ -274,16 +214,74 @@ (cfv/validate-file-schema! file) file)) +(def ^:private sql:get-file + "SELECT f.id, + f.data, + f.revn, + f.version, + f.features, + f.modified_at + FROM file AS f + WHERE f.has_media_trimmed IS false + AND f.modified_at < now() - ?::interval + AND f.deleted_at IS NULL + AND f.id = ? + FOR UPDATE + SKIP LOCKED") + +(defn- get-file + [{:keys [::db/conn ::min-age ::file-id]}] + (->> (db/exec! conn [sql:get-file min-age file-id]) + (first))) + +(defn- decode-file + [cfg {:keys [id] :as file}] + (binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)] + (-> file + (update :features db/decode-pgarray #{}) + (update :data blob/decode) + (update :data feat.fdata/process-pointers deref) + (update :data feat.fdata/process-objects (partial into {})) + (update :data assoc :id id) + (fmg/migrate-file)))) + +(defn- persist-file! + [{:keys [::db/conn] :as cfg} {:keys [id] :as file}] + (let [file (if (contains? (:features file) "fdata/objects-map") + (feat.fdata/enable-objects-map file) + file) + + file (if (contains? (:features file) "fdata/pointer-map") + (binding [pmap/*tracked* (pmap/create-tracked)] + (let [file (feat.fdata/enable-pointer-map file)] + (feat.fdata/persist-pointers! cfg id) + file)) + file) + + file (-> file + (update :features db/encode-pgarray conn "text") + (update :data blob/encode))] + + (db/update! conn :file + {:has-media-trimmed true + :features (:features file) + :version (:version file) + :data (:data file)} + {:id id} + {::db/return-keys true}))) + (defn- process-file! - [cfg file] + [cfg] (try - (let [file (decode-file cfg file) - file (clean-media! cfg file) - file (update-file! cfg file)] - (clean-data-fragments! cfg file)) + (if-let [file (get-file cfg)] + (let [file (decode-file cfg file) + file (clean-media! cfg file) + file (persist-file! cfg file)] + (clean-data-fragments! cfg file)) + (l/dbg :hint "skip" :file-id (str (::file-id cfg)))) (catch Throwable cause - (l/err :hint "error on cleaning file (skiping)" - :file-id (str (:id file)) + (l/err :hint "error on cleaning file" + :file-id (str (::file-id cfg)) :cause cause)))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -293,33 +291,16 @@ (defmethod ig/pre-init-spec ::handler [_] (s/keys :req [::db/pool ::sto/storage])) -(defmethod ig/prep-key ::handler - [_ cfg] - (assoc cfg ::min-age (cf/get-deletion-delay))) - (defmethod ig/init-key ::handler [_ cfg] (fn [{:keys [props] :as task}] - (db/tx-run! cfg - (fn [{:keys [::db/conn] :as cfg}] - (let [min-age (dt/duration (or (:min-age props) (::min-age cfg))) - cfg (-> cfg - (update ::sto/storage media/configure-assets-storage conn) - (assoc ::file-id (:file-id props)) - (assoc ::min-age min-age)) + (let [min-age (dt/duration (:min-age props 0)) + cfg (-> cfg + (assoc ::db/rollback (:rollback? props)) + (assoc ::file-id (:file-id props)) + (assoc ::min-age (db/interval min-age)))] - total (reduce (fn [total file] - (process-file! cfg file) - (inc total)) - 0 - (get-candidates cfg))] - - (l/inf :hint "finished" - :min-age (dt/format-duration min-age) - :processed total) - - ;; Allow optional rollback passed by params - (when (:rollback? props) - (db/rollback! conn)) - - {:processed total}))))) + (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] + (let [cfg (update cfg ::sto/storage media/configure-assets-storage conn)] + (process-file! cfg)))) + nil))) diff --git a/backend/src/app/tasks/file_gc_scheduler.clj b/backend/src/app/tasks/file_gc_scheduler.clj new file mode 100644 index 0000000000..11c24e8890 --- /dev/null +++ b/backend/src/app/tasks/file_gc_scheduler.clj @@ -0,0 +1,65 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.tasks.file-gc-scheduler + "A maintenance task that is responsible of properly scheduling the + file-gc task for all files that matches the eligibility threshold." + (:require + [app.common.logging :as l] + [app.config :as cf] + [app.db :as db] + [app.util.time :as dt] + [app.worker :as wrk] + [clojure.spec.alpha :as s] + [integrant.core :as ig])) + +(def ^:private + sql:get-candidates + "SELECT f.id, + f.modified_at + FROM file AS f + WHERE f.has_media_trimmed IS false + AND f.modified_at < now() - ?::interval + AND f.deleted_at IS NULL + ORDER BY f.modified_at DESC + FOR UPDATE + SKIP LOCKED") + +(defn- get-candidates + [{:keys [::db/conn ::min-age] :as cfg}] + (let [min-age (db/interval min-age)] + (db/cursor conn [sql:get-candidates min-age] {:chunk-size 10}))) + +(defn- schedule! + [{:keys [::min-age] :as cfg}] + (let [total (reduce (fn [total {:keys [id]}] + (let [params {:file-id id :min-age min-age}] + (wrk/submit! (assoc cfg ::wrk/params params)) + (inc total))) + 0 + (get-candidates cfg))] + + {:processed total})) + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req [::db/pool])) + +(defmethod ig/prep-key ::handler + [_ cfg] + (assoc cfg ::min-age (cf/get-deletion-delay))) + +(defmethod ig/init-key ::handler + [_ cfg] + (fn [{:keys [props] :as task}] + (let [min-age (dt/duration (or (:min-age props) (::min-age cfg)))] + (-> cfg + (assoc ::db/rollback (:rollback? props)) + (assoc ::min-age min-age) + (assoc ::wrk/task :file-gc) + (assoc ::wrk/priority 10) + (assoc ::wrk/mark-retries 0) + (assoc ::wrk/delay 1000) + (db/tx-run! schedule!)))))