♻️ Refactor file data migrations subsystem (#5692)

* ♻️ Refactor file data migrations subsystem

* 📎 Add backend scripts/run helper script
This commit is contained in:
Andrey Antukh 2025-01-31 13:37:41 +01:00 committed by GitHub
parent 96e99f6a78
commit f871f88f30
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 465 additions and 476 deletions

View file

@ -22,6 +22,7 @@
[app.db :as db]
[app.db.sql :as-alias sql]
[app.features.fdata :as feat.fdata]
[app.features.file-migrations :as feat.fmigr]
[app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks]
[app.rpc :as-alias rpc]
@ -243,7 +244,8 @@
(when (contains? (:features file) "fdata/pointer-map")
(feat.fdata/persist-pointers! cfg id))
file)))
(feat.fmigr/upsert-migrations! conn file)
(feat.fmigr/resolve-applied-migrations cfg file))))
(defn get-file
[{:keys [::db/conn ::wrk/executor] :as cfg} id
@ -264,6 +266,7 @@
{::db/check-deleted (not include-deleted?)
::db/remove-deleted (not include-deleted?)
::sql/for-update lock-for-update?})
(feat.fmigr/resolve-applied-migrations cfg)
(feat.fdata/resolve-file-data cfg))
;; NOTE: we perform the file decoding in a separate thread

View file

@ -6,13 +6,13 @@
(ns app.rpc.commands.files-create
(:require
[app.binfile.common :as bfc]
[app.common.data.macros :as dm]
[app.common.features :as cfeat]
[app.common.schema :as sm]
[app.common.types.file :as ctf]
[app.config :as cf]
[app.db :as db]
[app.features.fdata :as feat.fdata]
[app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks]
[app.rpc :as-alias rpc]
@ -21,7 +21,6 @@
[app.rpc.doc :as-alias doc]
[app.rpc.permissions :as perms]
[app.rpc.quotes :as quotes]
[app.util.blob :as blob]
[app.util.pointer-map :as pmap]
[app.util.services :as sv]
[app.util.time :as dt]
@ -48,34 +47,19 @@
(binding [pmap/*tracked* (pmap/create-tracked)
cfeat/*current* features]
(let [file (ctf/make-file {:id id
:project-id project-id
:name name
:revn revn
:is-shared is-shared
:features features
:ignore-sync-until ignore-sync-until
:modified-at modified-at
:deleted-at deleted-at
:create-page create-page
:page-id page-id})
file (if (contains? features "fdata/objects-map")
(feat.fdata/enable-objects-map file)
file)
file (if (contains? features "fdata/pointer-map")
(feat.fdata/enable-pointer-map file)
file)]
(db/insert! conn :file
(-> file
(update :data blob/encode)
(update :features db/encode-pgarray conn "text"))
{::db/return-keys false})
(when (contains? features "fdata/pointer-map")
(feat.fdata/persist-pointers! cfg (:id file)))
(let [file (ctf/make-file {:id id
:project-id project-id
:name name
:revn revn
:is-shared is-shared
:features features
:ignore-sync-until ignore-sync-until
:modified-at modified-at
:deleted-at deleted-at
:create-page create-page
:page-id page-id})
file (-> (bfc/insert-file! cfg file)
(bfc/decode-row))]
(->> (assoc params :file-id (:id file) :role :owner)
(create-file-role! conn))

View file

@ -19,6 +19,7 @@
[app.config :as cf]
[app.db :as db]
[app.features.fdata :as feat.fdata]
[app.features.file-migrations :as feat.fmigr]
[app.http.errors :as errors]
[app.loggers.audit :as audit]
[app.loggers.webhooks :as webhooks]
@ -204,36 +205,27 @@
{:keys [profile-id file features changes session-id skip-validate] :as params}]
(let [;; Retrieve the file data
file (feat.fdata/resolve-file-data cfg file)
file (feat.fmigr/resolve-applied-migrations cfg file)
file (feat.fdata/resolve-file-data cfg file)
file (assoc file :features
(-> features
(set/difference cfeat/frontend-only-features)
(set/union (:features file))))]
file (assoc file :features
(-> features
(set/difference cfeat/frontend-only-features)
(set/union (:features file))))
;; We create a new lexycal scope for clearly delimit the result of
;; executing this update file operation and all its side effects
(let [file (px/invoke! executor
(fn []
;; Process the file data on separated thread for avoid to do
;; the CPU intensive operation on vthread.
(binding [cfeat/*current* features
cfeat/*previous* (:features file)]
(update-file-data! cfg file
process-changes-and-validate
changes skip-validate))))]
;; Process the file data on separated thread for avoid to do
;; the CPU intensive operation on vthread.
file (px/invoke! executor
(fn []
(binding [cfeat/*current* features
cfeat/*previous* (:features file)]
(update-file-data! cfg file
process-changes-and-validate
changes skip-validate))))]
(when (feat.fdata/offloaded? file)
(let [storage (sto/resolve cfg ::db/reuse-conn true)]
(some->> (:data-ref-id file) (sto/touch-object! storage))))
(persist-file! cfg file)
(let [params (assoc params :file file)
response {:revn (:revn file)
:lagged (get-lagged-changes conn params)}
features (db/create-array conn "text" (:features file))
deleted-at (if (::snapshot-data file)
(dt/plus timestamp (cf/get-deletion-delay))
(dt/plus timestamp (dt/duration {:hours 1})))]
(feat.fmigr/upsert-migrations! conn file)
(persist-file! cfg file)
;; Insert change (xlog) with deleted_at in a future data for
;; make them automatically eleggible for GC once they expires
@ -243,19 +235,27 @@
:profile-id profile-id
:created-at timestamp
:updated-at timestamp
:deleted-at deleted-at
:deleted-at (if (::snapshot-data file)
(dt/plus timestamp (cf/get-deletion-delay))
(dt/plus timestamp (dt/duration {:hours 1})))
:file-id (:id file)
:revn (:revn file)
:version (:version file)
:features features
:features (:features file)
:label (::snapshot-label file)
:data (::snapshot-data file)
:changes (blob/encode changes)}
{::db/return-keys false})
;; Send asynchronous notifications
(send-notifications! cfg params)
(send-notifications! cfg params file))
(when (feat.fdata/offloaded? file)
(let [storage (sto/resolve cfg ::db/reuse-conn true)]
(some->> (:data-ref-id file) (sto/touch-object! storage))))
(let [response {:revn (:revn file)
:lagged (get-lagged-changes conn params)}]
(vary-meta response assoc ::audit/replace-props
{:id (:id file)
:name (:name file)
@ -265,9 +265,10 @@
(defn update-file!
"A public api that allows apply a transformation to a file with all context setup."
[cfg file-id update-fn & args]
[{:keys [::db/conn] :as cfg} file-id update-fn & args]
(let [file (get-file cfg file-id)
file (apply update-file-data! cfg file update-fn args)]
(feat.fmigr/upsert-migrations! conn file)
(persist-file! cfg file)))
(def ^:private sql:get-file
@ -295,8 +296,7 @@
It also updates the project modified-at attr."
[{:keys [::db/conn ::timestamp]} file]
(let [features (db/create-array conn "text" (:features file))
;; The timestamp can be nil because this function is also
(let [;; The timestamp can be nil because this function is also
;; intended to be used outside of this module
modified-at (or timestamp (dt/now))]
@ -309,7 +309,7 @@
{:revn (:revn file)
:data (:data file)
:version (:version file)
:features features
:features (:features file)
:data-backend nil
:data-ref-id nil
:modified-at modified-at
@ -368,38 +368,16 @@
(-> file
(assoc ::snapshot-data snapshot)
(assoc ::snapshot-label label)))
file)
file)]
file (cond-> file
(contains? cfeat/*current* "fdata/objects-map")
(feat.fdata/enable-objects-map)
(contains? cfeat/*current* "fdata/pointer-map")
(feat.fdata/enable-pointer-map)
:always
(update :data blob/encode))]
(feat.fdata/persist-pointers! cfg id)
file)))
(bfc/encode-file cfg file))))
(defn- get-file-libraries
"A helper for preload file libraries, mainly used for perform file
semantical and structural validation"
[{:keys [::db/conn] :as cfg} file]
(->> (files/get-file-libraries conn (:id file))
(into [file] (map (fn [{:keys [id]}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)
pmap/*tracked* nil]
;; We do not resolve the objects maps here
;; because there is a lower probability that all
;; shapes needed to be loded into memory, so we
;; leeave it on lazy status
(-> (files/get-file cfg id :migrate? false)
(update :data feat.fdata/process-pointers deref) ; ensure all pointers resolved
(update :data feat.fdata/process-objects (partial into {}))
(fmg/migrate-file))))))
(into [file] (map #(bfc/get-file cfg (:id %))))
(d/index-by :id)))
(defn- soft-validate-file-schema!
@ -494,7 +472,7 @@
(vec)))
(defn- send-notifications!
[cfg {:keys [file team changes session-id] :as params}]
[cfg {:keys [team changes session-id] :as params} file]
(let [lchanges (filter library-change? changes)
msgbus (::mbus/msgbus cfg)]

View file

@ -56,8 +56,8 @@
(vswap! bfc/*state* update :index bfc/update-index fmeds :id)
;; Process and persist file
(let [file (->> (bfc/process-file file)
(bfc/save-file! cfg))]
(let [file (bfc/process-file file)]
(bfc/insert-file! cfg file ::db/return-keys false)
;; The file profile creation is optional, so when no profile is
;; present (when this function is called from profile less
@ -86,7 +86,7 @@
fmeds)]
(db/insert! conn :file-media-object params ::db/return-keys false))
(bfc/decode-file cfg file))))
file)))
(def ^:private
schema:duplicate-file