Improve migration script performance and api usability

This commit is contained in:
Andrey Antukh 2024-01-03 15:16:14 +01:00
parent 471fd78174
commit 41287d8fc5
10 changed files with 559 additions and 379 deletions

View file

@ -39,27 +39,54 @@
[app.rpc.commands.media :as cmd.media]
[app.storage :as sto]
[app.storage.tmp :as tmp]
[app.svgo :as svgo]
[app.util.blob :as blob]
[app.util.pointer-map :as pmap]
[app.util.time :as dt]
[buddy.core.codecs :as bc]
[cuerdas.core :as str]
[datoteka.io :as io]
[promesa.exec :as px]
[promesa.exec.semaphore :as ps]
[promesa.util :as pu]))
[promesa.core :as p]))
(def ^:dynamic *system* nil)
(def ^:dynamic *stats* nil)
(def ^:dynamic *file-stats* nil)
(def ^:dynamic *team-stats* nil)
(def ^:dynamic *semaphore* nil)
(def ^:dynamic *skip-on-error* true)
(def ^:dynamic *stats*
"A dynamic var for setting up state for collect stats globally."
nil)
(def ^:dynamic *skip-on-error*
"A dynamic var for setting up the default error behavior."
true)
(def ^:dynamic ^:private *system*
"An internal var for making the current `system` available to all
internal functions without the need to explicitly pass it top down."
nil)
(def ^:dynamic ^:private *max-procs*
"A dynamic variable that can optionally indicates the maxumum number
of concurrent graphics migration processes."
nil)
(def ^:dynamic ^:private *file-stats*
"An internal dynamic var for collect stats by file."
nil)
(def ^:dynamic ^:private *team-stats*
"An internal dynamic var for collect stats by team."
nil)
(def grid-gap 50)
(def frame-gap 200)
(def max-group-size 50)
(defn decode-row
[{:keys [features data] :as row}]
(cond-> row
(some? features)
(assoc :features (db/decode-pgarray features #{}))
(some? data)
(assoc :data (blob/decode data))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; FILE PREPARATION BEFORE MIGRATION
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -220,19 +247,17 @@
(fn [file-data]
;; Transform component and copy heads to frames, and set the
;; frame-id of its childrens
(letfn [(fix-container
[container]
(letfn [(fix-container [container]
(update container :objects update-vals fix-shape))
(fix-shape
[shape]
(fix-shape [shape]
(if (or (nil? (:parent-id shape)) (ctk/instance-head? shape))
(assoc shape
:type :frame ; Old groups must be converted
:fills (or (:fills shape) []) ; to frames and conform to spec
:hide-in-viewer (or (:hide-in-viewer shape) true)
:rx (or (:rx shape) 0)
:ry (or (:ry shape) 0))
:type :frame ; Old groups must be converted
:fills (or (:fills shape) []) ; to frames and conform to spec
:hide-in-viewer (or (:hide-in-viewer shape) true)
:rx (or (:rx shape) 0)
:ry (or (:ry shape) 0))
shape))]
(-> file-data
(update :pages-index update-vals fix-container)
@ -310,10 +335,10 @@
(defn- get-asset-groups
[assets generic-name]
(let [; Group by first element of the path.
(let [;; Group by first element of the path.
groups (d/group-by #(first (cfh/split-path (:path %))) assets)
; Split large groups in chunks of max-group-size elements
;; Split large groups in chunks of max-group-size elements
groups (loop [groups (seq groups)
result {}]
(if (empty? groups)
@ -334,15 +359,14 @@
result
splits)))))))
; Sort assets in each group by path
;; Sort assets in each group by path
groups (update-vals groups (fn [assets]
(sort-by (fn [{:keys [path name]}]
(str/lower (cfh/merge-path-item path name)))
assets)))
assets)))]
; Sort groups by name
groups (into (sorted-map) groups)]
groups))
;; Sort groups by name
(into (sorted-map) groups)))
(defn- create-frame
[name position width height]
@ -612,14 +636,11 @@
(defn- create-shapes-for-svg
[{:keys [id] :as mobj} file-id objects frame-id position]
(let [svg-text (get-svg-content id)
optimizer (::csvg/optimizer *system*)
svg-text (csvg/optimize optimizer svg-text)
svg-data (-> (csvg/parse svg-text)
(assoc :name (:name mobj))
(collect-and-persist-images file-id))]
(let [svg-text (get-svg-content id)
svg-text (svgo/optimize *system* svg-text)
svg-data (-> (csvg/parse svg-text)
(assoc :name (:name mobj))
(collect-and-persist-images file-id))]
(sbuilder/create-svg-shapes svg-data position objects frame-id frame-id #{} false)))
@ -678,9 +699,7 @@
(defn- create-media-grid
[fdata page-id frame-id grid media-group]
(let [factory (px/thread-factory :virtual true)
executor (px/fixed-executor :parallelism 10 :factory factory)
process (fn [mobj position]
(let [process (fn [mobj position]
(let [position (gpt/add position (gpt/point grid-gap grid-gap))
tp1 (dt/tpoint)]
(try
@ -690,7 +709,6 @@
:file-id (str (:id fdata))
:id (str (:id mobj))
:cause cause)
(if-not *skip-on-error*
(throw cause)
nil))
@ -699,21 +717,24 @@
:file-id (str (:id fdata))
:media-id (str (:id mobj))
:elapsed (dt/format-duration (tp1)))))))]
(try
(->> (d/zip media-group grid)
(map (fn [[mobj position]]
(sse/tap {:type :migration-progress
:section :graphics
:name (:name mobj)})
(px/submit! executor (partial process mobj position))))
(reduce (fn [fdata promise]
(if-let [changes (deref promise)]
(-> (assoc-in fdata [:options :components-v2] true)
(cp/process-changes changes false))
fdata))
fdata))
(finally
(pu/close! executor)))))
(->> (d/zip media-group grid)
(partition-all (or *max-procs* 1))
(mapcat (fn [partition]
(->> partition
(map (fn [[mobj position]]
(sse/tap {:type :migration-progress
:section :graphics
:name (:name mobj)})
(p/vthread (process mobj position))))
(doall)
(map deref)
(doall))))
(filter some?)
(reduce (fn [fdata changes]
(-> (assoc-in fdata [:options :components-v2] true)
(cp/process-changes changes false)))
fdata))))
(defn- migrate-graphics
[fdata]
@ -759,6 +780,11 @@
(create-media-grid fdata page-id (:id frame) grid assets)
(gpt/add position (gpt/point 0 (+ height (* 2 grid-gap) frame-gap))))))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PRIVATE HELPERS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- migrate-fdata
[fdata libs]
(let [migrated? (dm/get-in fdata [:options :components-v2])]
@ -771,11 +797,22 @@
(defn- get-file
[system id]
(binding [pmap/*load-fn* (partial fdata/load-pointer system id)]
(-> (files/get-file system id :migrate? false)
(-> (db/get system :file {:id id}
{::db/remove-deleted false
::db/check-deleted false})
(decode-row)
(update :data assoc :id id)
(update :data fdata/process-pointers deref)
(fmg/migrate-file))))
(defn- get-team
[system team-id]
(-> (db/get system :team {:id team-id}
{::db/remove-deleted false
::db/check-deleted false})
(decode-row)))
(defn- validate-file!
[file libs throw-on-validate?]
(try
@ -791,7 +828,8 @@
(let [file (get-file system id)
libs (->> (files/get-file-libraries conn id)
(into [file] (comp (map :id) (map (partial get-file system))))
(into [file] (comp (map :id)
(map (partial get-file system))))
(d/index-by :id))
file (-> file
@ -820,13 +858,35 @@
(dissoc file :data)))
(def ^:private sql:get-and-lock-team-files
"SELECT f.id
FROM file AS f
JOIN project AS p ON (p.id = f.project_id)
WHERE p.team_id = ?
FOR UPDATE")
(defn- get-and-lock-files
[conn team-id]
(->> (db/cursor conn [sql:get-and-lock-team-files team-id])
(map :id)))
(defn- update-team-features!
[conn team-id features]
(let [features (db/create-array conn "text" features)]
(db/update! conn :team
{:features features}
{:id team-id})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn migrate-file!
[system file-id & {:keys [validate? throw-on-validate?]}]
(let [tpoint (dt/tpoint)
file-id (if (string? file-id)
(parse-uuid file-id)
file-id)]
(binding [*file-stats* (atom {})]
[system file-id & {:keys [validate? throw-on-validate? max-procs]}]
(let [tpoint (dt/tpoint)]
(binding [*file-stats* (atom {})
*max-procs* max-procs]
(try
(l/dbg :hint "migrate:file:start" :file-id (str file-id))
@ -838,7 +898,6 @@
(process-file system file-id
:validate? validate?
:throw-on-validate? throw-on-validate?)))))
(finally
(let [elapsed (tpoint)
components (get @*file-stats* :processed/components 0)
@ -854,75 +913,51 @@
(some-> *team-stats* (swap! update :processed/files (fnil inc 0)))))))))
(defn migrate-team!
[system team-id & {:keys [validate? throw-on-validate?]}]
(let [tpoint (dt/tpoint)
team-id (if (string? team-id)
(parse-uuid team-id)
team-id)]
(l/dbg :hint "migrate:team:start" :team-id (dm/str team-id))
[system team-id & {:keys [validate? throw-on-validate? max-procs]}]
(l/dbg :hint "migrate:team:start"
:team-id (dm/str team-id))
(let [tpoint (dt/tpoint)
migrate-file
(fn [system file-id]
(migrate-file! system file-id
:max-procs max-procs
:validate? validate?
:throw-on-validate? throw-on-validate?))
migrate-team
(fn [{:keys [::db/conn] :as system} {:keys [id features] :as team}]
(let [features (-> features
(disj "ephimeral/v2-migration")
(conj "components/v2")
(conj "layout/grid")
(conj "styles/v2"))]
(run! (partial migrate-file system)
(get-and-lock-files conn id))
(update-team-features! conn id features)))]
(binding [*team-stats* (atom {})]
(try
;; We execute this out of transaction because we want this
;; change to be visible to all other sessions before starting
;; the migration
(let [sql (str "UPDATE team SET features = "
" array_append(features, 'ephimeral/v2-migration') "
" WHERE id = ?")]
(db/exec-one! system [sql team-id]))
(db/tx-run! system
(fn [{:keys [::db/conn] :as system}]
;; Lock the team
(db/exec-one! conn ["SET idle_in_transaction_session_timeout = 0"])
(let [{:keys [features] :as team} (-> (db/get conn :team {:id team-id})
(update :features db/decode-pgarray #{}))]
(if (contains? features "components/v2")
(l/dbg :hint "team already migrated")
(let [sql (str/concat
"SELECT f.id FROM file AS f "
" JOIN project AS p ON (p.id = f.project_id) "
"WHERE p.team_id = ? AND f.deleted_at IS NULL AND p.deleted_at IS NULL "
"FOR UPDATE")]
(doseq [file-id (->> (db/exec! conn [sql team-id])
(map :id))]
(migrate-file! system file-id
:validate? validate?
:throw-on-validate? throw-on-validate?))
(let [features (-> features
(disj "ephimeral/v2-migration")
(conj "components/v2")
(conj "layout/grid")
(conj "styles/v2"))]
(db/update! conn :team
{:features (db/create-array conn "text" features)}
{:id team-id})
nil))))))
(db/tx-run! system (fn [system]
(db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"])
(let [team (get-team system team-id)]
(if (contains? (:features team) "components/v2")
(l/inf :hint "team already migrated")
(migrate-team system team)))))
(finally
(some-> *semaphore* ps/release!)
(let [elapsed (tpoint)]
(let [elapsed (tpoint)
components (get @*team-stats* :processed/components 0)
graphics (get @*team-stats* :processed/graphics 0)
files (get @*team-stats* :processed/files 0)]
(some-> *stats* (swap! update :processed/teams (fnil inc 0)))
;; We execute this out of transaction because we want this
;; change to be visible to all other sessions before starting
;; the migration
(let [sql (str "UPDATE team SET features = "
" array_remove(features, 'ephimeral/v2-migration') "
" WHERE id = ?")]
(db/exec-one! system [sql team-id]))
(let [components (get @*team-stats* :processed/components 0)
graphics (get @*team-stats* :processed/graphics 0)
files (get @*team-stats* :processed/files 0)]
(l/dbg :hint "migrate:team:end"
:team-id (dm/str team-id)
:files files
:components components
:graphics graphics
:elapsed (dt/format-duration elapsed)))))))))
(l/dbg :hint "migrate:team:end"
:team-id (dm/str team-id)
:files files
:components components
:graphics graphics
:elapsed (dt/format-duration elapsed))))))))