From 267045e1132eb0b16c2a8daf9122ddb4fe0c800d Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 6 Feb 2024 17:21:34 +0100 Subject: [PATCH] :sparkles: Improve migration scripts --- backend/resources/log4j2-experiments.xml | 6 - backend/src/app/features/components_v2.clj | 92 +++--- .../src/app/rpc/commands/files_snapshot.clj | 8 +- backend/src/app/srepl/components_v2.clj | 270 ++++++++++++++++-- 4 files changed, 300 insertions(+), 76 deletions(-) diff --git a/backend/resources/log4j2-experiments.xml b/backend/resources/log4j2-experiments.xml index 88542c277..3357aae31 100644 --- a/backend/resources/log4j2-experiments.xml +++ b/backend/resources/log4j2-experiments.xml @@ -48,12 +48,6 @@ - - - - - - diff --git a/backend/src/app/features/components_v2.clj b/backend/src/app/features/components_v2.clj index 71b0c538a..8b3cd1da1 100644 --- a/backend/src/app/features/components_v2.clj +++ b/backend/src/app/features/components_v2.clj @@ -77,10 +77,6 @@ internal functions without the need to explicitly pass it top down." nil) -(def ^:dynamic ^:private *team-id* - "A dynamic var that holds the current processing team-id." - nil) - (def ^:dynamic ^:private *file-stats* "An internal dynamic var for collect stats by file." nil) @@ -1194,12 +1190,11 @@ ;; The media processing adds the data to the ;; input map and returns it. (media/run {:cmd :info :input item})) + (catch Throwable _ - (let [team-id *team-id*] - (l/wrn :hint "unable to process embedded images on svg file" - :team-id (str team-id) - :file-id (str file-id) - :media-id (str media-id))) + (l/wrn :hint "unable to process embedded images on svg file" + :file-id (str file-id) + :media-id (str media-id)) nil))) (persist-image [acc {:keys [path size width height mtype href] :as item}] @@ -1332,24 +1327,20 @@ (catch Throwable cause (vreset! err true) (let [cause (pu/unwrap-exception cause) - edata (ex-data cause) - team-id *team-id*] + edata (ex-data cause)] (cond (instance? 
org.xml.sax.SAXParseException cause) (l/inf :hint "skip processing media object: invalid svg found" - :team-id (str team-id) :file-id (str (:id fdata)) :id (str (:id mobj))) (instance? org.graalvm.polyglot.PolyglotException cause) (l/inf :hint "skip processing media object: invalid svg found" - :team-id (str team-id) :file-id (str (:id fdata)) :id (str (:id mobj))) (= (:type edata) :not-found) (l/inf :hint "skip processing media object: underlying object does not exist" - :team-id (str team-id) :file-id (str (:id fdata)) :id (str (:id mobj))) @@ -1357,7 +1348,6 @@ (let [skip? *skip-on-graphic-error*] (l/wrn :hint "unable to process file media object" :skiped skip? - :team-id (str team-id) :file-id (str (:id fdata)) :id (str (:id mobj)) :cause cause) @@ -1524,7 +1514,9 @@ (defn migrate-file! [system file-id & {:keys [validate? skip-on-graphic-error? label]}] - (let [tpoint (dt/tpoint)] + (let [tpoint (dt/tpoint) + err (volatile! false)] + (binding [*file-stats* (atom {}) *skip-on-graphic-error* skip-on-graphic-error?] (try @@ -1533,40 +1525,50 @@ :validate validate? :skip-on-graphic-error skip-on-graphic-error?) - (let [system (update system ::sto/storage media/configure-assets-storage)] - (db/tx-run! system - (fn [system] - (try - (binding [*system* system] - (when (string? label) - (fsnap/take-file-snapshot! system {:file-id file-id - :label (str "migration/" label)})) - (let [file (get-file system file-id)] - (events/tap :progress - {:op :migrate-file - :name (:name file) - :id (:id file)}) + (db/tx-run! (update system ::sto/storage media/configure-assets-storage) + (fn [system] + (binding [*system* system] + (when (string? label) + (fsnap/take-file-snapshot! system {:file-id file-id + :label (str "migration/" label)})) + (let [file (get-file system file-id)] + (events/tap :progress + {:op :migrate-file + :name (:name file) + :id (:id file)}) - (process-file system file :validate? validate?))) + (process-file system file :validate? 
validate?))))) - (catch Throwable cause - (let [team-id *team-id*] - (l/wrn :hint "error on processing file" - :team-id (str team-id) - :file-id (str file-id)) - (throw cause))))))) + (catch Throwable cause + (vreset! err true) + (l/wrn :hint "error on processing file" + :file-id (str file-id) + :cause cause) + (throw cause)) (finally (let [elapsed (tpoint) components (get @*file-stats* :processed-components 0) graphics (get @*file-stats* :processed-graphics 0)] - (l/dbg :hint "migrate:file:end" - :file-id (str file-id) - :graphics graphics - :components components - :validate validate? - :elapsed (dt/format-duration elapsed)) + (if (cache/cache? *cache*) + (let [cache-stats (cache/stats *cache*)] + (l/dbg :hint "migrate:file:end" + :file-id (str file-id) + :graphics graphics + :components components + :validate validate? + :crt (mth/to-fixed (:hit-rate cache-stats) 2) + :crq (str (:req-count cache-stats)) + :error @err + :elapsed (dt/format-duration elapsed))) + (l/dbg :hint "migrate:file:end" + :file-id (str file-id) + :graphics graphics + :components components + :validate validate? + :error @err + :elapsed (dt/format-duration elapsed))) (some-> *stats* (swap! update :processed-files (fnil inc 0))) (some-> *team-stats* (swap! update :processed-files (fnil inc 0))))))))) @@ -1607,13 +1609,15 @@ (update-team-features! conn id features)))))] - (binding [*team-stats* (atom {}) - *team-id* team-id] + (binding [*team-stats* (atom {})] (try (db/tx-run! system migrate-team team-id) (catch Throwable cause (vreset! err true) + (l/wrn :hint "error on processing team" + :team-id (str team-id) + :cause cause) (throw cause)) (finally diff --git a/backend/src/app/rpc/commands/files_snapshot.clj b/backend/src/app/rpc/commands/files_snapshot.clj index 68b3a017c..3b90023fb 100644 --- a/backend/src/app/rpc/commands/files_snapshot.clj +++ b/backend/src/app/rpc/commands/files_snapshot.clj @@ -70,8 +70,8 @@ (some? 
(:data snapshot))) (l/debug :hint "snapshot found" - :snapshot-id (:id snapshot) - :file-id file-id) + :snapshot-id (str (:id snapshot)) + :file-id (str file-id)) (db/update! conn :file {:data (:data snapshot)} @@ -112,7 +112,9 @@ (when-let [file (db/get* conn :file {:id file-id})] (let [id (uuid/next) label (or label (str "Snapshot at " (dt/format-instant (dt/now) :rfc1123)))] - (l/debug :hint "persisting file snapshot" :file-id file-id :label label) + (l/debug :hint "persisting file snapshot" + :file-id (str file-id) + :label label) (db/insert! conn :file-change {:id id :revn (:revn file) diff --git a/backend/src/app/srepl/components_v2.clj b/backend/src/app/srepl/components_v2.clj index 4adf55293..9bada320a 100644 --- a/backend/src/app/srepl/components_v2.clj +++ b/backend/src/app/srepl/components_v2.clj @@ -32,12 +32,20 @@ (defn- report-progress-files [tpoint] (fn [_ _ oldv newv] - (when (not= (:processed-files oldv) - (:processed-files newv)) - (let [elapsed (tpoint)] + (when (or (not= (:processed-files oldv) + (:processed-files newv)) + (not= (:errors oldv) + (:errors newv))) + (let [completed (:processed-files newv 0) + errors (:errors newv 0) + elapsed (dt/format-duration (tpoint))] + (events/tap :progress-report + {:elapsed elapsed + :completed completed + :errors errors}) (l/dbg :hint "progress" - :completed (:processed-files newv) - :elapsed (dt/format-duration elapsed)))))) + :completed completed + :elapsed elapsed))))) (defn- report-progress-teams [tpoint] @@ -101,13 +109,47 @@ (def ^:private sql:get-teams-by-report "WITH teams AS ( SELECT t.id t.features, mr.name - FROM migration_report AS mr + FROM migration_team_report AS mr JOIN team AS t ON (t.id = mr.team_id) WHERE t.deleted_at IS NULL AND mr.error IS NOT NULL ORDER BY mr.created_at ) SELECT id, features FROM teams %(pred)s") +(def ^:private sql:get-files-by-created-at + "SELECT id, features + FROM file + WHERE deleted_at IS NULL + ORDER BY created_at DESC") + +(def ^:private 
sql:get-files-by-modified-at + "SELECT id, features + FROM file + WHERE deleted_at IS NULL + ORDER BY modified_at DESC") + +(def ^:private sql:get-files-by-graphics + "WITH files AS ( + SELECT f.id, f.features, + (SELECT count(*) FROM file_media_object AS fmo + WHERE fmo.mtype = 'image/svg+xml' + AND fmo.is_local = false + AND fmo.file_id = f.id) AS graphics + FROM file AS f + WHERE f.deleted_at IS NULL + ORDER BY 3 ASC + ) SELECT * FROM files %(pred)s") + +(def ^:private sql:get-files-by-report + "WITH files AS ( + SELECT t.id, t.features, mr.label + FROM migration_file_report AS mr + JOIN file AS t ON (t.id = mr.file_id) + WHERE t.deleted_at IS NULL + AND mr.error IS NOT NULL + ORDER BY mr.created_at + ) SELECT id, features FROM files %(pred)s") + (defn- read-pred [entries] (let [entries (if (and (vector? entries) @@ -140,7 +182,6 @@ :activity sql:get-teams-by-activity :graphics sql:get-teams-by-graphics :report sql:get-teams-by-report) - sql (if pred (let [[pred-sql & pred-params] (read-pred pred)] (apply vector @@ -154,34 +195,78 @@ (contains? features "components/v2"))) (map :id)))) -(def ^:private sql:report-table - "CREATE UNLOGGED TABLE IF NOT EXISTS migration_report ( +(defn- get-files + [conn query pred] + (let [query (d/nilv query :created-at) + sql (case query + :created-at sql:get-files-by-created-at + :modified-at sql:get-files-by-modified-at + :graphics sql:get-files-by-graphics + :report sql:get-files-by-report) + sql (if pred + (let [[pred-sql & pred-params] (read-pred pred)] + (apply vector + (str/format sql {:pred pred-sql}) + pred-params)) + [(str/format sql {:pred ""})])] + + (->> (db/cursor conn sql {:chunk-size 500}) + (map feat/decode-row) + (remove (fn [{:keys [features]}] + (contains? 
features "components/v2"))) + (map :id)))) + +(def ^:private sql:team-report-table + "CREATE UNLOGGED TABLE IF NOT EXISTS migration_team_report ( id bigserial NOT NULL, label text NOT NULL, team_id UUID NOT NULL, error text NULL, created_at timestamptz NOT NULL DEFAULT now(), elapsed bigint NOT NULL, - PRIMARY KEY (label, created_at, id) - )") + PRIMARY KEY (label, created_at, id))") -(defn- create-report-table! +(def ^:private sql:file-report-table + "CREATE UNLOGGED TABLE IF NOT EXISTS migration_file_report ( + id bigserial NOT NULL, + label text NOT NULL, + file_id UUID NOT NULL, + error text NULL, + created_at timestamptz NOT NULL DEFAULT now(), + elapsed bigint NOT NULL, + PRIMARY KEY (label, created_at, id))") + +(defn- create-report-tables! [system] - (db/exec-one! system [sql:report-table])) + (db/exec-one! system [sql:team-report-table]) + (db/exec-one! system [sql:file-report-table])) -(defn- clean-reports! +(defn- clean-team-reports! [system label] - (db/delete! system :migration-report {:label label})) + (db/delete! system :migration-team-report {:label label})) -(defn- report! +(defn- team-report! [system team-id label elapsed error] - (db/insert! system :migration-report + (db/insert! system :migration-team-report {:label label :team-id team-id :elapsed (inst-ms elapsed) :error error} {::db/return-keys false})) +(defn- clean-file-reports! + [system label] + (db/delete! system :migration-file-report {:label label})) + +(defn- file-report! + [system file-id label elapsed error] + (db/insert! system :migration-file-report + {:label label + :file-id file-id + :elapsed (inst-ms elapsed) + :error error} + {::db/return-keys false})) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; PUBLIC API @@ -318,12 +403,11 @@ :skip-on-graphic-error? skip-on-graphic-error?))) (when (string? label) - (report! main/system team-id label (tpoint) nil)) + (team-report! 
main/system team-id label (tpoint) nil)) (catch Throwable cause (l/wrn :hint "unexpected error on processing team (skiping)" - :team-id (str team-id) - :cause cause) + :team-id (str team-id)) (events/tap :error (ex-info "unexpected error on processing team (skiping)" @@ -333,7 +417,7 @@ (swap! stats update :errors (fnil inc 0)) (when (string? label) - (report! main/system team-id label (tpoint) (ex-message cause)))) + (team-report! main/system team-id label (tpoint) (ex-message cause)))) (finally (ps/release! sjobs))))) @@ -365,8 +449,8 @@ svgo/*semaphore* sprocs] (try (when (string? label) - (create-report-table! main/system) - (clean-reports! main/system label)) + (create-report-tables! main/system) + (clean-team-reports! main/system label)) (db/tx-run! main/system (fn [{:keys [::db/conn] :as system}] @@ -399,6 +483,146 @@ :rollback rollback? :elapsed elapsed))))))) + +(defn migrate-files! + "A REPL helper for migrating all files. + + This function starts multiple concurrent file migration processes + until the maximum number of jobs is reached which by default has the + value of `1`. This is controlled with the `:max-jobs` option. + + If you want to run this on multiple machines you will need to specify + the total number of partitions and the current partition (1-based). + + In order to get the report table populated, you will need to provide + a correct `:label`. That label is also used to persist a file + snapshot before continuing with the migration." + [& {:keys [max-jobs max-items max-time rollback? validate? query + pred max-procs cache skip-on-graphic-error? + label partitions current-partition] + :or {validate? false + rollback? true + max-jobs 1 + current-partition 1 + skip-on-graphic-error? true + max-items Long/MAX_VALUE}}] + + (when (int? partitions) + (when-not (int? current-partition) + (throw (IllegalArgumentException. "missing `current-partition` parameter"))) + (when-not (<= 1 current-partition partitions) + (throw (IllegalArgumentException. 
"invalid value on `current-partition` parameter")))) + + (let [stats (atom {}) + tpoint (dt/tpoint) + mtime (some-> max-time dt/duration) + + factory (px/thread-factory :virtual false :prefix "penpot/migration/") + executor (px/cached-executor :factory factory) + + max-procs (or max-procs max-jobs) + sjobs (ps/create :permits max-jobs) + sprocs (ps/create :permits max-procs) + + cache (if (int? cache) + (cache/create :executor (::wrk/executor main/system) + :max-items cache) + nil) + + migrate-file + (fn [file-id] + (let [tpoint (dt/tpoint)] + (try + (db/tx-run! (assoc main/system ::db/rollback rollback?) + (fn [system] + (db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"]) + (feat/migrate-file! system file-id + :label label + :validate? validate? + :skip-on-graphic-error? skip-on-graphic-error?))) + + (when (string? label) + (file-report! main/system file-id label (tpoint) nil)) + + (catch Throwable cause + (l/wrn :hint "unexpected error on processing file (skiping)" + :file-id (str file-id)) + + (events/tap :error + (ex-info "unexpected error on processing file (skiping)" + {:file-id file-id} + cause)) + + (swap! stats update :errors (fnil inc 0)) + + (when (string? label) + (file-report! main/system file-id label (tpoint) (ex-message cause)))) + + (finally + (ps/release! sjobs))))) + + process-file + (fn [file-id] + (ps/acquire! sjobs) + (let [ts (tpoint)] + (if (and mtime (neg? (compare mtime ts))) + (do + (l/inf :hint "max time constraint reached" + :file-id (str file-id) + :elapsed (dt/format-duration ts)) + (ps/release! sjobs) + (reduced nil)) + + (px/run! executor (partial migrate-file file-id)))))] + + (l/dbg :hint "migrate:start" + :label label + :rollback rollback? + :max-jobs max-jobs + :max-items max-items) + + (add-watch stats :progress-report (report-progress-files tpoint)) + + (binding [feat/*stats* stats + feat/*cache* cache + svgo/*semaphore* sprocs] + (try + (when (string? label) + (create-report-tables! 
main/system) + (clean-file-reports! main/system label)) + + (db/tx-run! main/system + (fn [{:keys [::db/conn] :as system}] + (db/exec! conn ["SET statement_timeout = 0"]) + (db/exec! conn ["SET idle_in_transaction_session_timeout = 0"]) + + (run! process-file + (->> (get-files conn query pred) + (filter (fn [file-id] + (if (int? partitions) + (= current-partition (-> (uuid/hash-int file-id) + (mod partitions) + (inc))) + true))) + (take max-items))) + + ;; Close and await tasks + (pu/close! executor))) + + (-> (deref stats) + (assoc :elapsed (dt/format-duration (tpoint)))) + + (catch Throwable cause + (l/dbg :hint "migrate:error" :cause cause) + (events/tap :error cause)) + + (finally + (let [elapsed (dt/format-duration (tpoint))] + (l/dbg :hint "migrate:end" + :rollback rollback? + :elapsed elapsed))))))) + + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; FILE PROCESS HELPERS ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;