diff --git a/backend/resources/log4j2-experiments.xml b/backend/resources/log4j2-experiments.xml
index 88542c277..3357aae31 100644
--- a/backend/resources/log4j2-experiments.xml
+++ b/backend/resources/log4j2-experiments.xml
@@ -48,12 +48,6 @@
-
-
-
-
-
-
diff --git a/backend/src/app/features/components_v2.clj b/backend/src/app/features/components_v2.clj
index 71b0c538a..8b3cd1da1 100644
--- a/backend/src/app/features/components_v2.clj
+++ b/backend/src/app/features/components_v2.clj
@@ -77,10 +77,6 @@
internal functions without the need to explicitly pass it top down."
nil)
-(def ^:dynamic ^:private *team-id*
- "A dynamic var that holds the current processing team-id."
- nil)
-
(def ^:dynamic ^:private *file-stats*
"An internal dynamic var for collect stats by file."
nil)
@@ -1194,12 +1190,11 @@
;; The media processing adds the data to the
;; input map and returns it.
(media/run {:cmd :info :input item}))
+
(catch Throwable _
- (let [team-id *team-id*]
- (l/wrn :hint "unable to process embedded images on svg file"
- :team-id (str team-id)
- :file-id (str file-id)
- :media-id (str media-id)))
+ (l/wrn :hint "unable to process embedded images on svg file"
+ :file-id (str file-id)
+ :media-id (str media-id))
nil)))
(persist-image [acc {:keys [path size width height mtype href] :as item}]
@@ -1332,24 +1327,20 @@
(catch Throwable cause
(vreset! err true)
(let [cause (pu/unwrap-exception cause)
- edata (ex-data cause)
- team-id *team-id*]
+ edata (ex-data cause)]
(cond
(instance? org.xml.sax.SAXParseException cause)
(l/inf :hint "skip processing media object: invalid svg found"
- :team-id (str team-id)
:file-id (str (:id fdata))
:id (str (:id mobj)))
(instance? org.graalvm.polyglot.PolyglotException cause)
(l/inf :hint "skip processing media object: invalid svg found"
- :team-id (str team-id)
:file-id (str (:id fdata))
:id (str (:id mobj)))
(= (:type edata) :not-found)
(l/inf :hint "skip processing media object: underlying object does not exist"
- :team-id (str team-id)
:file-id (str (:id fdata))
:id (str (:id mobj)))
@@ -1357,7 +1348,6 @@
(let [skip? *skip-on-graphic-error*]
(l/wrn :hint "unable to process file media object"
:skiped skip?
- :team-id (str team-id)
:file-id (str (:id fdata))
:id (str (:id mobj))
:cause cause)
@@ -1524,7 +1514,9 @@
(defn migrate-file!
[system file-id & {:keys [validate? skip-on-graphic-error? label]}]
- (let [tpoint (dt/tpoint)]
+ (let [tpoint (dt/tpoint)
+ err (volatile! false)]
+
(binding [*file-stats* (atom {})
*skip-on-graphic-error* skip-on-graphic-error?]
(try
@@ -1533,40 +1525,50 @@
:validate validate?
:skip-on-graphic-error skip-on-graphic-error?)
- (let [system (update system ::sto/storage media/configure-assets-storage)]
- (db/tx-run! system
- (fn [system]
- (try
- (binding [*system* system]
- (when (string? label)
- (fsnap/take-file-snapshot! system {:file-id file-id
- :label (str "migration/" label)}))
- (let [file (get-file system file-id)]
- (events/tap :progress
- {:op :migrate-file
- :name (:name file)
- :id (:id file)})
+ (db/tx-run! (update system ::sto/storage media/configure-assets-storage)
+ (fn [system]
+ (binding [*system* system]
+ (when (string? label)
+ (fsnap/take-file-snapshot! system {:file-id file-id
+ :label (str "migration/" label)}))
+ (let [file (get-file system file-id)]
+ (events/tap :progress
+ {:op :migrate-file
+ :name (:name file)
+ :id (:id file)})
- (process-file system file :validate? validate?)))
+ (process-file system file :validate? validate?)))))
- (catch Throwable cause
- (let [team-id *team-id*]
- (l/wrn :hint "error on processing file"
- :team-id (str team-id)
- :file-id (str file-id))
- (throw cause)))))))
+ (catch Throwable cause
+ (vreset! err true)
+ (l/wrn :hint "error on processing file"
+ :file-id (str file-id)
+ :cause cause)
+ (throw cause))
(finally
(let [elapsed (tpoint)
components (get @*file-stats* :processed-components 0)
graphics (get @*file-stats* :processed-graphics 0)]
- (l/dbg :hint "migrate:file:end"
- :file-id (str file-id)
- :graphics graphics
- :components components
- :validate validate?
- :elapsed (dt/format-duration elapsed))
+ (if (cache/cache? *cache*)
+ (let [cache-stats (cache/stats *cache*)]
+ (l/dbg :hint "migrate:file:end"
+ :file-id (str file-id)
+ :graphics graphics
+ :components components
+ :validate validate?
+ :crt (mth/to-fixed (:hit-rate cache-stats) 2)
+ :crq (str (:req-count cache-stats))
+ :error @err
+ :elapsed (dt/format-duration elapsed)))
+ (l/dbg :hint "migrate:file:end"
+ :file-id (str file-id)
+ :graphics graphics
+ :components components
+ :validate validate?
+ :error @err
+ :elapsed (dt/format-duration elapsed)))
(some-> *stats* (swap! update :processed-files (fnil inc 0)))
(some-> *team-stats* (swap! update :processed-files (fnil inc 0)))))))))
@@ -1607,13 +1609,15 @@
(update-team-features! conn id features)))))]
- (binding [*team-stats* (atom {})
- *team-id* team-id]
+ (binding [*team-stats* (atom {})]
(try
(db/tx-run! system migrate-team team-id)
(catch Throwable cause
(vreset! err true)
+ (l/wrn :hint "error on processing team"
+ :team-id (str team-id)
+ :cause cause)
(throw cause))
(finally
diff --git a/backend/src/app/rpc/commands/files_snapshot.clj b/backend/src/app/rpc/commands/files_snapshot.clj
index 68b3a017c..3b90023fb 100644
--- a/backend/src/app/rpc/commands/files_snapshot.clj
+++ b/backend/src/app/rpc/commands/files_snapshot.clj
@@ -70,8 +70,8 @@
(some? (:data snapshot)))
(l/debug :hint "snapshot found"
- :snapshot-id (:id snapshot)
- :file-id file-id)
+ :snapshot-id (str (:id snapshot))
+ :file-id (str file-id))
(db/update! conn :file
{:data (:data snapshot)}
@@ -112,7 +112,9 @@
(when-let [file (db/get* conn :file {:id file-id})]
(let [id (uuid/next)
label (or label (str "Snapshot at " (dt/format-instant (dt/now) :rfc1123)))]
- (l/debug :hint "persisting file snapshot" :file-id file-id :label label)
+ (l/debug :hint "persisting file snapshot"
+ :file-id (str file-id)
+ :label label)
(db/insert! conn :file-change
{:id id
:revn (:revn file)
diff --git a/backend/src/app/srepl/components_v2.clj b/backend/src/app/srepl/components_v2.clj
index 4adf55293..9bada320a 100644
--- a/backend/src/app/srepl/components_v2.clj
+++ b/backend/src/app/srepl/components_v2.clj
@@ -32,12 +32,20 @@
(defn- report-progress-files
[tpoint]
(fn [_ _ oldv newv]
- (when (not= (:processed-files oldv)
- (:processed-files newv))
- (let [elapsed (tpoint)]
+ (when (or (not= (:processed-files oldv)
+ (:processed-files newv))
+ (not= (:errors oldv)
+ (:errors newv)))
+ (let [completed (:processed-files newv 0)
+ errors (:errors newv 0)
+ elapsed (dt/format-duration (tpoint))]
+ (events/tap :progress-report
+ {:elapsed elapsed
+ :completed completed
+ :errors errors})
(l/dbg :hint "progress"
- :completed (:processed-files newv)
- :elapsed (dt/format-duration elapsed))))))
+ :completed completed
+ :elapsed elapsed)))))
(defn- report-progress-teams
[tpoint]
@@ -101,13 +109,47 @@
(def ^:private sql:get-teams-by-report
"WITH teams AS (
SELECT t.id t.features, mr.name
- FROM migration_report AS mr
+ FROM migration_team_report AS mr
JOIN team AS t ON (t.id = mr.team_id)
WHERE t.deleted_at IS NULL
AND mr.error IS NOT NULL
ORDER BY mr.created_at
) SELECT id, features FROM teams %(pred)s")
+(def ^:private sql:get-files-by-created-at
+ "SELECT id, features
+ FROM file
+ WHERE deleted_at IS NULL
+ ORDER BY created_at DESC")
+
+(def ^:private sql:get-files-by-modified-at
+ "SELECT id, features
+ FROM file
+ WHERE deleted_at IS NULL
+ ORDER BY modified_at DESC")
+
+(def ^:private sql:get-files-by-graphics
+ "WITH files AS (
+ SELECT f.id, f.features,
+ (SELECT count(*) FROM file_media_object AS fmo
+ WHERE fmo.mtype = 'image/svg+xml'
+ AND fmo.is_local = false
+ AND fmo.file_id = f.id) AS graphics
+ FROM file AS f
+ WHERE f.deleted_at IS NULL
+ ORDER BY 3 ASC
+ ) SELECT * FROM files %(pred)s")
+
+(def ^:private sql:get-files-by-report
+ "WITH files AS (
+ SELECT t.id t.features, mr.name
+ FROM migration_file_report AS mr
+ JOIN file AS t ON (t.id = mr.file_id)
+ WHERE t.deleted_at IS NULL
+ AND mr.error IS NOT NULL
+ ORDER BY mr.created_at
+ ) SELECT id, features FROM files %(pred)s")
+
(defn- read-pred
[entries]
(let [entries (if (and (vector? entries)
@@ -140,7 +182,6 @@
:activity sql:get-teams-by-activity
:graphics sql:get-teams-by-graphics
:report sql:get-teams-by-report)
-
sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)]
(apply vector
@@ -154,34 +195,78 @@
(contains? features "components/v2")))
(map :id))))
-(def ^:private sql:report-table
- "CREATE UNLOGGED TABLE IF NOT EXISTS migration_report (
+(defn- get-files
+ [conn query pred]
+ (let [query (d/nilv query :created-at)
+ sql (case query
+ :created-at sql:get-files-by-created-at
+ :modified-at sql:get-files-by-modified-at
+ :graphics sql:get-files-by-graphics
+ :report sql:get-files-by-report)
+ sql (if pred
+ (let [[pred-sql & pred-params] (read-pred pred)]
+ (apply vector
+ (str/format sql {:pred pred-sql})
+ pred-params))
+ [(str/format sql {:pred ""})])]
+
+ (->> (db/cursor conn sql {:chunk-size 500})
+ (map feat/decode-row)
+ (remove (fn [{:keys [features]}]
+ (contains? features "components/v2")))
+ (map :id))))
+
+(def ^:private sql:team-report-table
+ "CREATE UNLOGGED TABLE IF NOT EXISTS migration_team_report (
id bigserial NOT NULL,
label text NOT NULL,
team_id UUID NOT NULL,
error text NULL,
created_at timestamptz NOT NULL DEFAULT now(),
elapsed bigint NOT NULL,
- PRIMARY KEY (label, created_at, id)
- )")
+ PRIMARY KEY (label, created_at, id))")
-(defn- create-report-table!
+(def ^:private sql:file-report-table
+ "CREATE UNLOGGED TABLE IF NOT EXISTS migration_file_report (
+ id bigserial NOT NULL,
+ label text NOT NULL,
+ file_id UUID NOT NULL,
+ error text NULL,
+ created_at timestamptz NOT NULL DEFAULT now(),
+ elapsed bigint NOT NULL,
+ PRIMARY KEY (label, created_at, id))")
+
+(defn- create-report-tables!
[system]
- (db/exec-one! system [sql:report-table]))
+ (db/exec-one! system [sql:team-report-table])
+ (db/exec-one! system [sql:file-report-table]))
-(defn- clean-reports!
+(defn- clean-team-reports!
[system label]
- (db/delete! system :migration-report {:label label}))
+ (db/delete! system :migration-team-report {:label label}))
-(defn- report!
+(defn- team-report!
[system team-id label elapsed error]
- (db/insert! system :migration-report
+ (db/insert! system :migration-team-report
{:label label
:team-id team-id
:elapsed (inst-ms elapsed)
:error error}
{::db/return-keys false}))
+(defn- clean-file-reports!
+ [system label]
+ (db/delete! system :migration-file-report {:label label}))
+
+(defn- file-report!
+ [system file-id label elapsed error]
+ (db/insert! system :migration-file-report
+ {:label label
+ :file-id file-id
+ :elapsed (inst-ms elapsed)
+ :error error}
+ {::db/return-keys false}))
+
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API
@@ -318,12 +403,11 @@
:skip-on-graphic-error? skip-on-graphic-error?)))
(when (string? label)
- (report! main/system team-id label (tpoint) nil))
+ (team-report! main/system team-id label (tpoint) nil))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing team (skiping)"
- :team-id (str team-id)
- :cause cause)
+ :team-id (str team-id))
(events/tap :error
(ex-info "unexpected error on processing team (skiping)"
@@ -333,7 +417,7 @@
(swap! stats update :errors (fnil inc 0))
(when (string? label)
- (report! main/system team-id label (tpoint) (ex-message cause))))
+ (team-report! main/system team-id label (tpoint) (ex-message cause))))
(finally
(ps/release! sjobs)))))
@@ -365,8 +449,8 @@
svgo/*semaphore* sprocs]
(try
(when (string? label)
- (create-report-table! main/system)
- (clean-reports! main/system label))
+ (create-report-tables! main/system)
+ (clean-team-reports! main/system label))
(db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}]
@@ -399,6 +483,146 @@
:rollback rollback?
:elapsed elapsed)))))))
+
+(defn migrate-files!
+ "A REPL helper for migrate all files.
+
+ This function starts multiple concurrent file migration processes
+ until thw maximum number of jobs is reached which by default has the
+ value of `1`. This is controled with the `:max-jobs` option.
+
+ If you want to run this on multiple machines you will need to specify
+ the total number of partitions and the current partition.
+
+ In order to get the report table populated, you will need to provide
+ a correct `:label`. That label is also used for persist a file
+ snaphot before continue with the migration."
+ [& {:keys [max-jobs max-items max-time rollback? validate? query
+ pred max-procs cache skip-on-graphic-error?
+ label partitions current-partition]
+ :or {validate? false
+ rollback? true
+ max-jobs 1
+ current-partition 1
+ skip-on-graphic-error? true
+ max-items Long/MAX_VALUE}}]
+
+ (when (int? partitions)
+ (when-not (int? current-partition)
+ (throw (IllegalArgumentException. "missing `current-partition` parameter")))
+ (when-not (<= 0 current-partition partitions)
+ (throw (IllegalArgumentException. "invalid value on `current-partition` parameter"))))
+
+ (let [stats (atom {})
+ tpoint (dt/tpoint)
+ mtime (some-> max-time dt/duration)
+
+ factory (px/thread-factory :virtual false :prefix "penpot/migration/")
+ executor (px/cached-executor :factory factory)
+
+ max-procs (or max-procs max-jobs)
+ sjobs (ps/create :permits max-jobs)
+ sprocs (ps/create :permits max-procs)
+
+ cache (if (int? cache)
+ (cache/create :executor (::wrk/executor main/system)
+ :max-items cache)
+ nil)
+
+ migrate-file
+ (fn [file-id]
+ (let [tpoint (dt/tpoint)]
+ (try
+ (db/tx-run! (assoc main/system ::db/rollback rollback?)
+ (fn [system]
+ (db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"])
+ (feat/migrate-file! system file-id
+ :label label
+ :validate? validate?
+ :skip-on-graphic-error? skip-on-graphic-error?)))
+
+ (when (string? label)
+ (file-report! main/system file-id label (tpoint) nil))
+
+ (catch Throwable cause
+ (l/wrn :hint "unexpected error on processing file (skiping)"
+ :file-id (str file-id))
+
+ (events/tap :error
+ (ex-info "unexpected error on processing file (skiping)"
+ {:file-id file-id}
+ cause))
+
+ (swap! stats update :errors (fnil inc 0))
+
+ (when (string? label)
+ (file-report! main/system file-id label (tpoint) (ex-message cause))))
+
+ (finally
+ (ps/release! sjobs)))))
+
+ process-file
+ (fn [file-id]
+ (ps/acquire! sjobs)
+ (let [ts (tpoint)]
+ (if (and mtime (neg? (compare mtime ts)))
+ (do
+ (l/inf :hint "max time constraint reached"
+ :file-id (str file-id)
+ :elapsed (dt/format-duration ts))
+ (ps/release! sjobs)
+ (reduced nil))
+
+ (px/run! executor (partial migrate-file file-id)))))]
+
+ (l/dbg :hint "migrate:start"
+ :label label
+ :rollback rollback?
+ :max-jobs max-jobs
+ :max-items max-items)
+
+ (add-watch stats :progress-report (report-progress-files tpoint))
+
+ (binding [feat/*stats* stats
+ feat/*cache* cache
+ svgo/*semaphore* sprocs]
+ (try
+ (when (string? label)
+ (create-report-tables! main/system)
+ (clean-file-reports! main/system label))
+
+ (db/tx-run! main/system
+ (fn [{:keys [::db/conn] :as system}]
+ (db/exec! conn ["SET statement_timeout = 0"])
+ (db/exec! conn ["SET idle_in_transaction_session_timeout = 0"])
+
+ (run! process-file
+ (->> (get-files conn query pred)
+ (filter (fn [file-id]
+ (if (int? partitions)
+ (= current-partition (-> (uuid/hash-int file-id)
+ (mod partitions)
+ (inc)))
+ true)))
+ (take max-items)))
+
+ ;; Close and await tasks
+ (pu/close! executor)))
+
+ (-> (deref stats)
+ (assoc :elapsed (dt/format-duration (tpoint))))
+
+ (catch Throwable cause
+ (l/dbg :hint "migrate:error" :cause cause)
+ (events/tap :error cause))
+
+ (finally
+ (let [elapsed (dt/format-duration (tpoint))]
+ (l/dbg :hint "migrate:end"
+ :rollback rollback?
+ :elapsed elapsed)))))))
+
+
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; FILE PROCESS HELPERS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;