Merge branch 'staging' into develop

This commit is contained in:
Andrey Antukh 2022-02-16 14:00:46 +01:00
commit 60d37b6de0
16 changed files with 535 additions and 426 deletions

View file

@ -181,9 +181,11 @@
(s/def ::storage-assets-fs-directory ::us/string)
(s/def ::storage-assets-s3-bucket ::us/string)
(s/def ::storage-assets-s3-region ::us/keyword)
(s/def ::storage-assets-s3-endpoint ::us/string)
(s/def ::storage-fdata-s3-bucket ::us/string)
(s/def ::storage-fdata-s3-region ::us/keyword)
(s/def ::storage-fdata-s3-prefix ::us/string)
(s/def ::storage-fdata-s3-endpoint ::us/string)
(s/def ::telemetry-uri ::us/string)
(s/def ::telemetry-with-taiga ::us/boolean)
(s/def ::tenant ::us/string)
@ -278,10 +280,12 @@
::storage-assets-fs-directory
::storage-assets-s3-bucket
::storage-assets-s3-region
::storage-assets-s3-endpoint
::fdata-storage-backend
::storage-fdata-s3-bucket
::storage-fdata-s3-region
::storage-fdata-s3-prefix
::storage-fdata-s3-endpoint
::telemetry-enabled
::telemetry-uri
::telemetry-referer

View file

@ -52,10 +52,10 @@
:body (sto/get-object-bytes storage obj)}
:s3
(let [url (sto/get-object-url storage obj {:max-age signature-max-age})]
(let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})]
{:status 307
:headers {"location" (str url)
"x-host" (:host url)
"x-host" (cond-> host port (str ":" port))
"cache-control" (str "max-age=" (inst-ms cache-max-age))}
:body ""})

View file

@ -49,10 +49,6 @@
:app.storage/gc-touched-task
{:pool (ig/ref :app.db/pool)}
:app.storage/recheck-task
{:pool (ig/ref :app.db/pool)
:storage (ig/ref :app.storage/storage)}
:app.http.session/session
{:pool (ig/ref :app.db/pool)
:tokens (ig/ref :app.tokens/tokens)}
@ -163,9 +159,6 @@
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :session-gc}
{:cron #app/cron "0 0 * * * ?" ;; hourly
:task :storage-recheck}
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :objects-gc}
@ -198,7 +191,6 @@
:file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler)
:storage-deleted-gc (ig/ref :app.storage/gc-deleted-task)
:storage-touched-gc (ig/ref :app.storage/gc-touched-task)
:storage-recheck (ig/ref :app.storage/recheck-task)
:tasks-gc (ig/ref :app.tasks.tasks-gc/handler)
:telemetry (ig/ref :app.tasks.telemetry/handler)
:session-gc (ig/ref :app.http.session/gc-task)
@ -304,27 +296,28 @@
:app.storage/storage
{:pool (ig/ref :app.db/pool)
:executor (ig/ref :app.worker/executor)
:backends
{:assets-s3 (ig/ref [::assets :app.storage.s3/backend])
:assets-db (ig/ref [::assets :app.storage.db/backend])
:assets-fs (ig/ref [::assets :app.storage.fs/backend])
:backends {
:assets-s3 (ig/ref [::assets :app.storage.s3/backend])
:assets-db (ig/ref [::assets :app.storage.db/backend])
:assets-fs (ig/ref [::assets :app.storage.fs/backend])
:tmp (ig/ref [::tmp :app.storage.fs/backend])
:fdata-s3 (ig/ref [::fdata :app.storage.s3/backend])
:tmp (ig/ref [::tmp :app.storage.fs/backend])
:fdata-s3 (ig/ref [::fdata :app.storage.s3/backend])
;; keep this for backward compatibility
:s3 (ig/ref [::assets :app.storage.s3/backend])
:fs (ig/ref [::assets :app.storage.fs/backend])}}
;; keep this for backward compatibility
:s3 (ig/ref [::assets :app.storage.s3/backend])
:fs (ig/ref [::assets :app.storage.fs/backend])}}
[::fdata :app.storage.s3/backend]
{:region (cf/get :storage-fdata-s3-region)
:bucket (cf/get :storage-fdata-s3-bucket)
:prefix (cf/get :storage-fdata-s3-prefix)}
{:region (cf/get :storage-fdata-s3-region)
:bucket (cf/get :storage-fdata-s3-bucket)
:endpoint (cf/get :storage-fdata-s3-endpoint)
:prefix (cf/get :storage-fdata-s3-prefix)}
[::assets :app.storage.s3/backend]
{:region (cf/get :storage-assets-s3-region)
:bucket (cf/get :storage-assets-s3-bucket)}
{:region (cf/get :storage-assets-s3-region)
:endpoint (cf/get :storage-assets-s3-endpoint)
:bucket (cf/get :storage-assets-s3-bucket)}
[::assets :app.storage.fs/backend]
{:directory (cf/get :storage-assets-fs-directory)}

View file

@ -326,8 +326,10 @@
(defn configure-assets-storage
"Given storage map, returns a storage configured with the appropriate
backend for assets."
[storage conn]
(-> storage
(assoc :conn conn)
(assoc :backend (cf/get :assets-storage-backend :assets-fs))))
([storage]
(assoc storage :backend (cf/get :assets-storage-backend :assets-fs)))
([storage conn]
(-> storage
(assoc :conn conn)
(assoc :backend (cf/get :assets-storage-backend :assets-fs)))))

View file

@ -9,12 +9,10 @@
[app.common.exceptions :as ex]
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.media :as media]
[app.rpc.queries.teams :as teams]
[app.storage :as sto]
[app.util.rlimit :as rlimit]
[app.util.services :as sv]
[app.util.time :as dt]
[clojure.spec.alpha :as s]))
@ -39,52 +37,57 @@
::font-id ::font-family ::font-weight ::font-style]))
(sv/defmethod ::create-font-variant
{::rlimit/permits (cf/get :rlimit-font)}
[{:keys [pool] :as cfg} {:keys [team-id profile-id] :as params}]
(db/with-atomic [conn pool]
(let [cfg (assoc cfg :conn conn)]
(teams/check-edition-permissions! conn profile-id team-id)
(create-font-variant cfg params))))
(teams/check-edition-permissions! pool profile-id team-id)
(create-font-variant cfg params))
(defn create-font-variant
[{:keys [conn storage] :as cfg} {:keys [data] :as params}]
[{:keys [storage pool] :as cfg} {:keys [data] :as params}]
(let [data (media/run {:cmd :generate-fonts :input data})
storage (media/configure-assets-storage storage conn)
storage (media/configure-assets-storage storage)]
otf (when-let [fdata (get data "font/otf")]
(sto/put-object storage {:content (sto/content fdata)
:content-type "font/otf"}))
ttf (when-let [fdata (get data "font/ttf")]
(sto/put-object storage {:content (sto/content fdata)
:content-type "font/ttf"}))
woff1 (when-let [fdata (get data "font/woff")]
(sto/put-object storage {:content (sto/content fdata)
:content-type "font/woff"}))
woff2 (when-let [fdata (get data "font/woff2")]
(sto/put-object storage {:content (sto/content fdata)
:content-type "font/woff2"}))]
(when (and (nil? otf)
(nil? ttf)
(nil? woff1)
(nil? woff2))
(when (and (not (contains? data "font/otf"))
(not (contains? data "font/ttf"))
(not (contains? data "font/woff"))
(not (contains? data "font/woff2")))
(ex/raise :type :validation
:code :invalid-font-upload))
(db/insert! conn :team-font-variant
{:id (uuid/next)
:team-id (:team-id params)
:font-id (:font-id params)
:font-family (:font-family params)
:font-weight (:font-weight params)
:font-style (:font-style params)
:woff1-file-id (:id woff1)
:woff2-file-id (:id woff2)
:otf-file-id (:id otf)
:ttf-file-id (:id ttf)})))
(let [otf (when-let [fdata (get data "font/otf")]
(sto/put-object storage {:content (sto/content fdata)
:content-type "font/otf"
:reference :team-font-variant
:touched-at (dt/now)}))
ttf (when-let [fdata (get data "font/ttf")]
(sto/put-object storage {:content (sto/content fdata)
:content-type "font/ttf"
:touched-at (dt/now)
:reference :team-font-variant}))
woff1 (when-let [fdata (get data "font/woff")]
(sto/put-object storage {:content (sto/content fdata)
:content-type "font/woff"
:touched-at (dt/now)
:reference :team-font-variant}))
woff2 (when-let [fdata (get data "font/woff2")]
(sto/put-object storage {:content (sto/content fdata)
:content-type "font/woff2"
:touched-at (dt/now)
:reference :team-font-variant}))]
(db/insert! pool :team-font-variant
{:id (uuid/next)
:team-id (:team-id params)
:font-id (:font-id params)
:font-family (:font-family params)
:font-weight (:font-weight params)
:font-style (:font-style params)
:woff1-file-id (:id woff1)
:woff2-file-id (:id woff2)
:otf-file-id (:id otf)
:ttf-file-id (:id ttf)}))))
;; --- UPDATE FONT FAMILY

View file

@ -10,13 +10,11 @@
[app.common.media :as cm]
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.media :as media]
[app.rpc.queries.teams :as teams]
[app.storage :as sto]
[app.util.http :as http]
[app.util.rlimit :as rlimit]
[app.util.services :as sv]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
@ -49,13 +47,10 @@
:opt-un [::id]))
(sv/defmethod ::upload-file-media-object
{::rlimit/permits (cf/get :rlimit-image)}
[{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}]
(db/with-atomic [conn pool]
(let [file (select-file conn file-id)]
(teams/check-edition-permissions! conn profile-id (:team-id file))
(-> (assoc cfg :conn conn)
(create-file-media-object params)))))
(let [file (select-file pool file-id)]
(teams/check-edition-permissions! pool profile-id (:team-id file))
(create-file-media-object cfg params)))
(defn- big-enough-for-thumbnail?
"Checks if the provided image info is big enough for
@ -77,6 +72,9 @@
:code :unable-to-access-to-url
:cause e))))
;; TODO: we need to check the size before fetch resource, if not we
;; can start downloading very big object and cause OOM errors.
(defn- download-media
[{:keys [storage] :as cfg} url]
(let [result (fetch-url url)
@ -90,6 +88,8 @@
(-> (assoc storage :backend :tmp)
(sto/put-object {:content (sto/content data)
:content-type mtype
:reference :file-media-object
:touched-at (dt/now)
:expired-at (dt/in-future {:minutes 30})}))))
;; NOTE: we use the `on conflict do update` instead of `do nothing`
@ -102,13 +102,27 @@
on conflict (id) do update set created_at=file_media_object.created_at
returning *")
;; NOTE: the following function executes without a transaction, this
;; means that if something fails in the middle of this function, it
;; will probably leave leaked/unreferenced objects in the database and
;; probably in the storage layer. For handle possible object leakage,
;; we create all media objects marked as touched, this ensures that if
;; something fails, all leaked (already created storage objects) will
;; be eventually marked as deleted by the touched-gc task.
;;
;; The touched-gc task, performs periodic analisis of all touched
;; storage objects and check references of it. This is the reason why
;; `reference` metadata exists: it indicates the name of the table
;; witch holds the reference to storage object (it some kind of
;; inverse, soft referential integrity).
(defn create-file-media-object
[{:keys [conn storage] :as cfg} {:keys [id file-id is-local name content] :as params}]
[{:keys [storage pool] :as cfg} {:keys [id file-id is-local name content] :as params}]
(media/validate-media-type (:content-type content))
(let [storage (media/configure-assets-storage storage conn)
source-path (fs/path (:tempfile content))
(let [source-path (fs/path (:tempfile content))
source-mtype (:content-type content)
source-info (media/run {:cmd :info :input {:path source-path :mtype source-mtype}})
storage (media/configure-assets-storage storage)
thumb (when (and (not (svg-image? source-info))
(big-enough-for-thumbnail? source-info))
@ -119,16 +133,25 @@
image (if (= (:mtype source-info) "image/svg+xml")
(let [data (slurp source-path)]
(sto/put-object storage {:content (sto/content data)
:content-type (:mtype source-info)}))
(sto/put-object storage {:content (sto/content source-path)
:content-type (:mtype source-info)}))
(sto/put-object storage
{:content (sto/content data)
:content-type (:mtype source-info)
:reference :file-media-object
:touched-at (dt/now)}))
(sto/put-object storage
{:content (sto/content source-path)
:content-type (:mtype source-info)
:reference :file-media-object
:touched-at (dt/now)}))
thumb (when thumb
(sto/put-object storage {:content (sto/content (:data thumb) (:size thumb))
:content-type (:mtype thumb)}))]
(sto/put-object storage
{:content (sto/content (:data thumb) (:size thumb))
:content-type (:mtype thumb)
:reference :file-media-object
:touched-at (dt/now)}))]
(db/exec-one! conn [sql:create-file-media-object
(db/exec-one! pool [sql:create-file-media-object
(or id (uuid/next))
file-id is-local name
(:id image)
@ -145,19 +168,16 @@
(sv/defmethod ::create-file-media-object-from-url
[{:keys [pool storage] :as cfg} {:keys [profile-id file-id url name] :as params}]
(db/with-atomic [conn pool]
(let [file (select-file conn file-id)]
(teams/check-edition-permissions! conn profile-id (:team-id file))
(let [mobj (download-media cfg url)
content {:filename "tempfile"
:size (:size mobj)
:tempfile (sto/get-object-path storage mobj)
:content-type (:content-type (meta mobj))}
params' (merge params {:content content
:name (or name (:filename content))})]
(-> (assoc cfg :conn conn)
(create-file-media-object params'))))))
(let [file (select-file pool file-id)]
(teams/check-edition-permissions! pool profile-id (:team-id file))
(let [mobj (download-media cfg url)
content {:filename "tempfile"
:size (:size mobj)
:tempfile (sto/get-object-path storage mobj)
:content-type (:content-type (meta mobj))}]
(->> (merge params {:content content :name (or name (:filename content))})
(create-file-media-object cfg)))))
;; --- Clone File Media object (Upload and create from url)
@ -189,7 +209,6 @@
:height (:height mobj)
:mtype (:mtype mobj)})))
;; --- HELPERS
(def ^:private

View file

@ -18,11 +18,9 @@
[app.storage.impl :as impl]
[app.storage.s3 :as ss3]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[datoteka.core :as fs]
[integrant.core :as ig]
[promesa.exec :as px]))
[integrant.core :as ig]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Storage Module State
@ -40,7 +38,7 @@
:db ::sdb/backend))))
(defmethod ig/pre-init-spec ::storage [_]
(s/keys :req-un [::wrk/executor ::db/pool ::backends]))
(s/keys :req-un [::db/pool ::backends]))
(defmethod ig/prep-key ::storage
[_ {:keys [backends] :as cfg}]
@ -53,78 +51,74 @@
(assoc :backends (d/without-nils backends))))
(s/def ::storage
(s/keys :req-un [::backends ::wrk/executor ::db/pool]))
(s/keys :req-un [::backends ::db/pool]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Database Objects
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defrecord StorageObject [id size created-at expired-at backend])
(defrecord StorageObject [id size created-at expired-at touched-at backend])
(defn storage-object?
[v]
(instance? StorageObject v))
(def ^:private
sql:insert-storage-object
"insert into storage_object (id, size, backend, metadata)
values (?, ?, ?, ?::jsonb)
returning *")
(s/def ::storage-object storage-object?)
(s/def ::storage-content impl/content?)
(def ^:private
sql:insert-storage-object-with-expiration
"insert into storage_object (id, size, backend, metadata, deleted_at)
values (?, ?, ?, ?::jsonb, ?)
returning *")
(defn- insert-object
[conn id size backend mdata expiration]
(if expiration
(db/exec-one! conn [sql:insert-storage-object-with-expiration id size backend mdata expiration])
(db/exec-one! conn [sql:insert-storage-object id size backend mdata])))
(defn- clone-database-object
;; If we in this condition branch, this means we come from the
;; clone-object, so we just need to clone it with a new backend.
[{:keys [conn backend]} object]
(let [id (uuid/random)
mdata (meta object)
result (db/insert! conn :storage-object
{:id id
:size (:size object)
:backend (name backend)
:metadata (db/tjson mdata)
:deleted-at (:expired-at object)
:touched-at (:touched-at object)})]
(assoc object
:id (:id result)
:backend backend
:created-at (:created-at result)
:touched-at (:touched-at result))))
(defn- create-database-object
[{:keys [conn backend]} {:keys [content] :as object}]
(if (instance? StorageObject object)
;; If we in this condition branch, this means we come from the
;; clone-object, so we just need to clone it with a new backend.
(let [id (uuid/random)
mdata (meta object)
result (insert-object conn
id
(:size object)
(name backend)
(db/tjson mdata)
(:expired-at object))]
(assoc object
:id (:id result)
:backend backend
:created-at (:created-at result)))
(let [id (uuid/random)
mdata (dissoc object :content :expired-at)
result (insert-object conn
id
(count content)
(name backend)
(db/tjson mdata)
(:expired-at object))]
(StorageObject. (:id result)
(:size result)
(:created-at result)
(:deleted-at result)
backend
mdata
nil))))
(us/assert ::storage-content content)
(let [id (uuid/random)
mdata (dissoc object :content :expired-at :touched-at)
result (db/insert! conn :storage-object
{:id id
:size (count content)
:backend (name backend)
:metadata (db/tjson mdata)
:deleted-at (:expired-at object)
:touched-at (:touched-at object)})]
(StorageObject. (:id result)
(:size result)
(:created-at result)
(:deleted-at result)
(:touched-at result)
backend
mdata
nil)))
(def ^:private sql:retrieve-storage-object
"select * from storage_object where id = ? and (deleted_at is null or deleted_at > now())")
(defn row->storage-object [res]
(let [mdata (some-> (:metadata res) (db/decode-transit-pgobject))]
(let [mdata (or (some-> (:metadata res) (db/decode-transit-pgobject)) {})]
(StorageObject. (:id res)
(:size res)
(:created-at res)
(:deleted-at res)
(:touched-at res)
(keyword (:backend res))
mdata
nil)))
@ -142,10 +136,6 @@
(let [result (db/exec-one! conn [sql:delete-storage-object id])]
(pos? (:next.jdbc/update-count result))))
(defn- register-recheck
[{:keys [pool] :as storage} backend id]
(db/insert! pool :storage-pending {:id id :backend (name backend)}))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -170,17 +160,13 @@
(defn put-object
"Creates a new object with the provided content."
[{:keys [pool conn backend executor] :as storage} {:keys [content] :as params}]
[{:keys [pool conn backend] :as storage} {:keys [content] :as params}]
(us/assert ::storage storage)
(us/assert impl/content? content)
(us/assert ::storage-content content)
(us/assert ::us/keyword backend)
(let [storage (assoc storage :conn (or conn pool))
object (create-database-object storage params)]
;; Schedule to execute in background; in an other transaction and
;; register the currently created storage object id for a later
;; recheck.
(px/run! executor #(register-recheck storage backend (:id object)))
;; Store the data finally on the underlying storage subsystem.
(-> (impl/resolve-backend storage backend)
(impl/put-object object content))
@ -190,10 +176,12 @@
(defn clone-object
"Creates a clone of the provided object using backend based efficient
method. Always clones objects to the configured default."
[{:keys [pool conn] :as storage} object]
[{:keys [pool conn backend] :as storage} object]
(us/assert ::storage storage)
(us/assert ::storage-object object)
(us/assert ::us/keyword backend)
(let [storage (assoc storage :conn (or conn pool))
object* (create-database-object storage object)]
object* (clone-database-object storage object)]
(if (= (:backend object) (:backend storage))
;; if the source and destination backends are the same, we
;; proceed to use the fast path with specific copy
@ -269,7 +257,7 @@
;; A task responsible to permanently delete already marked as deleted
;; storage files.
(declare sql:retrieve-deleted-objects)
(declare sql:retrieve-deleted-objects-chunk)
(s/def ::min-age ::dt/duration)
@ -278,44 +266,46 @@
(defmethod ig/init-key ::gc-deleted-task
[_ {:keys [pool storage min-age] :as cfg}]
(letfn [(group-by-backend [rows]
(let [conj (fnil conj [])]
[(reduce (fn [acc {:keys [id backend]}]
(update acc (keyword backend) conj id))
{}
rows)
(count rows)]))
(letfn [(retrieve-deleted-objects-chunk [conn cursor]
(let [min-age (db/interval min-age)
rows (db/exec! conn [sql:retrieve-deleted-objects-chunk min-age cursor])]
[(some-> rows peek :created-at)
(some->> (seq rows) (d/group-by' #(-> % :backend keyword) :id) seq)]))
(retrieve-deleted-objects [conn]
(let [min-age (db/interval min-age)
rows (db/exec! conn [sql:retrieve-deleted-objects min-age])]
(some-> (seq rows) (group-by-backend))))
(->> (d/iteration (fn [cursor]
(retrieve-deleted-objects-chunk conn cursor))
:initk (dt/now)
:vf second
:kf first)
(sequence cat)))
(delete-in-bulk [conn [backend ids]]
(delete-in-bulk [conn backend ids]
(let [backend (impl/resolve-backend storage backend)
backend (assoc backend :conn conn)]
(impl/del-objects-in-bulk backend ids)))]
(fn [_]
(db/with-atomic [conn pool]
(loop [n 0]
(if-let [[groups total] (retrieve-deleted-objects conn)]
(loop [total 0
groups (retrieve-deleted-objects conn)]
(if-let [[backend ids] (first groups)]
(do
(run! (partial delete-in-bulk conn) groups)
(recur (+ n ^long total)))
(delete-in-bulk conn backend ids)
(recur (+ total (count ids))
(rest groups)))
(do
(l/info :task "gc-deleted"
:hint "permanently delete items"
:count n)
{:deleted n})))))))
(l/info :task "gc-deleted" :count total)
{:deleted total})))))))
(def sql:retrieve-deleted-objects
(def sql:retrieve-deleted-objects-chunk
"with items_part as (
select s.id
from storage_object as s
where s.deleted_at is not null
and s.deleted_at < (now() - ?::interval)
order by s.deleted_at
and s.created_at < ?
order by s.created_at desc
limit 100
)
delete from storage_object
@ -326,157 +316,102 @@
;; Garbage Collection: Analyze touched objects
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; This task is part of the garbage collection of storage objects and
;; is responsible on analyzing the touched objects and mark them for deletion
;; if corresponds.
;; This task is part of the garbage collection of storage objects and is responsible on analyzing the touched
;; objects and mark them for deletion if corresponds.
;;
;; When file_media_object is deleted, the depending storage_object are
;; marked as touched. This means that some files that depend on a
;; concrete storage_object are no longer exists and maybe this
;; storage_object is no longer necessary and can be eligible for
;; elimination. This task periodically analyzes touched objects and
;; mark them as freeze (means that has other references and the object
;; is still valid) or deleted (no more references to this object so is
;; ready to be deleted).
;; For example: when file_media_object is deleted, the depending storage_object are marked as touched. This
;; means that some files that depend on a concrete storage_object are no longer exists and maybe this
;; storage_object is no longer necessary and can be eligible for elimination. This task periodically analyzes
;; touched objects and mark them as freeze (means that has other references and the object is still valid) or
;; deleted (no more references to this object so is ready to be deleted).
(declare sql:retrieve-touched-objects)
(declare sql:retrieve-touched-objects-chunk)
(declare sql:retrieve-file-media-object-nrefs)
(declare sql:retrieve-team-font-variant-nrefs)
(defmethod ig/pre-init-spec ::gc-touched-task [_]
(s/keys :req-un [::db/pool]))
(defmethod ig/init-key ::gc-touched-task
[_ {:keys [pool] :as cfg}]
(letfn [(group-results [rows]
(let [conj (fnil conj [])]
(reduce (fn [acc {:keys [id nrefs]}]
(if (pos? nrefs)
(update acc :to-freeze conj id)
(update acc :to-delete conj id)))
{}
rows)))
(letfn [(has-team-font-variant-nrefs? [conn id]
(-> (db/exec-one! conn [sql:retrieve-team-font-variant-nrefs id id id id]) :nrefs pos?))
(retrieve-touched [conn]
(let [rows (db/exec! conn [sql:retrieve-touched-objects])]
(some-> (seq rows) (group-results))))
(mark-delete-in-bulk [conn ids]
(db/exec-one! conn ["update storage_object set deleted_at=now(), touched_at=null where id = ANY(?)"
(db/create-array conn "uuid" (into-array java.util.UUID ids))]))
(has-file-media-object-nrefs? [conn id]
(-> (db/exec-one! conn [sql:retrieve-file-media-object-nrefs id id]) :nrefs pos?))
(mark-freeze-in-bulk [conn ids]
(db/exec-one! conn ["update storage_object set touched_at=null where id = ANY(?)"
(db/create-array conn "uuid" (into-array java.util.UUID ids))]))]
(db/create-array conn "uuid" ids)]))
(mark-delete-in-bulk [conn ids]
(db/exec-one! conn ["update storage_object set deleted_at=now(), touched_at=null where id = ANY(?)"
(db/create-array conn "uuid" ids)]))
(retrieve-touched-chunk [conn cursor]
(let [rows (->> (db/exec! conn [sql:retrieve-touched-objects-chunk cursor])
(mapv #(d/update-when % :metadata db/decode-transit-pgobject)))]
(when (seq rows)
[(-> rows peek :created-at)
;; NOTE: we use the :file-media-object as default value for backward compatibility because when we
;; deploy it we can have old backend instances running in the same time as the new one and we can
;; still have storage-objects created without reference value. And we know that if it does not
;; have value, it means :file-media-object.
(d/group-by' #(or (-> % :metadata :reference) :file-media-object) :id rows)])))
(retrieve-touched [conn]
(->> (d/iteration (fn [cursor]
(retrieve-touched-chunk conn cursor))
:initk (dt/now)
:vf second
:kf first)
(sequence cat)))
(process-objects! [conn pred-fn ids]
(loop [to-freeze #{}
to-delete #{}
ids (seq ids)]
(if-let [id (first ids)]
(if (pred-fn conn id)
(recur (conj to-freeze id) to-delete (rest ids))
(recur to-freeze (conj to-delete id) (rest ids)))
(do
(some->> (seq to-freeze) (mark-freeze-in-bulk conn))
(some->> (seq to-delete) (mark-delete-in-bulk conn))
[(count to-freeze) (count to-delete)]))))
]
(fn [_]
(db/with-atomic [conn pool]
(loop [cntf 0
cntd 0]
(if-let [{:keys [to-delete to-freeze]} (retrieve-touched conn)]
(loop [to-freeze 0
to-delete 0
groups (retrieve-touched conn)]
(if-let [[reference ids] (first groups)]
(let [[f d] (case reference
:file-media-object (process-objects! conn has-file-media-object-nrefs? ids)
:team-font-variant (process-objects! conn has-team-font-variant-nrefs? ids)
(ex/raise :type :internal :code :unexpected-unknown-reference))]
(recur (+ to-freeze f)
(+ to-delete d)
(rest groups)))
(do
(when (seq to-delete) (mark-delete-in-bulk conn to-delete))
(when (seq to-freeze) (mark-freeze-in-bulk conn to-freeze))
(recur (+ cntf (count to-freeze))
(+ cntd (count to-delete))))
(do
(l/info :task "gc-touched"
:hint "mark freeze"
:count cntf)
(l/info :task "gc-touched"
:hint "mark for deletion"
:count cntd)
{:freeze cntf :delete cntd})))))))
(l/info :task "gc-touched" :to-freeze to-freeze :to-delete to-delete)
{:freeze to-freeze :delete to-delete})))))))
(def sql:retrieve-touched-objects
"select so.id,
((select count(*) from file_media_object where media_id = so.id) +
(select count(*) from file_media_object where thumbnail_id = so.id)) as nrefs
from storage_object as so
(def sql:retrieve-touched-objects-chunk
"select so.* from storage_object as so
where so.touched_at is not null
order by so.touched_at
limit 100;")
and so.created_at < ?
order by so.created_at desc
limit 500;")
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Recheck Stalled Task
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def sql:retrieve-file-media-object-nrefs
"select ((select count(*) from file_media_object where media_id = ?) +
(select count(*) from file_media_object where thumbnail_id = ?)) as nrefs")
;; Because the physical storage (filesystem, s3, ... except db) is not
;; transactional, in some situations we can found physical object
;; leakage. That situations happens when the transaction that writes
;; the file aborts, leaving the file written to the underlying storage
;; but the reference on the database is lost with the rollback.
;;
;; For this situations we need to write a "log" of inserted files that
;; are checked in some time in future. If physical file exists but the
;; database refence does not exists means that leaked file is found
;; and is immediately deleted. The responsibility of this task is
;; check that write log for possible leaked files.
(def recheck-min-age (dt/duration {:hours 1}))
(declare sql:retrieve-pending-to-recheck)
(declare sql:exists-storage-object)
(defmethod ig/pre-init-spec ::recheck-task [_]
(s/keys :req-un [::storage ::db/pool]))
(defmethod ig/init-key ::recheck-task
[_ {:keys [pool storage] :as cfg}]
(letfn [(group-results [rows]
(let [conj (fnil conj [])]
(reduce (fn [acc {:keys [id exist] :as row}]
(cond-> (update acc :all conj id)
(false? exist)
(update :to-delete conj (dissoc row :exist))))
{}
rows)))
(group-by-backend [rows]
(let [conj (fnil conj [])]
(reduce (fn [acc {:keys [id backend]}]
(update acc (keyword backend) conj id))
{}
rows)))
(retrieve-pending [conn]
(let [rows (db/exec! conn [sql:retrieve-pending-to-recheck (db/interval recheck-min-age)])]
(some-> (seq rows) (group-results))))
(delete-group [conn [backend ids]]
(let [backend (impl/resolve-backend storage backend)
backend (assoc backend :conn conn)]
(impl/del-objects-in-bulk backend ids)))
(delete-all [conn ids]
(let [ids (db/create-array conn "uuid" (into-array java.util.UUID ids))]
(db/exec-one! conn ["delete from storage_pending where id = ANY(?)" ids])))]
(fn [_]
(db/with-atomic [conn pool]
(loop [n 0 d 0]
(if-let [{:keys [all to-delete]} (retrieve-pending conn)]
(let [groups (group-by-backend to-delete)]
(run! (partial delete-group conn) groups)
(delete-all conn all)
(recur (+ n (count all))
(+ d (count to-delete))))
(do
(l/info :task "recheck"
:hint "recheck items"
:processed n
:deleted d)
{:processed n :deleted d})))))))
(def sql:retrieve-pending-to-recheck
"select sp.id,
sp.backend,
sp.created_at,
(case when count(so.id) > 0 then true
else false
end) as exist
from storage_pending as sp
left join storage_object as so
on (so.id = sp.id)
where sp.created_at < now() - ?::interval
group by 1,2,3
order by sp.created_at asc
limit 100")
(def sql:retrieve-team-font-variant-nrefs
"select ((select count(*) from team_font_variant where woff1_file_id = ?) +
(select count(*) from team_font_variant where woff2_file_id = ?) +
(select count(*) from team_font_variant where otf_file_id = ?) +
(select count(*) from team_font_variant where ttf_file_id = ?)) as nrefs")

View file

@ -56,9 +56,10 @@
(s/def ::region #{:eu-central-1})
(s/def ::bucket ::us/string)
(s/def ::prefix ::us/string)
(s/def ::endpoint ::us/string)
(defmethod ig/pre-init-spec ::backend [_]
(s/keys :opt-un [::region ::bucket ::prefix]))
(s/keys :opt-un [::region ::bucket ::prefix ::endpoint]))
(defmethod ig/prep-key ::backend
[_ {:keys [prefix] :as cfg}]
@ -119,20 +120,31 @@
(defn- ^Region lookup-region
[region]
(case region
:eu-central-1 Region/EU_CENTRAL_1))
(Region/of (name region)))
(defn build-s3-client
[{:keys [region]}]
(.. (S3Client/builder)
(region (lookup-region region))
(build)))
[{:keys [region endpoint]}]
(if (string? endpoint)
(let [uri (java.net.URI. endpoint)]
(.. (S3Client/builder)
(endpointOverride uri)
(region (lookup-region region))
(build)))
(.. (S3Client/builder)
(region (lookup-region region))
(build))))
(defn build-s3-presigner
[{:keys [region]}]
(.. (S3Presigner/builder)
(region (lookup-region region))
(build)))
[{:keys [region endpoint]}]
(if (string? endpoint)
(let [uri (java.net.URI. endpoint)]
(.. (S3Presigner/builder)
(endpointOverride uri)
(region (lookup-region region))
(build)))
(.. (S3Presigner/builder)
(region (lookup-region region))
(build))))
(defn put-object
[{:keys [client bucket prefix]} {:keys [id] :as object} content]