diff --git a/backend/src/app/features/components_v2.clj b/backend/src/app/features/components_v2.clj index b9cf8a79a..21a47d05f 100644 --- a/backend/src/app/features/components_v2.clj +++ b/backend/src/app/features/components_v2.clj @@ -20,6 +20,7 @@ [app.common.geom.rect :as grc] [app.common.geom.shapes :as gsh] [app.common.logging :as l] + [app.common.math :as mth] [app.common.svg :as csvg] [app.common.svg.shapes-builder :as sbuilder] [app.common.types.component :as ctk] @@ -48,7 +49,8 @@ [buddy.core.codecs :as bc] [cuerdas.core :as str] [datoteka.io :as io] - [promesa.core :as p])) + [promesa.exec :as px] + [promesa.util :as pu])) (def ^:dynamic *stats* "A dynamic var for setting up state for collect stats globally." @@ -67,6 +69,10 @@ internal functions without the need to explicitly pass it top down." nil) +(def ^:dynamic ^:private *team-id* + "A dynamic var that holds the current processing team-id." + nil) + (def ^:dynamic ^:private *file-stats* "An internal dynamic var for collect stats by file." nil) @@ -575,32 +581,40 @@ (if (> ext-idx 0) (subs filename 0 ext-idx) filename))) (defn- collect-and-persist-images - [svg-data file-id] + [svg-data file-id media-id] (letfn [(process-image [{:keys [href] :as item}] - (let [item (if (str/starts-with? href "data:") - (let [[mtype data] (parse-datauri href) - size (alength data) - path (tmp/tempfile :prefix "penpot.media.download.") - written (io/write-to-file! data path :size size)] + (try + (let [item (if (str/starts-with? href "data:") + (let [[mtype data] (parse-datauri href) + size (alength data) + path (tmp/tempfile :prefix "penpot.media.download.") + written (io/write-to-file! data path :size size)] - (when (not= written size) - (ex/raise :type :internal - :code :mismatch-write-size - :hint "unexpected state: unable to write to file")) + (when (not= written size) + (ex/raise :type :internal + :code :mismatch-write-size + :hint "unexpected state: unable to write to file")) - (-> item - (assoc :size size) - (assoc :path path) - (assoc :filename "tempfile") - (assoc :mtype mtype))) + (-> item + (assoc :size size) + (assoc :path path) + (assoc :filename "tempfile") + (assoc :mtype mtype))) - (let [result (cmd.media/download-image *system* href)] - (-> (merge item result) - (assoc :name (extract-name href)))))] + (let [result (cmd.media/download-image *system* href)] + (-> (merge item result) + (assoc :name (extract-name href)))))] - ;; The media processing adds the data to the - ;; input map and returns it. - (media/run {:cmd :info :input item}))) + ;; The media processing adds the data to the + ;; input map and returns it. + (media/run {:cmd :info :input item})) + (catch Throwable _ + (let [team-id *team-id*] + (l/wrn :hint "unable to process embedded images on svg file" + :team-id (str team-id) + :file-id (str file-id) + :media-id (str media-id))) + nil))) (persist-image [acc {:keys [path size width height mtype href] :as item}] (let [storage (::sto/storage *system*) @@ -639,9 +653,7 @@ (defn- resolve-sobject-id [id] (let [fmobject (db/get *system* :file-media-object {:id id} - {::db/check-deleted false - ::db/remove-deleted false - ::sql/columns [:media-id]})] + {::sql/columns [:media-id]})] (:media-id fmobject))) (defn- get-sobject-content @@ -660,12 +672,11 @@ (assoc :name (:name mobj))))) sid (resolve-sobject-id id) - svg-data (if (cache/cache? *cache*) - (cache/get *cache* sid get-svg) + (cache/get *cache* sid (px/wrap-bindings get-svg)) (get-svg sid)) - svg-data (collect-and-persist-images file-id)] + svg-data (collect-and-persist-images svg-data file-id id)] (sbuilder/create-svg-shapes svg-data position objects frame-id frame-id #{} false))) @@ -724,25 +735,49 @@ (defn- create-media-grid [fdata page-id frame-id grid media-group] - (letfn [(process [fdata mobj position] + (letfn [(process-media-object [fdata mobj position] (let [position (gpt/add position (gpt/point grid-gap grid-gap)) - tp (dt/tpoint)] + tp (dt/tpoint) + err (volatile! false)] (try (let [changes (process-media-object fdata page-id frame-id mobj position)] (cp/process-changes fdata changes false)) + (catch Throwable cause - (if *skip-on-graphic-error* - (l/wrn :hint "unable to process file media object (skiping)" - :file-id (str (:id fdata)) - :id (str (:id mobj)) - :cause cause) - (throw cause)) - nil) + (vreset! err true) + (let [cause (pu/unwrap-exception cause) + edata (ex-data cause) + team-id *team-id*] + (cond + (instance? org.xml.sax.SAXParseException cause) + (l/inf :hint "skip processing media object: invalid svg found" + :team-id (str team-id) + :file-id (str (:id fdata)) + :id (str (:id mobj))) + + (= (:type edata) :not-found) + (l/inf :hint "skip processing media object: underlying object does not exist" + :team-id (str team-id) + :file-id (str (:id fdata)) + :id (str (:id mobj))) + + :else + (let [skip? *skip-on-graphic-error*] + (l/wrn :hint "unable to process file media object" + :skiped skip? + :team-id (str team-id) + :file-id (str (:id fdata)) + :id (str (:id mobj)) + :cause cause) + (when-not skip? + (throw cause)))) + nil)) (finally (let [elapsed (tp)] (l/trc :hint "graphic processed" :file-id (str (:id fdata)) :media-id (str (:id mobj)) + :error @err :elapsed (dt/format-duration elapsed)))))))] (->> (d/zip media-group grid) @@ -750,7 +785,8 @@ (sse/tap {:type :migration-progress :section :graphics :name (:name mobj)}) - (or (process fdata mobj position) fdata)) + (or (process-media-object fdata mobj position) + fdata)) (assoc-in fdata [:options :components-v2] true))))) (defn- migrate-graphics @@ -822,7 +858,6 @@ (update :data fdata/process-pointers deref) (fmg/migrate-file)))) - (defn- get-team [system team-id] (-> (db/get system :team {:id team-id} @@ -870,12 +905,13 @@ (dissoc file :data))) - (def ^:private sql:get-and-lock-team-files "SELECT f.id FROM file AS f JOIN project AS p ON (p.id = f.project_id) WHERE p.team_id = ? + AND p.deleted_at IS NULL + AND f.deleted_at IS NULL FOR UPDATE") (defn- get-and-lock-files @@ -900,14 +936,26 @@ (binding [*file-stats* (atom {}) *skip-on-graphic-error* skip-on-graphic-error?] (try - (l/dbg :hint "migrate:file:start" :file-id (str file-id)) + (l/dbg :hint "migrate:file:start" + :file-id (str file-id) + :validate validate? + :skip-on-graphics-error skip-on-graphic-error?) (let [system (update system ::sto/storage media/configure-assets-storage)] (db/tx-run! system (fn [system] - (binding [*system* system] - (fsnap/take-file-snapshot! system {:file-id file-id :label "migration/components-v2"}) - (process-file system file-id :validate? validate?))))) + (try + (binding [*system* system] + (fsnap/take-file-snapshot! system {:file-id file-id :label "migration/components-v2"}) + (process-file system file-id :validate? validate?)) + + (catch Throwable cause + (let [team-id *team-id*] + (l/wrn :hint "error on processing file" + :team-id (str team-id) + :file-id (str file-id)) + (throw cause))))))) + (finally (let [elapsed (tpoint) components (get @*file-stats* :processed/components 0) @@ -917,6 +965,7 @@ :file-id (str file-id) :graphics graphics :components components + :validate validate? :elapsed (dt/format-duration elapsed)) (some-> *stats* (swap! update :processed/files (fnil inc 0))) @@ -929,6 +978,7 @@ :team-id (dm/str team-id)) (let [tpoint (dt/tpoint) + err (volatile! false) migrate-file (fn [system file-id] @@ -948,7 +998,8 @@ (update-team-features! conn id features)))] - (binding [*team-stats* (atom {})] + (binding [*team-stats* (atom {}) + *team-id* team-id] (try (db/tx-run! system (fn [system] (db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"]) @@ -956,6 +1007,10 @@ (if (contains? (:features team) "components/v2") (l/inf :hint "team already migrated") (migrate-team system team))))) + (catch Throwable cause + (vreset! err true) + (throw cause)) + (finally (let [elapsed (tpoint) components (get @*team-stats* :processed/components 0) @@ -964,9 +1019,21 @@ (some-> *stats* (swap! update :processed/teams (fnil inc 0))) - (l/dbg :hint "migrate:team:end" - :team-id (dm/str team-id) - :files files - :components components - :graphics graphics - :elapsed (dt/format-duration elapsed)))))))) + (if (cache/cache? *cache*) + (let [cache-stats (cache/stats *cache*)] + (l/dbg :hint "migrate:team:end" + :team-id (dm/str team-id) + :files files + :components components + :graphics graphics + :crt (mth/to-fixed (:hit-rate cache-stats) 2) + :crq (str (:req-count cache-stats)) + :error @err + :elapsed (dt/format-duration elapsed))) + + (l/dbg :hint "migrate:team:end" + :team-id (dm/str team-id) + :files files + :components components + :graphics graphics + :elapsed (dt/format-duration elapsed))))))))) diff --git a/backend/src/app/srepl/components_v2.clj b/backend/src/app/srepl/components_v2.clj index 30b23dcea..7c82d1b1c 100644 --- a/backend/src/app/srepl/components_v2.clj +++ b/backend/src/app/srepl/components_v2.clj @@ -10,10 +10,12 @@ [app.common.pprint :as pp] [app.db :as db] [app.features.components-v2 :as feat] + [app.main :as main] [app.svgo :as svgo] + [app.util.cache :as cache] [app.util.time :as dt] + [app.worker :as-alias wrk] [cuerdas.core :as str] - [promesa.core :as p] [promesa.exec :as px] [promesa.exec.semaphore :as ps] [promesa.util :as pu])) @@ -36,8 +38,7 @@ (fn [_ _ oldv newv] (when (not= (:processed/files oldv) (:processed/files newv)) - (let [completed (:processed/files newv) - elapsed (tpoint)] + (let [elapsed (tpoint)] (l/dbg :hint "progress" :completed (:processed/files newv) :elapsed (dt/format-duration elapsed)))))) @@ -56,71 +57,13 @@ :completed completed :elapsed elapsed))))) -(defn- get-total-files - [pool & {:keys [team-id]}] - (if (some? team-id) - (let [sql (str/concat - "SELECT count(f.id) AS count FROM file AS f " - " JOIN project AS p ON (p.id = f.project_id) " - " WHERE p.team_id = ? AND f.deleted_at IS NULL " - " AND p.deleted_at IS NULL") - res (db/exec-one! pool [sql team-id])] - (:count res)) +(def ^:private sql:get-teams-1 + "SELECT id, features + FROM team + WHERE deleted_at IS NULL + ORDER BY created_at DESC") - (let [sql (str/concat - "SELECT count(id) AS count FROM file " - " WHERE deleted_at IS NULL") - res (db/exec-one! pool [sql])] - (:count res)))) - -(defn- get-total-teams - [pool] - (let [sql (str/concat - "SELECT count(id) AS count FROM team " - " WHERE deleted_at IS NULL") - res (db/exec-one! pool [sql])] - (:count res))) - -(defn- mark-team-migration! - [{:keys [::db/pool]} team-id] - ;; We execute this out of transaction because we want this - ;; change to be visible to all other sessions before starting - ;; the migration - (let [sql (str "UPDATE team SET features = " - " array_append(features, 'ephimeral/v2-migration') " - " WHERE id = ?")] - (db/exec-one! pool [sql team-id]))) - -(defn- unmark-team-migration! - [{:keys [::db/pool]} team-id] - ;; We execute this out of transaction because we want this - ;; change to be visible to all other sessions before starting - ;; the migration - (let [sql (str "UPDATE team SET features = " - " array_remove(features, 'ephimeral/v2-migration') " - " WHERE id = ?")] - (db/exec-one! pool [sql team-id]))) - -;; (def ^:private sql:get-teams -;; "SELECT id, features -;; FROM team -;; WHERE deleted_at IS NULL -;; ORDER BY created_at DESC") - -;; (def ^:private sql:get-teams -;; "SELECT t.id, t.features, -;; (SELECT count(*) -;; FROM file_media_object AS fmo -;; JOIN file AS f ON (f.id = fmo.file_id) -;; JOIN project AS p ON (p.id = f.project_id) -;; WHERE p.team_id = t.id -;; AND fmo.mtype = 'image/svg+xml' -;; AND fmo.is_local = false) AS graphics -;; FROM team AS t -;; ORDER BY t.created_at DESC") - - -(def ^:private sql:get-teams +(def ^:private sql:get-teams-2 "WITH teams AS ( SELECT t.id, t.features, (SELECT count(*) @@ -136,20 +79,37 @@ SELECT * FROM teams ") (defn- read-pred - [[op val field]] - (let [field (name field)] - (case op - :lt [(str/ffmt "WHERE % < ?" field) val] - :lte [(str/ffmt "WHERE % <= ?" field) val] - :gt [(str/ffmt "WHERE % > ?" field) val] - :gte [(str/ffmt "WHERE % >= ?" field) val] - :eq [(str/ffmt "WHERE % = ?" field) val] - [""]))) + [entries] + (let [entries (if (and (vector? entries) + (keyword? (first entries))) + [entries] + entries)] + (loop [params [] + queries [] + entries (seq entries)] + (if-let [[op val field] (first entries)] + (let [field (name field) + cond (case op + :lt (str/ffmt "% < ?" field) + :lte (str/ffmt "% <= ?" field) + :gt (str/ffmt "% > ?" field) + :gte (str/ffmt "% >= ?" field) + :eq (str/ffmt "% = ?" field))] + (recur (conj params val) + (conj queries cond) + (rest entries))) + + (let [sql (apply str "WHERE " (str/join " AND " queries))] + (apply vector sql params)))))) (defn- get-teams [conn pred] - (let [[sql & params] (read-pred pred)] - (->> (db/cursor conn (apply vector (str sql:get-teams sql) params)) + (let [sql (if pred + (let [[sql & params] (read-pred pred)] + (apply vector (str sql:get-teams-2 sql) params)) + [sql:get-teams-1])] + + (->> (db/cursor conn sql) (map feat/decode-row) (remove (fn [{:keys [features]}] (or (contains? features "ephimeral/v2-migration") @@ -162,8 +122,7 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn migrate-file! - [system file-id & {:keys [rollback?] :or {rollback? true}}] - + [file-id & {:keys [rollback? validate?] :or {rollback? true validate? false}}] (l/dbg :hint "migrate:start" :rollback rollback?) (let [tpoint (dt/tpoint) file-id (if (string? file-id) @@ -171,8 +130,8 @@ file-id)] (binding [feat/*stats* (atom {})] (try - (-> (assoc system ::db/rollback rollback?) - (feat/migrate-file! file-id)) + (-> (assoc main/system ::db/rollback rollback?) + (feat/migrate-file! file-id :validate? validate?)) (-> (deref feat/*stats*) (assoc :elapsed (dt/format-duration (tpoint)))) @@ -185,12 +144,10 @@ (l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed))))))) (defn migrate-team! - [{:keys [::db/pool] :as system} team-id & {:keys [rollback? skip-on-graphic-error? validate? skip-mark?] - :or {rollback? true - validate? true - skip-on-graphic-error? false - skip-mark? false} - :as opts}] + [team-id & {:keys [rollback? skip-on-graphic-error? validate?] + :or {rollback? true + validate? true + skip-on-graphic-error? false}}] (l/dbg :hint "migrate:start" :rollback rollback?) @@ -204,10 +161,7 @@ (binding [feat/*stats* stats] (try - (when-not skip-mark? - (mark-team-migration! system team-id)) - - (-> (assoc system ::db/rollback rollback?) + (-> (assoc main/system ::db/rollback rollback?) (feat/migrate-team! team-id :validate? validate? :skip-on-graphics-error? skip-on-graphic-error?)) @@ -219,9 +173,6 @@ (l/dbg :hint "migrate:error" :cause cause)) (finally - (when-not skip-mark? - (unmark-team-migration! system team-id)) - (let [elapsed (dt/format-duration (tpoint))] (l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed))))))) @@ -232,28 +183,31 @@ until thw maximum number of jobs is reached which by default has the value of `1`. This is controled with the `:max-jobs` option." - [{:keys [::db/pool] :as system} & {:keys [max-jobs max-items max-time - rollback? validate? preset - pred max-procs skip-mark? - on-start on-progress on-error on-end] - :or {validate? true - rollback? true - preset :shutdown-on-failure - skip-mark? true - max-jobs 1 - max-items Long/MAX_VALUE} - :as opts}] + [& {:keys [max-jobs max-items max-time rollback? validate? + pred max-procs cache on-start on-progress on-error on-end + skip-on-graphic-error?] + :or {validate? false + rollback? true + max-jobs 1 + skip-on-graphic-error? true + max-items Long/MAX_VALUE}}] (let [stats (atom {}) tpoint (dt/tpoint) mtime (some-> max-time dt/duration) - factory (px/thread-factory :virtual false :prefix "penpot/migration/compv2/") + factory (px/thread-factory :virtual false :prefix "penpot/migration/") executor (px/cached-executor :factory factory) + max-procs (or max-procs max-jobs) sjobs (ps/create :permits max-jobs) sprocs (ps/create :permits max-procs) + cache (if (int? cache) + (cache/create :executor (::wrk/executor main/system) + :max-items cache) + nil) + migrate-team (fn [team-id] (ps/acquire! sjobs) @@ -268,18 +222,18 @@ (px/run! executor (fn [] (try - (when-not skip-mark? - (mark-team-migration! system team-id)) - (-> (assoc system ::db/rollback rollback?) - (feat/migrate-team! team-id :validate? validate?)) + (-> (assoc main/system ::db/rollback rollback?) + (feat/migrate-team! team-id + :validate? validate? + :skip-on-graphics-error? skip-on-graphic-error?)) + (catch Throwable cause - (l/err :hint "unexpected error on processing team (skiping)" + (l/wrn :hint "unexpected error on processing team (skiping)" :team-id (str team-id) :cause cause)) + (finally - (ps/release! sjobs) - (when-not skip-mark? - (unmark-team-migration! system team-id)))))))))] + (ps/release! sjobs))))))))] (l/dbg :hint "migrate:start" :rollback rollback? @@ -289,14 +243,17 @@ (add-watch stats :progress-report (report-progress-teams tpoint on-progress)) (binding [feat/*stats* stats + feat/*cache* cache svgo/*semaphore* sprocs] (try (when (fn? on-start) (on-start {:rollback rollback?})) - (db/tx-run! system + (db/tx-run! main/system (fn [{:keys [::db/conn]}] - (run! (partial migrate-team) + (db/exec! conn ["SET statement_timeout = 0;"]) + (db/exec! conn ["SET idle_in_transaction_session_timeout = 0;"]) + (run! migrate-team (->> (get-teams conn pred) (take max-items)))))