Split file-gc task in two separated tasks

Add a new file-gc-scheduler task for analizing all files for
elegibility and leave file-gc task with the responsability to
performn the GC operation.
This commit is contained in:
Andrey Antukh 2024-08-01 16:11:25 +02:00
parent d6f528acd2
commit 253b9e5bd8
3 changed files with 148 additions and 98 deletions

View file

@ -344,6 +344,7 @@
{:sendmail (ig/ref ::email/handler) {:sendmail (ig/ref ::email/handler)
:objects-gc (ig/ref :app.tasks.objects-gc/handler) :objects-gc (ig/ref :app.tasks.objects-gc/handler)
:file-gc (ig/ref :app.tasks.file-gc/handler) :file-gc (ig/ref :app.tasks.file-gc/handler)
:file-gc-scheduler (ig/ref :app.tasks.file-gc-scheduler/handler)
:file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler) :file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler)
:tasks-gc (ig/ref :app.tasks.tasks-gc/handler) :tasks-gc (ig/ref :app.tasks.tasks-gc/handler)
:telemetry (ig/ref :app.tasks.telemetry/handler) :telemetry (ig/ref :app.tasks.telemetry/handler)
@ -394,6 +395,9 @@
{::db/pool (ig/ref ::db/pool) {::db/pool (ig/ref ::db/pool)
::sto/storage (ig/ref ::sto/storage)} ::sto/storage (ig/ref ::sto/storage)}
:app.tasks.file-gc-scheduler/handler
{::db/pool (ig/ref ::db/pool)}
:app.tasks.file-xlog-gc/handler :app.tasks.file-xlog-gc/handler
{::db/pool (ig/ref ::db/pool)} {::db/pool (ig/ref ::db/pool)}
@ -485,7 +489,7 @@
:task :tasks-gc} :task :tasks-gc}
{:cron #app/cron "0 0 2 * * ?" ;; daily {:cron #app/cron "0 0 2 * * ?" ;; daily
:task :file-gc} :task :file-gc-scheduler}
{:cron #app/cron "0 30 */3,23 * * ?" {:cron #app/cron "0 30 */3,23 * * ?"
:task :telemetry} :task :telemetry}

View file

@ -10,6 +10,7 @@
file is eligible to be garbage collected after some period of file is eligible to be garbage collected after some period of
inactivity (the default threshold is 72h)." inactivity (the default threshold is 72h)."
(:require (:require
[app.common.data :as d]
[app.binfile.common :as bfc] [app.binfile.common :as bfc]
[app.common.files.migrations :as fmg] [app.common.files.migrations :as fmg]
[app.common.files.validate :as cfv] [app.common.files.validate :as cfv]
@ -30,69 +31,9 @@
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig])) [integrant.core :as ig]))
(declare ^:private clean-file!) (declare ^:private get-file)
(declare ^:private decode-file)
(defn- decode-file (declare ^:private persist-file!)
[cfg {:keys [id] :as file}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)]
(-> file
(update :features db/decode-pgarray #{})
(update :data blob/decode)
(update :data feat.fdata/process-pointers deref)
(update :data feat.fdata/process-objects (partial into {}))
(update :data assoc :id id)
(fmg/migrate-file))))
(defn- update-file!
[{:keys [::db/conn] :as cfg} {:keys [id] :as file}]
(let [file (if (contains? (:features file) "fdata/objects-map")
(feat.fdata/enable-objects-map file)
file)
file (if (contains? (:features file) "fdata/pointer-map")
(binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! cfg id)
file))
file)
file (-> file
(update :features db/encode-pgarray conn "text")
(update :data blob/encode))]
(db/update! conn :file
{:has-media-trimmed true
:features (:features file)
:version (:version file)
:data (:data file)}
{:id id}
{::db/return-keys true})))
(def ^:private
sql:get-candidates
"SELECT f.id,
f.data,
f.revn,
f.version,
f.features,
f.modified_at
FROM file AS f
WHERE f.has_media_trimmed IS false
AND f.modified_at < now() - ?::interval
AND f.deleted_at IS NULL
ORDER BY f.modified_at DESC
FOR UPDATE
SKIP LOCKED")
(defn- get-candidates
[{:keys [::db/conn ::min-age ::file-id]}]
(if (uuid? file-id)
(do
(l/warn :hint "explicit file id passed on params" :file-id (str file-id))
(db/query conn :file {:id file-id}))
(let [min-age (db/interval min-age)]
(db/cursor conn [sql:get-candidates min-age] {:chunk-size 1}))))
(def ^:private sql:mark-file-media-object-deleted (def ^:private sql:mark-file-media-object-deleted
"UPDATE file_media_object "UPDATE file_media_object
@ -172,7 +113,6 @@
file)) file))
(def ^:private sql:get-files-for-library (def ^:private sql:get-files-for-library
"SELECT f.id, f.data, f.modified_at, f.features, f.version "SELECT f.id, f.data, f.modified_at, f.features, f.version
FROM file AS f FROM file AS f
@ -274,16 +214,74 @@
(cfv/validate-file-schema! file) (cfv/validate-file-schema! file)
file)) file))
(def ^:private sql:get-file
"SELECT f.id,
f.data,
f.revn,
f.version,
f.features,
f.modified_at
FROM file AS f
WHERE f.has_media_trimmed IS false
AND f.modified_at < now() - ?::interval
AND f.deleted_at IS NULL
AND f.id = ?
FOR UPDATE
SKIP LOCKED")
(defn- get-file
[{:keys [::db/conn ::min-age ::file-id]}]
(->> (db/exec! conn [sql:get-file min-age file-id])
(first)))
(defn- decode-file
[cfg {:keys [id] :as file}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)]
(-> file
(update :features db/decode-pgarray #{})
(update :data blob/decode)
(update :data feat.fdata/process-pointers deref)
(update :data feat.fdata/process-objects (partial into {}))
(update :data assoc :id id)
(fmg/migrate-file))))
(defn- persist-file!
[{:keys [::db/conn] :as cfg} {:keys [id] :as file}]
(let [file (if (contains? (:features file) "fdata/objects-map")
(feat.fdata/enable-objects-map file)
file)
file (if (contains? (:features file) "fdata/pointer-map")
(binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! cfg id)
file))
file)
file (-> file
(update :features db/encode-pgarray conn "text")
(update :data blob/encode))]
(db/update! conn :file
{:has-media-trimmed true
:features (:features file)
:version (:version file)
:data (:data file)}
{:id id}
{::db/return-keys true})))
(defn- process-file! (defn- process-file!
[cfg file] [cfg]
(try (try
(let [file (decode-file cfg file) (if-let [file (get-file cfg)]
file (clean-media! cfg file) (let [file (decode-file cfg file)
file (update-file! cfg file)] file (clean-media! cfg file)
(clean-data-fragments! cfg file)) file (persist-file! cfg file)]
(clean-data-fragments! cfg file))
(l/dbg :hint "skip" :file-id (str (::file-id cfg))))
(catch Throwable cause (catch Throwable cause
(l/err :hint "error on cleaning file (skiping)" (l/err :hint "error on cleaning file"
:file-id (str (:id file)) :file-id (str (::file-id cfg))
:cause cause)))) :cause cause))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -293,33 +291,16 @@
(defmethod ig/pre-init-spec ::handler [_] (defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool ::sto/storage])) (s/keys :req [::db/pool ::sto/storage]))
(defmethod ig/prep-key ::handler
[_ cfg]
(assoc cfg ::min-age (cf/get-deletion-delay)))
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ cfg] [_ cfg]
(fn [{:keys [props] :as task}] (fn [{:keys [props] :as task}]
(db/tx-run! cfg (let [min-age (dt/duration (:min-age props 0))
(fn [{:keys [::db/conn] :as cfg}] cfg (-> cfg
(let [min-age (dt/duration (or (:min-age props) (::min-age cfg))) (assoc ::db/rollback (:rollback? props))
cfg (-> cfg (assoc ::file-id (:file-id props))
(update ::sto/storage media/configure-assets-storage conn) (assoc ::min-age (db/interval min-age)))]
(assoc ::file-id (:file-id props))
(assoc ::min-age min-age))
total (reduce (fn [total file] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(process-file! cfg file) (let [cfg (update cfg ::sto/storage media/configure-assets-storage conn)]
(inc total)) (process-file! cfg))))
0 nil)))
(get-candidates cfg))]
(l/inf :hint "finished"
:min-age (dt/format-duration min-age)
:processed total)
;; Allow optional rollback passed by params
(when (:rollback? props)
(db/rollback! conn))
{:processed total})))))

View file

@ -0,0 +1,65 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.tasks.file-gc-scheduler
"A maintenance task that is responsible of properly scheduling the
file-gc task for all files that matches the eligibility threshold."
(:require
[app.common.logging :as l]
[app.config :as cf]
[app.db :as db]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(def ^:private
sql:get-candidates
"SELECT f.id,
f.modified_at
FROM file AS f
WHERE f.has_media_trimmed IS false
AND f.modified_at < now() - ?::interval
AND f.deleted_at IS NULL
ORDER BY f.modified_at DESC
FOR UPDATE
SKIP LOCKED")
(defn- get-candidates
[{:keys [::db/conn ::min-age] :as cfg}]
(let [min-age (db/interval min-age)]
(db/cursor conn [sql:get-candidates min-age] {:chunk-size 10})))
(defn- schedule!
[{:keys [::min-age] :as cfg}]
(let [total (reduce (fn [total {:keys [id]}]
(let [params {:file-id id :min-age min-age}]
(wrk/submit! (assoc cfg ::wrk/params params))
(inc total)))
0
(get-candidates cfg))]
{:processed total}))
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool]))
(defmethod ig/prep-key ::handler
[_ cfg]
(assoc cfg ::min-age (cf/get-deletion-delay)))
(defmethod ig/init-key ::handler
[_ cfg]
(fn [{:keys [props] :as task}]
(let [min-age (dt/duration (or (:min-age props) (::min-age cfg)))]
(-> cfg
(assoc ::db/rollback (:rollback? props))
(assoc ::min-age min-age)
(assoc ::wrk/task :file-gc)
(assoc ::wrk/priority 10)
(assoc ::wrk/mark-retries 0)
(assoc ::wrk/delay 1000)
(db/tx-run! schedule!)))))