🎉 Add file-data offload mechanism

This commit is contained in:
Andrey Antukh 2024-08-01 16:17:22 +02:00
parent f6bfe3931c
commit 0e92bcc0de
35 changed files with 502 additions and 216 deletions

View file

@ -27,6 +27,7 @@ export PENPOT_FLAGS="\
enable-file-snapshot \ enable-file-snapshot \
enable-webhooks \ enable-webhooks \
enable-access-tokens \ enable-access-tokens \
enable-tiered-file-data-storage \
enable-file-validation \ enable-file-validation \
enable-file-schema-validation"; enable-file-schema-validation";
@ -62,9 +63,10 @@ mc mb penpot-s3/penpot -p -q
export AWS_ACCESS_KEY_ID=penpot-devenv export AWS_ACCESS_KEY_ID=penpot-devenv
export AWS_SECRET_ACCESS_KEY=penpot-devenv export AWS_SECRET_ACCESS_KEY=penpot-devenv
export PENPOT_ASSETS_STORAGE_BACKEND=assets-s3
export PENPOT_STORAGE_ASSETS_S3_ENDPOINT=http://minio:9000 export PENPOT_OBJECTS_STORAGE_BACKEND=s3
export PENPOT_STORAGE_ASSETS_S3_BUCKET=penpot export PENPOT_OBJECTS_STORAGE_S3_ENDPOINT=http://minio:9000
export PENPOT_OBJECTS_STORAGE_S3_BUCKET=penpot
export OPTIONS=" export OPTIONS="
-A:jmx-remote -A:dev \ -A:jmx-remote -A:dev \

View file

@ -19,6 +19,7 @@ export PENPOT_FLAGS="\
enable-smtp \ enable-smtp \
enable-file-snapshot \ enable-file-snapshot \
enable-access-tokens \ enable-access-tokens \
enable-tiered-file-data-storage \
enable-file-validation \ enable-file-validation \
enable-file-schema-validation"; enable-file-schema-validation";
@ -56,9 +57,9 @@ mc mb penpot-s3/penpot -p -q
export AWS_ACCESS_KEY_ID=penpot-devenv export AWS_ACCESS_KEY_ID=penpot-devenv
export AWS_SECRET_ACCESS_KEY=penpot-devenv export AWS_SECRET_ACCESS_KEY=penpot-devenv
export PENPOT_ASSETS_STORAGE_BACKEND=assets-s3 export PENPOT_OBJECTS_STORAGE_BACKEND=s3
export PENPOT_STORAGE_ASSETS_S3_ENDPOINT=http://minio:9000 export PENPOT_OBJECTS_STORAGE_S3_ENDPOINT=http://minio:9000
export PENPOT_STORAGE_ASSETS_S3_BUCKET=penpot export PENPOT_OBJECTS_STORAGE_S3_BUCKET=penpot
entrypoint=${1:-app.main}; entrypoint=${1:-app.main};

View file

@ -22,7 +22,6 @@
[app.db :as db] [app.db :as db]
[app.loggers.audit :as-alias audit] [app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks] [app.loggers.webhooks :as-alias webhooks]
[app.media :as media]
[app.rpc :as-alias rpc] [app.rpc :as-alias rpc]
[app.rpc.commands.teams :as teams] [app.rpc.commands.teams :as teams]
[app.rpc.doc :as-alias doc] [app.rpc.doc :as-alias doc]
@ -403,9 +402,9 @@
(write-obj! output rels))) (write-obj! output rels)))
(defmethod write-section :v1/sobjects (defmethod write-section :v1/sobjects
[{:keys [::sto/storage ::output]}] [{:keys [::output] :as cfg}]
(let [sids (-> bfc/*state* deref :sids) (let [sids (-> bfc/*state* deref :sids)
storage (media/configure-assets-storage storage)] storage (sto/resolve cfg)]
(l/dbg :hint "found sobjects" (l/dbg :hint "found sobjects"
:items (count sids) :items (count sids)
@ -620,8 +619,8 @@
::l/sync? true)))))) ::l/sync? true))))))
(defmethod read-section :v1/sobjects (defmethod read-section :v1/sobjects
[{:keys [::sto/storage ::db/conn ::input ::bfc/overwrite ::bfc/timestamp]}] [{:keys [::db/conn ::input ::bfc/overwrite ::bfc/timestamp] :as cfg}]
(let [storage (media/configure-assets-storage storage) (let [storage (sto/resolve cfg)
ids (read-obj! input) ids (read-obj! input)
thumb? (into #{} (map :media-id) (:thumbnails @bfc/*state*))] thumb? (into #{} (map :media-id) (:thumbnails @bfc/*state*))]

View file

@ -20,7 +20,6 @@
[app.db.sql :as sql] [app.db.sql :as sql]
[app.loggers.audit :as-alias audit] [app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks] [app.loggers.webhooks :as-alias webhooks]
[app.media :as media]
[app.storage :as sto] [app.storage :as sto]
[app.storage.tmp :as tmp] [app.storage.tmp :as tmp]
[app.util.events :as events] [app.util.events :as events]
@ -347,9 +346,7 @@
[cfg team-id] [cfg team-id]
(let [id (uuid/next) (let [id (uuid/next)
tp (dt/tpoint) tp (dt/tpoint)
cfg (create-database cfg)]
cfg (-> (create-database cfg)
(update ::sto/storage media/configure-assets-storage))]
(l/inf :hint "start" (l/inf :hint "start"
:operation "export" :operation "export"
@ -390,7 +387,6 @@
tp (dt/tpoint) tp (dt/tpoint)
cfg (-> (create-database cfg path) cfg (-> (create-database cfg path)
(update ::sto/storage media/configure-assets-storage)
(assoc ::bfc/timestamp (dt/now)))] (assoc ::bfc/timestamp (dt/now)))]
(l/inf :hint "start" (l/inf :hint "start"

View file

@ -52,8 +52,8 @@
:redis-uri "redis://redis/0" :redis-uri "redis://redis/0"
:assets-storage-backend :assets-fs :objects-storage-backend "fs"
:storage-assets-fs-directory "assets" :objects-storage-fs-directory "assets"
:assets-path "/internal/assets/" :assets-path "/internal/assets/"
:smtp-default-reply-to "Penpot <no-reply@example.com>" :smtp-default-reply-to "Penpot <no-reply@example.com>"
@ -207,16 +207,24 @@
[:prepl-host {:optional true} :string] [:prepl-host {:optional true} :string]
[:prepl-port {:optional true} :int] [:prepl-port {:optional true} :int]
[:assets-storage-backend {:optional true} :keyword]
[:media-directory {:optional true} :string] ;; REVIEW [:media-directory {:optional true} :string] ;; REVIEW
[:media-uri {:optional true} :string] [:media-uri {:optional true} :string]
[:assets-path {:optional true} :string] [:assets-path {:optional true} :string]
;; Legacy, will be removed in 2.5
[:assets-storage-backend {:optional true} :keyword]
[:storage-assets-fs-directory {:optional true} :string] [:storage-assets-fs-directory {:optional true} :string]
[:storage-assets-s3-bucket {:optional true} :string] [:storage-assets-s3-bucket {:optional true} :string]
[:storage-assets-s3-region {:optional true} :keyword] [:storage-assets-s3-region {:optional true} :keyword]
[:storage-assets-s3-endpoint {:optional true} :string] [:storage-assets-s3-endpoint {:optional true} :string]
[:storage-assets-s3-io-threads {:optional true} :int]])) [:storage-assets-s3-io-threads {:optional true} :int]
[:objects-storage-backend {:optional true} :keyword]
[:objects-storage-fs-directory {:optional true} :string]
[:objects-storage-s3-bucket {:optional true} :string]
[:objects-storage-s3-region {:optional true} :keyword]
[:objects-storage-s3-endpoint {:optional true} :string]
[:objects-storage-s3-io-threads {:optional true} :int]]))
(def default-flags (def default-flags
[:enable-backend-api-doc [:enable-backend-api-doc

View file

@ -153,7 +153,7 @@
(s/def ::conn some?) (s/def ::conn some?)
(s/def ::nilable-pool (s/nilable ::pool)) (s/def ::nilable-pool (s/nilable ::pool))
(s/def ::pool pool?) (s/def ::pool pool?)
(s/def ::pool-or-conn some?) (s/def ::connectable some?)
(defn closed? (defn closed?
[pool] [pool]

View file

@ -62,6 +62,7 @@
[datoteka.io :as io] [datoteka.io :as io]
[promesa.util :as pu])) [promesa.util :as pu]))
(def ^:dynamic *stats* (def ^:dynamic *stats*
"A dynamic var for setting up state for collect stats globally." "A dynamic var for setting up state for collect stats globally."
nil) nil)
@ -1742,7 +1743,7 @@
:validate validate? :validate validate?
:skip-on-graphic-error skip-on-graphic-error?) :skip-on-graphic-error skip-on-graphic-error?)
(db/tx-run! (update system ::sto/storage media/configure-assets-storage) (db/tx-run! system
(fn [system] (fn [system]
(binding [*system* system] (binding [*system* system]
(when (string? label) (when (string? label)

View file

@ -12,6 +12,7 @@
[app.common.logging :as l] [app.common.logging :as l]
[app.db :as db] [app.db :as db]
[app.db.sql :as-alias sql] [app.db.sql :as-alias sql]
[app.storage :as sto]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.objects-map :as omap] [app.util.objects-map :as omap]
[app.util.pointer-map :as pmap])) [app.util.pointer-map :as pmap]))
@ -55,12 +56,28 @@
;; POINTER-MAP ;; POINTER-MAP
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn get-file-data
  "Return the data of a file-like row as it is stored.

  When the row's `data-backend` is \"objects-storage\" the content is
  read from the storage object referenced by `data-ref-id`; if the
  `:touch` option is truthy that storage object is also marked as
  touched. For any other backend value the inline `:data` attribute of
  the row is returned unchanged."
  [system {:keys [data-backend data-ref-id] :as file} & {:keys [touch]}]
  (if-not (= "objects-storage" data-backend)
    ;; Inline storage: the payload lives directly on the row.
    (:data file)
    ;; Offloaded storage: fetch the payload through the storage layer.
    (let [storage (sto/resolve system ::db/reuse-conn true)
          sobject (sto/get-object storage data-ref-id)]
      (when touch
        (sto/touch-object! storage data-ref-id))
      (sto/get-object-bytes storage sobject))))
(defn resolve-file-data
  "Return `file` with its `:data` attribute replaced by the value
  obtained from `get-file-data`. Any options are forwarded untouched
  to `get-file-data`."
  [system file & {:as opts}]
  (assoc file :data (get-file-data system file opts)))
(defn load-pointer (defn load-pointer
"A database loader pointer helper" "A database loader pointer helper"
[system file-id id] [system file-id id]
(let [fragment (db/get* system :file-data-fragment (let [fragment (db/get* system :file-data-fragment
{:id id :file-id file-id} {:id id :file-id file-id}
{::sql/columns [:data]})] {::sql/columns [:data :data-backend :data-ref-id :id]})]
(l/trc :hint "load pointer" (l/trc :hint "load pointer"
:file-id (str file-id) :file-id (str file-id)
@ -74,7 +91,9 @@
:file-id file-id :file-id file-id
:fragment-id id)) :fragment-id id))
(blob/decode (:data fragment)))) (let [data (get-file-data system fragment)]
;; FIXME: conditional thread scheduling for decoding big objects
(blob/decode data))))
(defn persist-pointers! (defn persist-pointers!
"Persist all currently tracked pointer objects" "Persist all currently tracked pointer objects"

View file

@ -57,11 +57,10 @@
(defn- serve-object (defn- serve-object
"Helper function that returns the appropriate response depending on "Helper function that returns the appropriate response depending on
the storage object backend type." the storage object backend type."
[{:keys [::sto/storage] :as cfg} {:keys [backend] :as obj}] [cfg {:keys [backend] :as obj}]
(let [backend (sto/resolve-backend storage backend)] (case backend
(case (::sto/type backend) (:s3 :assets-s3) (serve-object-from-s3 cfg obj)
:s3 (serve-object-from-s3 cfg obj) (:fs :assets-fs) (serve-object-from-fs cfg obj)))
:fs (serve-object-from-fs cfg obj))))
(defn objects-handler (defn objects-handler
"Handler that servers storage objects by id." "Handler that servers storage objects by id."

View file

@ -345,6 +345,7 @@
:objects-gc (ig/ref :app.tasks.objects-gc/handler) :objects-gc (ig/ref :app.tasks.objects-gc/handler)
:file-gc (ig/ref :app.tasks.file-gc/handler) :file-gc (ig/ref :app.tasks.file-gc/handler)
:file-gc-scheduler (ig/ref :app.tasks.file-gc-scheduler/handler) :file-gc-scheduler (ig/ref :app.tasks.file-gc-scheduler/handler)
:offload-file-data (ig/ref :app.tasks.offload-file-data/handler)
:file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler) :file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler)
:tasks-gc (ig/ref :app.tasks.tasks-gc/handler) :tasks-gc (ig/ref :app.tasks.tasks-gc/handler)
:telemetry (ig/ref :app.tasks.telemetry/handler) :telemetry (ig/ref :app.tasks.telemetry/handler)
@ -398,6 +399,10 @@
:app.tasks.file-gc-scheduler/handler :app.tasks.file-gc-scheduler/handler
{::db/pool (ig/ref ::db/pool)} {::db/pool (ig/ref ::db/pool)}
:app.tasks.offload-file-data/handler
{::db/pool (ig/ref ::db/pool)
::sto/storage (ig/ref ::sto/storage)}
:app.tasks.file-xlog-gc/handler :app.tasks.file-xlog-gc/handler
{::db/pool (ig/ref ::db/pool)} {::db/pool (ig/ref ::db/pool)}
@ -452,17 +457,28 @@
::sto/storage ::sto/storage
{::db/pool (ig/ref ::db/pool) {::db/pool (ig/ref ::db/pool)
::sto/backends ::sto/backends
{:assets-s3 (ig/ref [::assets :app.storage.s3/backend]) {:s3 (ig/ref :app.storage.s3/backend)
:assets-fs (ig/ref [::assets :app.storage.fs/backend])}} :fs (ig/ref :app.storage.fs/backend)
[::assets :app.storage.s3/backend] ;; LEGACY (should not be removed, can only be removed after an
{::sto.s3/region (cf/get :storage-assets-s3-region) ;; explicit migration because the database objects/rows will
::sto.s3/endpoint (cf/get :storage-assets-s3-endpoint) ;; still reference the old names).
::sto.s3/bucket (cf/get :storage-assets-s3-bucket) :assets-s3 (ig/ref :app.storage.s3/backend)
::sto.s3/io-threads (cf/get :storage-assets-s3-io-threads)} :assets-fs (ig/ref :app.storage.fs/backend)}}
[::assets :app.storage.fs/backend] :app.storage.s3/backend
{::sto.fs/directory (cf/get :storage-assets-fs-directory)}}) {::sto.s3/region (or (cf/get :storage-assets-s3-region)
(cf/get :objects-storage-s3-region))
::sto.s3/endpoint (or (cf/get :storage-assets-s3-endpoint)
(cf/get :objects-storage-s3-endpoint))
::sto.s3/bucket (or (cf/get :storage-assets-s3-bucket)
(cf/get :objects-storage-s3-bucket))
::sto.s3/io-threads (or (cf/get :storage-assets-s3-io-threads)
(cf/get :objects-storage-s3-io-threads))}
:app.storage.fs/backend
{::sto.fs/directory (or (cf/get :storage-assets-fs-directory)
(cf/get :objects-storage-fs-directory))}})
(def worker-config (def worker-config

View file

@ -313,17 +313,3 @@
(= stype :ttf) (= stype :ttf)
(-> (assoc "font/otf" (ttf->otf sfnt)) (-> (assoc "font/otf" (ttf->otf sfnt))
(assoc "font/ttf" sfnt))))))))) (assoc "font/ttf" sfnt)))))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Utility functions
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn configure-assets-storage
"Given storage map, returns a storage configured with the appropriate
backend for assets and optional connection attached."
([storage]
(assoc storage ::sto/backend (cf/get :assets-storage-backend :assets-fs)))
([storage pool-or-conn]
(-> (configure-assets-storage storage)
(assoc ::db/pool-or-conn pool-or-conn))))

View file

@ -382,7 +382,16 @@
:fn (mg/resource "app/migrations/sql/0120-mod-audit-log-table.sql")} :fn (mg/resource "app/migrations/sql/0120-mod-audit-log-table.sql")}
{:name "0121-mod-file-data-fragment-table" {:name "0121-mod-file-data-fragment-table"
:fn (mg/resource "app/migrations/sql/0121-mod-file-data-fragment-table.sql")}]) :fn (mg/resource "app/migrations/sql/0121-mod-file-data-fragment-table.sql")}
{:name "0122-mod-file-table"
:fn (mg/resource "app/migrations/sql/0122-mod-file-table.sql")}
{:name "0122-mod-file-data-fragment-table"
:fn (mg/resource "app/migrations/sql/0122-mod-file-data-fragment-table.sql")}
{:name "0123-mod-file-change-table"
:fn (mg/resource "app/migrations/sql/0123-mod-file-change-table.sql")}])
(defn apply-migrations! (defn apply-migrations!
[pool name migrations] [pool name migrations]

View file

@ -0,0 +1,6 @@
-- Add the columns that describe where a fragment's data is stored:
-- `data_backend` (text) and `data_ref_id` (uuid). Both are nullable.
-- IF NOT EXISTS keeps the column additions idempotent, matching the
-- CREATE INDEX IF NOT EXISTS statement below, so re-running this script
-- against an already migrated table does not fail.
ALTER TABLE file_data_fragment
  ADD COLUMN IF NOT EXISTS data_backend text NULL,
  ADD COLUMN IF NOT EXISTS data_ref_id uuid NULL;

-- Index used for lookups by the referenced object id.
CREATE INDEX IF NOT EXISTS file_data_fragment__data_ref_id__idx
    ON file_data_fragment (data_ref_id);

View file

@ -0,0 +1,6 @@
-- Add the columns that describe where a fragment's data is stored:
-- `data_backend` (text) and `data_ref_id` (uuid). Both are nullable.
-- IF NOT EXISTS keeps the column additions idempotent, matching the
-- CREATE INDEX IF NOT EXISTS statement below, so re-running this script
-- against an already migrated table does not fail.
ALTER TABLE file_data_fragment
  ADD COLUMN IF NOT EXISTS data_backend text NULL,
  ADD COLUMN IF NOT EXISTS data_ref_id uuid NULL;

-- Index used for lookups by the referenced object id.
CREATE INDEX IF NOT EXISTS file_data_fragment__data_ref_id__idx
    ON file_data_fragment (data_ref_id);

View file

@ -0,0 +1,4 @@
-- Add a nullable `data_ref_id` (uuid) column to the `file` table.
-- IF NOT EXISTS keeps the column addition idempotent, matching the
-- CREATE INDEX IF NOT EXISTS statement below.
ALTER TABLE file
  ADD COLUMN IF NOT EXISTS data_ref_id uuid NULL;

-- Index used for lookups by the referenced object id.
CREATE INDEX IF NOT EXISTS file__data_ref_id__idx
    ON file (data_ref_id);

View file

@ -0,0 +1,2 @@
-- Composite index over the (created_at, label) columns of `file_change`;
-- created only when an index with this name is not already present.
CREATE INDEX IF NOT EXISTS file_change__created_at__label__idx
    ON file_change (created_at, label);

View file

@ -522,7 +522,6 @@
(create-recovery-token) (create-recovery-token)
(send-email-notification conn))))))) (send-email-notification conn)))))))
(def schema:request-profile-recovery (def schema:request-profile-recovery
[:map {:title "request-profile-recovery"} [:map {:title "request-profile-recovery"}
[:email ::sm/email]]) [:email ::sm/email]])

View file

@ -68,6 +68,9 @@
:max-version fmg/version)) :max-version fmg/version))
file)) file))
;; --- FILE DATA
;; --- FILE PERMISSIONS ;; --- FILE PERMISSIONS
(def ^:private sql:file-permissions (def ^:private sql:file-permissions
@ -258,11 +261,12 @@
(let [params (merge {:id id} (let [params (merge {:id id}
(when (some? project-id) (when (some? project-id)
{:project-id project-id})) {:project-id project-id}))
file (-> (db/get conn :file params file (->> (db/get conn :file params
{::db/check-deleted (not include-deleted?) {::db/check-deleted (not include-deleted?)
::db/remove-deleted (not include-deleted?) ::db/remove-deleted (not include-deleted?)
::sql/for-update lock-for-update?}) ::sql/for-update lock-for-update?})
(decode-row))] (feat.fdata/resolve-file-data cfg)
(decode-row))]
(if (and migrate? (fmg/need-migration? file)) (if (and migrate? (fmg/need-migration? file))
(migrate-file cfg file) (migrate-file cfg file)
file))) file)))
@ -328,8 +332,10 @@
(defn- get-file-fragment (defn- get-file-fragment
[cfg file-id fragment-id] [cfg file-id fragment-id]
(some-> (db/get cfg :file-data-fragment {:file-id file-id :id fragment-id}) (let [resolve-file-data (partial feat.fdata/resolve-file-data cfg)]
(update :data blob/decode))) (some-> (db/get cfg :file-data-fragment {:file-id file-id :id fragment-id})
(resolve-file-data)
(update :data blob/decode))))
(sv/defmethod ::get-file-fragment (sv/defmethod ::get-file-fragment
"Retrieve a file fragment by its ID. Only authenticated users." "Retrieve a file fragment by its ID. Only authenticated users."
@ -802,7 +808,8 @@
(db/update! cfg :file (db/update! cfg :file
{:revn (inc (:revn file)) {:revn (inc (:revn file))
:data (blob/encode (:data file)) :data (blob/encode (:data file))
:modified-at (dt/now)} :modified-at (dt/now)
:has-media-trimmed false}
{:id file-id}) {:id file-id})
(feat.fdata/persist-pointers! cfg file-id)))) (feat.fdata/persist-pointers! cfg file-id))))

View file

@ -14,7 +14,6 @@
[app.db :as db] [app.db :as db]
[app.db.sql :as-alias sql] [app.db.sql :as-alias sql]
[app.main :as-alias main] [app.main :as-alias main]
[app.media :as media]
[app.rpc :as-alias rpc] [app.rpc :as-alias rpc]
[app.rpc.commands.files :as files] [app.rpc.commands.files :as files]
[app.rpc.commands.profile :as profile] [app.rpc.commands.profile :as profile]
@ -63,8 +62,8 @@
(db/run! cfg get-file-snapshots params)) (db/run! cfg get-file-snapshots params))
(defn restore-file-snapshot! (defn restore-file-snapshot!
[{:keys [::db/conn ::sto/storage] :as cfg} {:keys [file-id id]}] [{:keys [::db/conn] :as cfg} {:keys [file-id id]}]
(let [storage (media/configure-assets-storage storage conn) (let [storage (sto/resolve cfg {::db/reuse-conn true})
file (files/get-minimal-file conn file-id {::db/for-update true}) file (files/get-minimal-file conn file-id {::db/for-update true})
snapshot (db/get* conn :file-change snapshot (db/get* conn :file-change
{:file-id file-id {:file-id file-id

View file

@ -295,8 +295,7 @@
(db/run! cfg files/check-edition-permissions! profile-id file-id) (db/run! cfg files/check-edition-permissions! profile-id file-id)
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)] (create-file-object-thumbnail! cfg file-id object-id media (or tag "frame")))
(create-file-object-thumbnail! cfg file-id object-id media (or tag "frame"))))
;; --- MUTATION COMMAND: delete-file-object-thumbnail ;; --- MUTATION COMMAND: delete-file-object-thumbnail
@ -327,7 +326,7 @@
(files/check-edition-permissions! cfg profile-id file-id) (files/check-edition-permissions! cfg profile-id file-id)
(db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(-> cfg (-> cfg
(update ::sto/storage media/configure-assets-storage conn) (update ::sto/storage sto/configure conn)
(delete-file-object-thumbnail! file-id object-id)) (delete-file-object-thumbnail! file-id object-id))
nil))) nil)))
@ -405,7 +404,6 @@
(db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(files/check-edition-permissions! conn profile-id file-id) (files/check-edition-permissions! conn profile-id file-id)
(when-not (db/read-only? conn) (when-not (db/read-only? conn)
(let [cfg (update cfg ::sto/storage media/configure-assets-storage) (let [media (create-file-thumbnail! cfg params)]
media (create-file-thumbnail! cfg params)]
{:uri (files/resolve-public-uri (:id media)) {:uri (files/resolve-public-uri (:id media))
:id (:id media)}))))) :id (:id media)})))))

View file

@ -227,8 +227,12 @@
(defn- update-file* (defn- update-file*
[{:keys [::db/conn ::wrk/executor] :as cfg} [{:keys [::db/conn ::wrk/executor] :as cfg}
{:keys [profile-id file changes session-id ::created-at skip-validate] :as params}] {:keys [profile-id file changes session-id ::created-at skip-validate] :as params}]
(let [;; Process the file data on separated thread for avoid to do (let [;; Retrieve the file data
file (feat.fdata/resolve-file-data cfg file {:touch true})
;; Process the file data on separated thread for avoid to do
;; the CPU intensive operation on vthread. ;; the CPU intensive operation on vthread.
file (px/invoke! executor (partial update-file-data cfg file changes skip-validate)) file (px/invoke! executor (partial update-file-data cfg file changes skip-validate))
features (db/create-array conn "text" (:features file))] features (db/create-array conn "text" (:features file))]
@ -254,6 +258,7 @@
:version (:version file) :version (:version file)
:features features :features features
:data-backend nil :data-backend nil
:data-ref-id nil
:modified-at created-at :modified-at created-at
:has-media-trimmed false} :has-media-trimmed false}
{:id (:id file)}) {:id (:id file)})

View file

@ -95,12 +95,11 @@
[cfg {:keys [::rpc/profile-id team-id] :as params}] [cfg {:keys [::rpc/profile-id team-id] :as params}]
(db/tx-run! cfg (db/tx-run! cfg
(fn [{:keys [::db/conn] :as cfg}] (fn [{:keys [::db/conn] :as cfg}]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)] (teams/check-edition-permissions! conn profile-id team-id)
(teams/check-edition-permissions! conn profile-id team-id) (quotes/check-quote! conn {::quotes/id ::quotes/font-variants-per-team
(quotes/check-quote! conn {::quotes/id ::quotes/font-variants-per-team ::quotes/profile-id profile-id
::quotes/profile-id profile-id ::quotes/team-id team-id})
::quotes/team-id team-id}) (create-font-variant cfg (assoc params :profile-id profile-id)))))
(create-font-variant cfg (assoc params :profile-id profile-id))))))
(defn create-font-variant (defn create-font-variant
[{:keys [::sto/storage ::db/conn ::wrk/executor]} {:keys [data] :as params}] [{:keys [::sto/storage ::db/conn ::wrk/executor]} {:keys [data] :as params}]
@ -203,14 +202,13 @@
::sm/params schema:delete-font} ::sm/params schema:delete-font}
[cfg {:keys [::rpc/profile-id id team-id]}] [cfg {:keys [::rpc/profile-id id team-id]}]
(db/tx-run! cfg (db/tx-run! cfg
(fn [{:keys [::db/conn ::sto/storage] :as cfg}] (fn [{:keys [::db/conn] :as cfg}]
(teams/check-edition-permissions! conn profile-id team-id) (teams/check-edition-permissions! conn profile-id team-id)
(let [fonts (db/query conn :team-font-variant (let [fonts (db/query conn :team-font-variant
{:team-id team-id {:team-id team-id
:font-id id :font-id id
:deleted-at nil} :deleted-at nil}
{::sql/for-update true}) {::sql/for-update true})
storage (media/configure-assets-storage storage conn)
tnow (dt/now)] tnow (dt/now)]
(when-not (seq fonts) (when-not (seq fonts)
@ -220,11 +218,7 @@
(doseq [font fonts] (doseq [font fonts]
(db/update! conn :team-font-variant (db/update! conn :team-font-variant
{:deleted-at tnow} {:deleted-at tnow}
{:id (:id font)}) {:id (:id font)}))
(some->> (:woff1-file-id font) (sto/touch-object! storage))
(some->> (:woff2-file-id font) (sto/touch-object! storage))
(some->> (:ttf-file-id font) (sto/touch-object! storage))
(some->> (:otf-file-id font) (sto/touch-object! storage)))
(rph/with-meta (rph/wrap) (rph/with-meta (rph/wrap)
{::audit/props {:id id {::audit/props {:id id
@ -245,22 +239,16 @@
::sm/params schema:delete-font-variant} ::sm/params schema:delete-font-variant}
[cfg {:keys [::rpc/profile-id id team-id]}] [cfg {:keys [::rpc/profile-id id team-id]}]
(db/tx-run! cfg (db/tx-run! cfg
(fn [{:keys [::db/conn ::sto/storage] :as cfg}] (fn [{:keys [::db/conn] :as cfg}]
(teams/check-edition-permissions! conn profile-id team-id) (teams/check-edition-permissions! conn profile-id team-id)
(let [variant (db/get conn :team-font-variant (let [variant (db/get conn :team-font-variant
{:id id :team-id team-id} {:id id :team-id team-id}
{::sql/for-update true}) {::sql/for-update true})]
storage (media/configure-assets-storage storage conn)]
(db/update! conn :team-font-variant (db/update! conn :team-font-variant
{:deleted-at (dt/now)} {:deleted-at (dt/now)}
{:id (:id variant)}) {:id (:id variant)})
(some->> (:woff1-file-id variant) (sto/touch-object! storage))
(some->> (:woff2-file-id variant) (sto/touch-object! storage))
(some->> (:ttf-file-id variant) (sto/touch-object! storage))
(some->> (:otf-file-id variant) (sto/touch-object! storage))
(rph/with-meta (rph/wrap) (rph/with-meta (rph/wrap)
{::audit/props {:font-family (:font-family variant) {::audit/props {:font-family (:font-family variant)
:font-id (:font-id variant)}}))))) :font-id (:font-id variant)}})))))

View file

@ -56,21 +56,19 @@
::climit/id [[:process-image/by-profile ::rpc/profile-id] ::climit/id [[:process-image/by-profile ::rpc/profile-id]
[:process-image/global]]} [:process-image/global]]}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id content] :as params}] [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id content] :as params}]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)] (files/check-edition-permissions! pool profile-id file-id)
(media/validate-media-type! content)
(media/validate-media-size! content)
(files/check-edition-permissions! pool profile-id file-id) (db/run! cfg (fn [cfg]
(media/validate-media-type! content) (let [object (create-file-media-object cfg params)
(media/validate-media-size! content) props {:name (:name params)
:file-id file-id
(db/run! cfg (fn [cfg] :is-local (:is-local params)
(let [object (create-file-media-object cfg params) :size (:size content)
props {:name (:name params) :mtype (:mtype content)}]
:file-id file-id (with-meta object
:is-local (:is-local params) {::audit/replace-props props})))))
:size (:size content)
:mtype (:mtype content)}]
(with-meta object
{::audit/replace-props props}))))))
(defn- big-enough-for-thumbnail? (defn- big-enough-for-thumbnail?
"Checks if the provided image info is big enough for "Checks if the provided image info is big enough for
@ -183,9 +181,8 @@
{::doc/added "1.17" {::doc/added "1.17"
::sm/params schema:create-file-media-object-from-url} ::sm/params schema:create-file-media-object-from-url}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}] [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)] (files/check-edition-permissions! pool profile-id file-id)
(files/check-edition-permissions! pool profile-id file-id) (create-file-media-object-from-url cfg (assoc params :profile-id profile-id)))
(create-file-media-object-from-url cfg (assoc params :profile-id profile-id))))
(defn download-image (defn download-image
[{:keys [::http/client]} uri] [{:keys [::http/client]} uri]

View file

@ -210,8 +210,7 @@
[cfg {:keys [::rpc/profile-id file] :as params}] [cfg {:keys [::rpc/profile-id file] :as params}]
;; Validate incoming mime type ;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"}) (media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)] (update-profile-photo cfg (assoc params :profile-id profile-id)))
(update-profile-photo cfg (assoc params :profile-id profile-id))))
(defn update-profile-photo (defn update-profile-photo
[{:keys [::db/pool ::sto/storage] :as cfg} {:keys [profile-id file] :as params}] [{:keys [::db/pool ::sto/storage] :as cfg} {:keys [profile-id file] :as params}]

View file

@ -674,8 +674,7 @@
[cfg {:keys [::rpc/profile-id file] :as params}] [cfg {:keys [::rpc/profile-id file] :as params}]
;; Validate incoming mime type ;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"}) (media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)] (update-team-photo cfg (assoc params :profile-id profile-id)))
(update-team-photo cfg (assoc params :profile-id profile-id))))
(defn update-team-photo (defn update-team-photo
[{:keys [::db/pool ::sto/storage] :as cfg} {:keys [profile-id team-id] :as params}] [{:keys [::db/pool ::sto/storage] :as cfg} {:keys [profile-id team-id] :as params}]

View file

@ -6,11 +6,13 @@
(ns app.storage (ns app.storage
"Objects storage abstraction layer." "Objects storage abstraction layer."
(:refer-clojure :exclude [resolve])
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.data.macros :as dm] [app.common.data.macros :as dm]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db] [app.db :as db]
[app.storage.fs :as sfs] [app.storage.fs :as sfs]
[app.storage.impl :as impl] [app.storage.impl :as impl]
@ -18,16 +20,23 @@
[app.util.time :as dt] [app.util.time :as dt]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[datoteka.fs :as fs] [datoteka.fs :as fs]
[integrant.core :as ig] [integrant.core :as ig])
[promesa.core :as p])
(:import (:import
java.io.InputStream)) java.io.InputStream))
(defn get-legacy-backend
  "Translate the deprecated `:assets-storage-backend` configuration
  value into the current backend identifier.

  Returns `:fs` for `:assets-fs`, `:s3` for `:assets-s3`, and `nil`
  when the legacy option is unset or holds any other value. Returning
  `nil` (instead of a hard-coded default) is what allows callers that
  do `(or (get-legacy-backend) <new-config>)` to actually reach the
  new `:objects-storage-backend` setting."
  []
  (case (cf/get :assets-storage-backend)
    :assets-fs :fs
    :assets-s3 :s3
    ;; No legacy configuration present: let the caller decide.
    nil))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Storage Module State ;; Storage Module State
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::id #{:assets-fs :assets-s3}) (s/def ::id #{:assets-fs :assets-s3 :fs :s3})
(s/def ::s3 ::ss3/backend) (s/def ::s3 ::ss3/backend)
(s/def ::fs ::sfs/backend) (s/def ::fs ::sfs/backend)
(s/def ::type #{:fs :s3}) (s/def ::type #{:fs :s3})
@ -45,11 +54,13 @@
[_ {:keys [::backends ::db/pool] :as cfg}] [_ {:keys [::backends ::db/pool] :as cfg}]
(-> (d/without-nils cfg) (-> (d/without-nils cfg)
(assoc ::backends (d/without-nils backends)) (assoc ::backends (d/without-nils backends))
(assoc ::db/pool-or-conn pool))) (assoc ::backend (or (get-legacy-backend)
(cf/get :objects-storage-backend :fs)))
(assoc ::db/connectable pool)))
(s/def ::backend keyword?) (s/def ::backend keyword?)
(s/def ::storage (s/def ::storage
(s/keys :req [::backends ::db/pool ::db/pool-or-conn] (s/keys :req [::backends ::db/pool ::db/connectable]
:opt [::backend])) :opt [::backend]))
(s/def ::storage-with-backend (s/def ::storage-with-backend
@ -61,23 +72,26 @@
(defn get-metadata (defn get-metadata
[params] [params]
(into {} (reduce-kv (fn [res k _]
(remove (fn [[k _]] (qualified-keyword? k))) (if (qualified-keyword? k)
params)) (dissoc res k)
res))
params
params))
(defn- get-database-object-by-hash (defn- get-database-object-by-hash
[pool-or-conn backend bucket hash] [connectable backend bucket hash]
(let [sql (str "select * from storage_object " (let [sql (str "select * from storage_object "
" where (metadata->>'~:hash') = ? " " where (metadata->>'~:hash') = ? "
" and (metadata->>'~:bucket') = ? " " and (metadata->>'~:bucket') = ? "
" and backend = ?" " and backend = ?"
" and deleted_at is null" " and deleted_at is null"
" limit 1")] " limit 1")]
(some-> (db/exec-one! pool-or-conn [sql hash bucket (name backend)]) (some-> (db/exec-one! connectable [sql hash bucket (name backend)])
(update :metadata db/decode-transit-pgobject)))) (update :metadata db/decode-transit-pgobject))))
(defn- create-database-object (defn- create-database-object
[{:keys [::backend ::db/pool-or-conn]} {:keys [::content ::expired-at ::touched-at] :as params}] [{:keys [::backend ::db/connectable]} {:keys [::content ::expired-at ::touched-at ::touch] :as params}]
(let [id (or (:id params) (uuid/random)) (let [id (or (:id params) (uuid/random))
mdata (cond-> (get-metadata params) mdata (cond-> (get-metadata params)
(satisfies? impl/IContentHash content) (satisfies? impl/IContentHash content)
@ -86,7 +100,9 @@
:always :always
(dissoc :id)) (dissoc :id))
;; FIXME: touch object on deduplicated put operation ?? touched-at (if touch
(or touched-at (dt/now))
touched-at)
;; NOTE: for now we don't reuse the deleted objects, but in ;; NOTE: for now we don't reuse the deleted objects, but in
;; future we can consider reusing deleted objects if we ;; future we can consider reusing deleted objects if we
@ -95,10 +111,20 @@
result (when (and (::deduplicate? params) result (when (and (::deduplicate? params)
(:hash mdata) (:hash mdata)
(:bucket mdata)) (:bucket mdata))
(get-database-object-by-hash pool-or-conn backend (:bucket mdata) (:hash mdata))) (let [result (get-database-object-by-hash connectable backend
(:bucket mdata)
(:hash mdata))]
(if touch
(do
(db/update! connectable :storage-object
{:touched-at touched-at}
{:id (:id result)}
{::db/return-keys false})
(assoc result :touced-at touched-at))
result)))
result (or result result (or result
(-> (db/insert! pool-or-conn :storage-object (-> (db/insert! connectable :storage-object
{:id id {:id id
:size (impl/get-size content) :size (impl/get-size content)
:backend (name backend) :backend (name backend)
@ -154,9 +180,9 @@
(dm/export impl/object?) (dm/export impl/object?)
(defn get-object (defn get-object
[{:keys [::db/pool-or-conn] :as storage} id] [{:keys [::db/connectable] :as storage} id]
(us/assert! ::storage storage) (us/assert! ::storage storage)
(retrieve-database-object pool-or-conn id)) (retrieve-database-object connectable id))
(defn put-object! (defn put-object!
"Creates a new object with the provided content." "Creates a new object with the provided content."
@ -172,10 +198,10 @@
(defn touch-object! (defn touch-object!
"Mark object as touched." "Mark object as touched."
[{:keys [::db/pool-or-conn] :as storage} object-or-id] [{:keys [::db/connectable] :as storage} object-or-id]
(us/assert! ::storage storage) (us/assert! ::storage storage)
(let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id)] (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id)]
(-> (db/update! pool-or-conn :storage-object (-> (db/update! connectable :storage-object
{:touched-at (dt/now)} {:touched-at (dt/now)}
{:id id}) {:id id})
(db/get-update-count) (db/get-update-count)
@ -195,11 +221,10 @@
"Returns a byte array of object content." "Returns a byte array of object content."
[storage object] [storage object]
(us/assert! ::storage storage) (us/assert! ::storage storage)
(if (or (nil? (:expired-at object)) (when (or (nil? (:expired-at object))
(dt/is-after? (:expired-at object) (dt/now))) (dt/is-after? (:expired-at object) (dt/now)))
(-> (impl/resolve-backend storage (:backend object)) (-> (impl/resolve-backend storage (:backend object))
(impl/get-object-bytes object)) (impl/get-object-bytes object))))
(p/resolved nil)))
(defn get-object-url (defn get-object-url
([storage object] ([storage object]
@ -223,13 +248,26 @@
(-> (impl/get-object-url backend object nil) file-url->path)))) (-> (impl/get-object-url backend object nil) file-url->path))))
(defn del-object! (defn del-object!
[{:keys [::db/pool-or-conn] :as storage} object-or-id] [{:keys [::db/connectable] :as storage} object-or-id]
(us/assert! ::storage storage) (us/assert! ::storage storage)
(let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id) (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id)
res (db/update! pool-or-conn :storage-object res (db/update! connectable :storage-object
{:deleted-at (dt/now)} {:deleted-at (dt/now)}
{:id id})] {:id id})]
(pos? (db/get-update-count res)))) (pos? (db/get-update-count res))))
(dm/export impl/resolve-backend)
(dm/export impl/calculate-hash) (dm/export impl/calculate-hash)
(defn configure
  "Returns the given storage instance with `connectable` associated
  under `::db/connectable`, replacing any value previously stored
  under that key."
  [storage connectable]
  (-> storage
      (assoc ::db/connectable connectable)))
(defn resolve
  "Looks up the storage instance stored under `::storage` in `cfg`.

  When the `::db/reuse-conn` option is truthy (it defaults to false),
  the returned storage is reconfigured to use the connectable obtained
  from `cfg` through `db/get-connectable`; otherwise the storage is
  returned untouched."
  [cfg & {:as opts}]
  (let [instance    (::storage cfg)
        reuse-conn? (get opts ::db/reuse-conn false)]
    (cond-> instance
      reuse-conn? (configure (db/get-connectable cfg)))))

View file

@ -76,6 +76,24 @@
(-> (db/exec-one! conn [sql:has-file-thumbnail-refs id]) (-> (db/exec-one! conn [sql:has-file-thumbnail-refs id])
(get :has-refs))) (get :has-refs)))
;; Checks whether any row of the `file` table points to the given
;; storage object through its `data_ref_id` column.
(def ^:private
  sql:has-file-data-refs
  "SELECT EXISTS (SELECT 1 FROM file WHERE data_ref_id = ?) AS has_refs")

(defn- has-file-data-refs?
  "Runs `sql:has-file-data-refs` for the storage object `id` and
  returns the value of the `:has-refs` column of the resulting row."
  [conn id]
  (let [row (db/exec-one! conn [sql:has-file-data-refs id])]
    (:has-refs row)))

;; Checks whether any row of the `file_data_fragment` table points to
;; the given storage object through its `data_ref_id` column.
(def ^:private
  sql:has-file-data-fragment-refs
  "SELECT EXISTS (SELECT 1 FROM file_data_fragment WHERE data_ref_id = ?) AS has_refs")

(defn- has-file-data-fragment-refs?
  "Runs `sql:has-file-data-fragment-refs` for the storage object `id`
  and returns the value of the `:has-refs` column of the resulting row."
  [conn id]
  (let [row (db/exec-one! conn [sql:has-file-data-fragment-refs id])]
    (:has-refs row)))
(def ^:private sql:mark-freeze-in-bulk (def ^:private sql:mark-freeze-in-bulk
"UPDATE storage_object "UPDATE storage_object
SET touched_at = NULL SET touched_at = NULL
@ -148,6 +166,8 @@
"file-object-thumbnail" (process-objects! conn has-file-object-thumbnails-refs? ids bucket) "file-object-thumbnail" (process-objects! conn has-file-object-thumbnails-refs? ids bucket)
"file-thumbnail" (process-objects! conn has-file-thumbnails-refs? ids bucket) "file-thumbnail" (process-objects! conn has-file-thumbnails-refs? ids bucket)
"profile" (process-objects! conn has-profile-refs? ids bucket) "profile" (process-objects! conn has-profile-refs? ids bucket)
"file-data" (process-objects! conn has-file-data-refs? ids bucket)
"file-data-fragment" (process-objects! conn has-file-data-fragment-refs? ids bucket)
(ex/raise :type :internal (ex/raise :type :internal
:code :unexpected-unknown-reference :code :unexpected-unknown-reference
:hint (dm/fmt "unknown reference '%'" bucket)))) :hint (dm/fmt "unknown reference '%'" bucket))))

View file

@ -207,15 +207,13 @@
(str "blake2b:" result))) (str "blake2b:" result)))
(defn resolve-backend (defn resolve-backend
[{:keys [::db/pool] :as storage} backend-id] [storage backend-id]
(let [backend (get-in storage [::sto/backends backend-id])] (let [backend (get-in storage [::sto/backends backend-id])]
(when-not backend (when-not backend
(ex/raise :type :internal (ex/raise :type :internal
:code :backend-not-configured :code :backend-not-configured
:hint (dm/fmt "backend '%' not configured" backend-id))) :hint (dm/fmt "backend '%' not configured" backend-id)))
(-> backend (assoc backend ::sto/id backend-id)))
(assoc ::sto/id backend-id)
(assoc ::db/pool pool))))
(defrecord StorageObject [id size created-at expired-at touched-at backend]) (defrecord StorageObject [id size created-at expired-at touched-at backend])

View file

@ -10,7 +10,6 @@
file is eligible to be garbage collected after some period of file is eligible to be garbage collected after some period of
inactivity (the default threshold is 72h)." inactivity (the default threshold is 72h)."
(:require (:require
[app.common.data :as d]
[app.binfile.common :as bfc] [app.binfile.common :as bfc]
[app.common.files.migrations :as fmg] [app.common.files.migrations :as fmg]
[app.common.files.validate :as cfv] [app.common.files.validate :as cfv]
@ -22,11 +21,11 @@
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.features.fdata :as feat.fdata] [app.features.fdata :as feat.fdata]
[app.media :as media]
[app.storage :as sto] [app.storage :as sto]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.pointer-map :as pmap] [app.util.pointer-map :as pmap]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as wrk]
[clojure.set :as set] [clojure.set :as set]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig])) [integrant.core :as ig]))
@ -272,17 +271,16 @@
(defn- process-file! (defn- process-file!
[cfg] [cfg]
(try (if-let [file (get-file cfg)]
(if-let [file (get-file cfg)] (let [file (decode-file cfg file)
(let [file (decode-file cfg file) file (clean-media! cfg file)
file (clean-media! cfg file) file (persist-file! cfg file)]
file (persist-file! cfg file)] (clean-data-fragments! cfg file)
(clean-data-fragments! cfg file)) true)
(l/dbg :hint "skip" :file-id (str (::file-id cfg))))
(catch Throwable cause (do
(l/err :hint "error on cleaning file" (l/dbg :hint "skip" :file-id (str (::file-id cfg)))
:file-id (str (::file-id cfg)) false)))
:cause cause))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HANDLER ;; HANDLER
@ -294,13 +292,27 @@
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ cfg] [_ cfg]
(fn [{:keys [props] :as task}] (fn [{:keys [props] :as task}]
(let [min-age (dt/duration (:min-age props 0)) (let [min-age (dt/duration (or (:min-age props)
(cf/get-deletion-delay)))
cfg (-> cfg cfg (-> cfg
(assoc ::db/rollback (:rollback? props)) (assoc ::db/rollback (:rollback? props))
(assoc ::file-id (:file-id props)) (assoc ::file-id (:file-id props))
(assoc ::min-age (db/interval min-age)))] (assoc ::min-age (db/interval min-age)))]
(db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] (try
(let [cfg (update cfg ::sto/storage media/configure-assets-storage conn)] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(process-file! cfg)))) (let [cfg (update cfg ::sto/storage sto/configure conn)
nil))) res (process-file! cfg)]
(when (contains? cf/flags :tiered-file-data-storage)
(wrk/submit! (-> cfg
(assoc ::wrk/task :offload-file-data)
(assoc ::wrk/params props)
(assoc ::wrk/priority 10)
(assoc ::wrk/delay 1000))))
res)))
(catch Throwable cause
(l/err :hint "error on cleaning file"
:file-id (str (:file-id props))
:cause cause))))))

View file

@ -8,7 +8,6 @@
"A maintenance task that is responsible of properly scheduling the "A maintenance task that is responsible of properly scheduling the
file-gc task for all files that matches the eligibility threshold." file-gc task for all files that matches the eligibility threshold."
(:require (:require
[app.common.logging :as l]
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.util.time :as dt] [app.util.time :as dt]

View file

@ -11,7 +11,6 @@
[app.common.logging :as l] [app.common.logging :as l]
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.media :as media]
[app.storage :as sto] [app.storage :as sto]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
@ -126,7 +125,7 @@
0))) 0)))
(def ^:private sql:get-files (def ^:private sql:get-files
"SELECT id, deleted_at, project_id "SELECT id, deleted_at, project_id, data_ref_id
FROM file FROM file
WHERE deleted_at IS NOT NULL WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval AND deleted_at < now() - ?::interval
@ -136,15 +135,17 @@
SKIP LOCKED") SKIP LOCKED")
(defn- delete-files! (defn- delete-files!
[{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] [{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}]
(->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1}) (->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [id deleted-at project-id]}] (reduce (fn [total {:keys [id deleted-at project-id data-ref-id]}]
(l/trc :hint "permanently delete" (l/trc :hint "permanently delete"
:rel "file" :rel "file"
:id (str id) :id (str id)
:project-id (str project-id) :project-id (str project-id)
:deleted-at (dt/format-instant deleted-at)) :deleted-at (dt/format-instant deleted-at))
(some->> data-ref-id (sto/touch-object! storage))
;; And finally, permanently delete the file. ;; And finally, permanently delete the file.
(db/delete! conn :file {:id id}) (db/delete! conn :file {:id id})
@ -210,7 +211,7 @@
0))) 0)))
(def ^:private sql:get-file-data-fragments (def ^:private sql:get-file-data-fragments
"SELECT file_id, id, deleted_at "SELECT file_id, id, deleted_at, data_ref_id
FROM file_data_fragment FROM file_data_fragment
WHERE deleted_at IS NOT NULL WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval AND deleted_at < now() - ?::interval
@ -220,15 +221,16 @@
SKIP LOCKED") SKIP LOCKED")
(defn- delete-file-data-fragments! (defn- delete-file-data-fragments!
[{:keys [::db/conn ::min-age ::chunk-size] :as cfg}] [{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}]
(->> (db/cursor conn [sql:get-file-data-fragments min-age chunk-size] {:chunk-size 1}) (->> (db/cursor conn [sql:get-file-data-fragments min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [file-id id deleted-at]}] (reduce (fn [total {:keys [file-id id deleted-at data-ref-id]}]
(l/trc :hint "permanently delete" (l/trc :hint "permanently delete"
:rel "file-data-fragment" :rel "file-data-fragment"
:id (str id) :id (str id)
:file-id (str file-id) :file-id (str file-id)
:deleted-at (dt/format-instant deleted-at)) :deleted-at (dt/format-instant deleted-at))
(some->> data-ref-id (sto/touch-object! storage))
(db/delete! conn :file-data-fragment {:file-id file-id :id id}) (db/delete! conn :file-data-fragment {:file-id file-id :id id})
(inc total)) (inc total))
@ -299,9 +301,7 @@
[_ cfg] [_ cfg]
(fn [{:keys [props] :as task}] (fn [{:keys [props] :as task}]
(let [min-age (dt/duration (or (:min-age props) (::min-age cfg))) (let [min-age (dt/duration (or (:min-age props) (::min-age cfg)))
cfg (-> cfg cfg (assoc cfg ::min-age (db/interval min-age))]
(assoc ::min-age (db/interval min-age))
(update ::sto/storage media/configure-assets-storage))]
(loop [procs (map deref deletion-proc-vars) (loop [procs (map deref deletion-proc-vars)
total 0] total 0]

View file

@ -0,0 +1,85 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.tasks.offload-file-data
"A maintenance task responsible for moving file data from hot
storage (the database row) to cold storage (fs or s3)."
(:require
[app.common.logging :as l]
[app.db :as db]
[app.db.sql :as-alias sql]
[app.storage :as sto]
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(defn- offload-file-data!
  "Moves the inline `data` of the file identified by `::file-id` from
  the `file` row into the objects storage (bucket file-data) and
  replaces it with a reference to the created storage object
  (`data-backend` and `data-ref-id` columns).

  The row is fetched with the `::sql/for-update` option. When the row
  holds no inline data (for example because it has already been
  offloaded by a previous run) nothing is written and nil is returned,
  instead of trying to build storage content out of nil."
  [{:keys [::db/conn ::sto/storage ::file-id] :as cfg}]
  (let [file (db/get conn :file {:id file-id}
                     {::sql/for-update true})]
    (if-let [data (:data file)]
      (let [sobj (sto/put-object! storage
                                  {::sto/content (sto/content data)
                                   ::sto/touch true
                                   :bucket "file-data"
                                   :content-type "application/octet-stream"
                                   :file-id file-id})]

        (l/trc :hint "offload file data"
               :file-id (str file-id)
               :storage-id (str (:id sobj)))

        (db/update! conn :file
                    {:data-backend "objects-storage"
                     :data-ref-id (:id sobj)
                     :data nil}
                    {:id file-id}
                    {::db/return-keys false}))

      ;; The task can be submitted more than once for the same file, so
      ;; a row without inline data is an expected situation and not an
      ;; error.
      (l/dbg :hint "skip file data offload, no inline data found"
             :file-id (str file-id)))))
(defn- offload-file-data-fragments!
  "Moves the inline `data` of the fragments of the file identified by
  `::file-id` into the objects storage (bucket file-data-fragment),
  replacing it on each row with a reference to the created storage
  object (`data-backend` and `data-ref-id` columns).

  Only rows having both `deleted-at` and `data-backend` set to NULL
  are selected, so deleted fragments and fragments already marked with
  a data backend are left untouched."
  [{:keys [::db/conn ::sto/storage ::file-id] :as cfg}]
  (doseq [fragment (db/query conn :file-data-fragment
                             {:file-id file-id
                              :deleted-at nil
                              :data-backend nil}
                             ;; Same option key as the one used by
                             ;; `offload-file-data!`.
                             ;; NOTE(review): select options look to be
                             ;; namespaced under app.db.sql -- confirm.
                             {::sql/for-update true})]
    (let [data (sto/content (:data fragment))
          sobj (sto/put-object! storage
                                {::sto/content data
                                 ::sto/touch true
                                 :bucket "file-data-fragment"
                                 :content-type "application/octet-stream"
                                 :file-id file-id
                                 :file-fragment-id (:id fragment)})]

      (l/trc :hint "offload file data fragment"
             :file-id (str file-id)
             :file-fragment-id (str (:id fragment))
             :storage-id (str (:id sobj)))

      ;; Address the row by file-id and id, the same pair used when
      ;; fragments are permanently deleted.
      (db/update! conn :file-data-fragment
                  {:data-backend "objects-storage"
                   :data-ref-id (:id sobj)
                   :data nil}
                  {:file-id file-id
                   :id (:id fragment)}
                  {::db/return-keys false}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HANDLER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; The handler requires a database pool and a storage instance.
(defmethod ig/pre-init-spec ::handler [_]
  (s/keys :req [::db/pool ::sto/storage]))

;; Builds the task handler. For each task, the `:rollback?` and
;; `:file-id` props are copied into the config and both offload steps
;; (file data first, then file data fragments) are executed through a
;; single `db/tx-run!` call.
(defmethod ig/init-key ::handler
  [_ cfg]
  (fn [{:keys [props] :as task}]
    (let [task-cfg (assoc cfg
                          ::db/rollback (:rollback? props)
                          ::file-id (:file-id props))]
      (db/tx-run! task-cfg
                  (fn [tx-cfg]
                    (offload-file-data! tx-cfg)
                    (offload-file-data-fragments! tx-cfg))))))

View file

@ -149,8 +149,7 @@
shape-id (uuid/random)] shape-id (uuid/random)]
;; Preventive file-gc ;; Preventive file-gc
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
;; Check the number of fragments before adding the page ;; Check the number of fragments before adding the page
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})] (let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
@ -171,8 +170,7 @@
(t/is (= 3 (count rows)))) (t/is (= 3 (count rows))))
;; The file-gc should mark for remove unused fragments ;; The file-gc should mark for remove unused fragments
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
;; Check the number of fragments ;; Check the number of fragments
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})] (let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
@ -210,15 +208,13 @@
(t/is (= 3 (count rows)))) (t/is (= 3 (count rows))))
;; The file-gc should mark for remove unused fragments ;; The file-gc should mark for remove unused fragments
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
;; The objects-gc should remove unused fragments ;; The objects-gc should remove unused fragments
(let [res (th/run-task! :objects-gc {:min-age 0})] (let [res (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 3 (:processed res)))) (t/is (= 3 (:processed res))))
;; Check the number of fragments; should be 3 because changes ;; Check the number of fragments;
;; are also holding pointers to fragments;
(let [rows (th/db-query :file-data-fragment {:file-id (:id file) (let [rows (th/db-query :file-data-fragment {:file-id (:id file)
:deleted-at nil})] :deleted-at nil})]
(t/is (= 2 (count rows)))) (t/is (= 2 (count rows))))
@ -231,8 +227,7 @@
;; The file-gc should remove fragments related to changes ;; The file-gc should remove fragments related to changes
;; snapshots previously deleted. ;; snapshots previously deleted.
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
;; Check the number of fragments; ;; Check the number of fragments;
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})] (let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
@ -325,12 +320,10 @@
(t/is (= 0 (:delete res)))) (t/is (= 0 (:delete res))))
;; run the file-gc task immediately without forced min-age ;; run the file-gc task immediately without forced min-age
(let [res (th/run-task! :file-gc)] (t/is (false? (th/run-task! :file-gc {:file-id (:id file)})))
(t/is (= 0 (:processed res))))
;; run the task again ;; run the task again
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
;; retrieve file and check trimmed attribute ;; retrieve file and check trimmed attribute
(let [row (th/db-get :file {:id (:id file)})] (let [row (th/db-get :file {:id (:id file)})]
@ -367,8 +360,7 @@
;; Now, we have deleted the usage of pointers to the ;; Now, we have deleted the usage of pointers to the
;; file-media-objects, if we paste file-gc, they should be marked ;; file-media-objects, if we paste file-gc, they should be marked
;; as deleted. ;; as deleted.
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
(let [res (th/run-task! :objects-gc {:min-age 0})] (let [res (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 3 (:processed res)))) (t/is (= 3 (:processed res))))
@ -490,12 +482,10 @@
:strokes [{:opacity 1 :stroke-image {:id (:id fmo5) :width 100 :height 100 :mtype "image/jpeg"}}]})}]) :strokes [{:opacity 1 :stroke-image {:id (:id fmo5) :width 100 :height 100 :mtype "image/jpeg"}}]})}])
;; run the file-gc task immediately without forced min-age ;; run the file-gc task immediately without forced min-age
(let [res (th/run-task! :file-gc)] (t/is (false? (th/run-task! :file-gc {:file-id (:id file)})))
(t/is (= 0 (:processed res))))
;; run the task again ;; run the task again
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
(let [res (th/run-task! :objects-gc {:min-age 0})] (let [res (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 2 (:processed res)))) (t/is (= 2 (:processed res))))
@ -534,9 +524,7 @@
;; Now, we have deleted the usage of pointers to the ;; Now, we have deleted the usage of pointers to the
;; file-media-objects, if we paste file-gc, they should be marked ;; file-media-objects, if we paste file-gc, they should be marked
;; as deleted. ;; as deleted.
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(let [res (th/run-task! :objects-gc {:min-age 0})] (let [res (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 7 (:processed res)))) (t/is (= 7 (:processed res))))
@ -659,12 +647,10 @@
(t/is (= 0 (:delete res)))) (t/is (= 0 (:delete res))))
;; run the file-gc task immediately without forced min-age ;; run the file-gc task immediately without forced min-age
(let [res (th/run-task! :file-gc)] (t/is (false? (th/run-task! :file-gc {:file-id (:id file)})))
(t/is (= 0 (:processed res))))
;; run the task again ;; run the task again
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
;; retrieve file and check trimmed attribute ;; retrieve file and check trimmed attribute
(let [row (th/db-get :file {:id (:id file)})] (let [row (th/db-get :file {:id (:id file)})]
@ -693,8 +679,7 @@
:page-id page-id :page-id page-id
:id frame-id-2}]) :id frame-id-2}])
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
(let [rows (th/db-query :file-tagged-object-thumbnail {:file-id file-id})] (let [rows (th/db-query :file-tagged-object-thumbnail {:file-id file-id})]
(t/is (= 2 (count rows))) (t/is (= 2 (count rows)))
@ -727,8 +712,7 @@
:page-id page-id :page-id page-id
:id frame-id-1}]) :id frame-id-1}])
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
(let [rows (th/db-query :file-tagged-object-thumbnail {:file-id file-id})] (let [rows (th/db-query :file-tagged-object-thumbnail {:file-id file-id})]
(t/is (= 1 (count rows))) (t/is (= 1 (count rows)))
@ -1127,8 +1111,7 @@
(th/sleep 300) (th/sleep 300)
;; run the task ;; run the task
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
;; check that object thumbnails are still here ;; check that object thumbnails are still here
(let [rows (th/db-query :file-tagged-object-thumbnail {:file-id (:id file)})] (let [rows (th/db-query :file-tagged-object-thumbnail {:file-id (:id file)})]
@ -1157,8 +1140,7 @@
(t/is (= 2 (count rows)))) (t/is (= 2 (count rows))))
;; run the task again ;; run the task again
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
;; check that we have all object thumbnails ;; check that we have all object thumbnails
(let [rows (th/db-query :file-tagged-object-thumbnail {:file-id (:id file)})] (let [rows (th/db-query :file-tagged-object-thumbnail {:file-id (:id file)})]
@ -1220,8 +1202,7 @@
(t/is (= 2 (count rows))))) (t/is (= 2 (count rows)))))
(t/testing "gc task" (t/testing "gc task"
(let [res (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed res))))
(let [rows (th/db-query :file-thumbnail {:file-id (:id file)})] (let [rows (th/db-query :file-thumbnail {:file-id (:id file)})]
(t/is (= 2 (count rows))) (t/is (= 2 (count rows)))
@ -1232,3 +1213,113 @@
(let [rows (th/db-query :file-thumbnail {:file-id (:id file)})] (let [rows (th/db-query :file-thumbnail {:file-id (:id file)})]
(t/is (= 1 (count rows))))))) (t/is (= 1 (count rows)))))))
(defn- update-file!
  "Test helper: executes the `:update-file` command for the given
  profile and file with the provided changes (`revn` defaults to 0),
  asserts that the command produced no error and returns its result."
  [& {:keys [profile-id file-id changes revn] :or {revn 0}}]
  (let [response (th/command! {::th/type :update-file
                               ::rpc/profile-id profile-id
                               :id file-id
                               :session-id (uuid/random)
                               :revn revn
                               :features cfeat/supported-features
                               :changes changes})]
    ;; (th/print-result! response)
    (t/is (nil? (:error response)))
    (:result response)))
;; Exercises the tiered file-data storage round-trip: file data and
;; fragments are offloaded to the objects storage after a file-gc run
;; with the flag enabled, and end up inline in the database again
;; after the file is updated and garbage collected.
(t/deftest file-tiered-storage
(let [profile (th/create-profile* 1)
file (th/create-file* 1 {:profile-id (:id profile)
:project-id (:default-project-id profile)
:is-shared false})
page-id (uuid/random)
shape-id (uuid/random)]
;; Preventive file-gc
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; Preventive objects-gc
(let [result (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 1 (:processed result))))
;; Check the number of fragments before adding the page; all of
;; them should still hold their data inline
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(t/is (= 1 (count rows)))
(t/is (every? #(some? (:data %)) rows)))
;; Mark the file eligible again for GC
(th/db-update! :file
{:has-media-trimmed false}
{:id (:id file)})
;; Run FileGC again, with tiered storage activated
(with-redefs [app.config/flags (conj app.config/flags :tiered-file-data-storage)]
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; The FileGC task will schedule an inner task
(th/run-pending-tasks!))
;; Clean objects after file-gc
(let [result (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 1 (:processed result))))
;; Check that the fragments no longer hold inline data and reference
;; the objects storage instead
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(t/is (= 1 (count rows)))
(t/is (every? #(nil? (:data %)) rows))
(t/is (every? #(uuid? (:data-ref-id %)) rows))
(t/is (every? #(= "objects-storage" (:data-backend %)) rows)))
;; Check that the file row references a storage object whose
;; metadata points back to the file
(let [file (th/db-get :file {:id (:id file)})
storage (sto/resolve th/*system*)]
(t/is (= "objects-storage" (:data-backend file)))
(t/is (nil? (:data file)))
(t/is (uuid? (:data-ref-id file)))
(let [sobj (sto/get-object storage (:data-ref-id file))]
(t/is (= "file-data" (:bucket (meta sobj))))
(t/is (= (:id file) (:file-id (meta sobj))))))
;; Add shape to page that should load from cold storage again into the hot storage (db)
(update-file!
:file-id (:id file)
:profile-id (:id profile)
:revn 0
:changes
[{:type :add-page
:name "test"
:id page-id}])
;; Check the number of fragments
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(t/is (= 2 (count rows))))
;; Check the state of the non-deleted fragments: the oldest one is
;; still offloaded and the newest one holds its data inline
(let [[row1 row2 :as rows]
(th/db-query :file-data-fragment
{:file-id (:id file)
:deleted-at nil}
{:order-by [:created-at]})]
;; (pp/pprint rows)
(t/is (= 2 (count rows)))
(t/is (nil? (:data row1)))
(t/is (= "objects-storage" (:data-backend row1)))
(t/is (bytes? (:data row2)))
(t/is (nil? (:data-backend row2))))
;; The file-gc should mark for remove unused fragments
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; The objects-gc should remove unused fragments
(let [res (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 2 (:processed res))))
;; Check that all the remaining fragments hold inline data and no
;; longer reference the objects storage
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(t/is (= 2 (count rows)))
(t/is (every? #(bytes? (:data %)) rows))
(t/is (every? #(nil? (:data-ref-id %)) rows))
(t/is (every? #(nil? (:data-backend %)) rows)))))

View file

@ -114,8 +114,7 @@
;; Run the File GC task that should remove unused file object ;; Run the File GC task that should remove unused file object
;; thumbnails ;; thumbnails
(let [result (th/run-task! :file-gc {:min-age 0})] (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})
(t/is (= 1 (:processed result))))
(let [result (th/run-task! :objects-gc {:min-age 0})] (let [result (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 3 (:processed result)))) (t/is (= 3 (:processed result))))
@ -134,7 +133,7 @@
(t/is (some? (sto/get-object storage (:media-id row2)))) (t/is (some? (sto/get-object storage (:media-id row2))))
;; run the task again ;; run the task again
(let [res (th/run-task! "storage-gc-touched" {:min-age 0})] (let [res (th/run-task! :storage-gc-touched {:min-age 0})]
(t/is (= 1 (:delete res))) (t/is (= 1 (:delete res)))
(t/is (= 0 (:freeze res)))) (t/is (= 0 (:freeze res))))
@ -217,8 +216,7 @@
;; Run the File GC task that should remove unused file object ;; Run the File GC task that should remove unused file object
;; thumbnails ;; thumbnails
(let [result (th/run-task! :file-gc {:min-age 0})] (t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(t/is (= 1 (:processed result))))
(let [result (th/run-task! :objects-gc {:min-age 0})] (let [result (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 2 (:processed result)))) (t/is (= 2 (:processed result))))

View file

@ -145,7 +145,7 @@
(t/is (nil? (:result out)))) (t/is (nil? (:result out))))
(let [res (th/run-task! :storage-gc-touched {:min-age 0})] (let [res (th/run-task! :storage-gc-touched {:min-age 0})]
(t/is (= 6 (:freeze res))) (t/is (= 0 (:freeze res)))
(t/is (= 0 (:delete res)))) (t/is (= 0 (:delete res))))
(let [res (th/run-task! :objects-gc {:min-age 0})] (let [res (th/run-task! :objects-gc {:min-age 0})]
@ -207,7 +207,7 @@
(t/is (nil? (:result out)))) (t/is (nil? (:result out))))
(let [res (th/run-task! :storage-gc-touched {:min-age 0})] (let [res (th/run-task! :storage-gc-touched {:min-age 0})]
(t/is (= 3 (:freeze res))) (t/is (= 0 (:freeze res)))
(t/is (= 0 (:delete res)))) (t/is (= 0 (:delete res))))
(let [res (th/run-task! :objects-gc {:min-age 0})] (let [res (th/run-task! :objects-gc {:min-age 0})]
@ -268,7 +268,7 @@
(t/is (nil? (:result out)))) (t/is (nil? (:result out))))
(let [res (th/run-task! :storage-gc-touched {:min-age 0})] (let [res (th/run-task! :storage-gc-touched {:min-age 0})]
(t/is (= 3 (:freeze res))) (t/is (= 0 (:freeze res)))
(t/is (= 0 (:delete res)))) (t/is (= 0 (:delete res))))
(let [res (th/run-task! :objects-gc {:min-age 0})] (let [res (th/run-task! :objects-gc {:min-age 0})]