Simplify SVGO module API

This commit is contained in:
Andrey Antukh 2024-01-14 20:53:03 +01:00
parent 4fc391763e
commit 944d167bbb
6 changed files with 205 additions and 237 deletions

View file

@ -209,7 +209,6 @@
(s/def ::telemetry-uri ::us/string) (s/def ::telemetry-uri ::us/string)
(s/def ::telemetry-with-taiga ::us/boolean) (s/def ::telemetry-with-taiga ::us/boolean)
(s/def ::tenant ::us/string) (s/def ::tenant ::us/string)
(s/def ::svgo-max-procs ::us/integer)
(s/def ::config (s/def ::config
(s/keys :opt-un [::secret-key (s/keys :opt-un [::secret-key
@ -329,9 +328,7 @@
::telemetry-uri ::telemetry-uri
::telemetry-referer ::telemetry-referer
::telemetry-with-taiga ::telemetry-with-taiga
::tenant ::tenant]))
::svgo-max-procs]))
(def default-flags (def default-flags
[:enable-backend-api-doc [:enable-backend-api-doc

View file

@ -31,6 +31,7 @@
[app.common.types.shape-tree :as ctst] [app.common.types.shape-tree :as ctst]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.db :as db] [app.db :as db]
[app.db.sql :as sql]
[app.features.fdata :as fdata] [app.features.fdata :as fdata]
[app.http.sse :as sse] [app.http.sse :as sse]
[app.media :as media] [app.media :as media]
@ -41,6 +42,7 @@
[app.storage.tmp :as tmp] [app.storage.tmp :as tmp]
[app.svgo :as svgo] [app.svgo :as svgo]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.cache :as cache]
[app.util.pointer-map :as pmap] [app.util.pointer-map :as pmap]
[app.util.time :as dt] [app.util.time :as dt]
[buddy.core.codecs :as bc] [buddy.core.codecs :as bc]
@ -52,20 +54,19 @@
"A dynamic var for setting up state for collect stats globally." "A dynamic var for setting up state for collect stats globally."
nil) nil)
(def ^:dynamic *skip-on-error* (def ^:dynamic *cache*
"A dynamic var for setting up the default error behavior." "A dynamic var for setting up a cache instance."
true) nil)
(def ^:dynamic *skip-on-graphic-error*
"A dynamic var for setting up the default error behavior for graphics processing."
nil)
(def ^:dynamic ^:private *system* (def ^:dynamic ^:private *system*
"An internal var for making the current `system` available to all "An internal var for making the current `system` available to all
internal functions without the need to explicitly pass it top down." internal functions without the need to explicitly pass it top down."
nil) nil)
(def ^:dynamic ^:private *max-procs*
"A dynamic variable that can optionally indicates the maxumum number
of concurrent graphics migration processes."
nil)
(def ^:dynamic ^:private *file-stats* (def ^:dynamic ^:private *file-stats*
"An internal dynamic var for collect stats by file." "An internal dynamic var for collect stats by file."
nil) nil)
@ -576,37 +577,30 @@
(defn- collect-and-persist-images (defn- collect-and-persist-images
[svg-data file-id] [svg-data file-id]
(letfn [(process-image [{:keys [href] :as item}] (letfn [(process-image [{:keys [href] :as item}]
(try (let [item (if (str/starts-with? href "data:")
(let [item (if (str/starts-with? href "data:") (let [[mtype data] (parse-datauri href)
(let [[mtype data] (parse-datauri href) size (alength data)
size (alength data) path (tmp/tempfile :prefix "penpot.media.download.")
path (tmp/tempfile :prefix "penpot.media.download.") written (io/write-to-file! data path :size size)]
written (io/write-to-file! data path :size size)]
(when (not= written size) (when (not= written size)
(ex/raise :type :internal (ex/raise :type :internal
:code :mismatch-write-size :code :mismatch-write-size
:hint "unexpected state: unable to write to file")) :hint "unexpected state: unable to write to file"))
(-> item (-> item
(assoc :size size) (assoc :size size)
(assoc :path path) (assoc :path path)
(assoc :filename "tempfile") (assoc :filename "tempfile")
(assoc :mtype mtype))) (assoc :mtype mtype)))
(let [result (cmd.media/download-image *system* href)] (let [result (cmd.media/download-image *system* href)]
(-> (merge item result) (-> (merge item result)
(assoc :name (extract-name href)))))] (assoc :name (extract-name href)))))]
;; The media processing adds the data to the ;; The media processing adds the data to the
;; input map and returns it. ;; input map and returns it.
(media/run {:cmd :info :input item})) (media/run {:cmd :info :input item})))
(catch Throwable cause
(l/warn :hint "unexpected exception on processing internal image shape (skiping)"
:cause cause)
(when-not *skip-on-error*
(throw cause)))))
(persist-image [acc {:keys [path size width height mtype href] :as item}] (persist-image [acc {:keys [path size width height mtype href] :as item}]
(let [storage (::sto/storage *system*) (let [storage (::sto/storage *system*)
@ -642,23 +636,36 @@
(completing persist-image) {}))] (completing persist-image) {}))]
(assoc svg-data :image-data images)))) (assoc svg-data :image-data images))))
(defn- get-svg-content (defn- resolve-sobject-id
[id]
(let [fmobject (db/get *system* :file-media-object {:id id}
{::db/check-deleted false
::db/remove-deleted false
::sql/columns [:media-id]})]
(:media-id fmobject)))
(defn- get-sobject-content
[id] [id]
(let [storage (::sto/storage *system*) (let [storage (::sto/storage *system*)
conn (::db/conn *system*) sobject (sto/get-object storage id)]
fmobject (db/get conn :file-media-object {:id id})
sobject (sto/get-object storage (:media-id fmobject))]
(with-open [stream (sto/get-object-data storage sobject)] (with-open [stream (sto/get-object-data storage sobject)]
(slurp stream)))) (slurp stream))))
(defn- create-shapes-for-svg (defn- create-shapes-for-svg
[{:keys [id] :as mobj} file-id objects frame-id position] [{:keys [id] :as mobj} file-id objects frame-id position]
(let [svg-text (get-svg-content id) (let [get-svg (fn [sid]
svg-text (svgo/optimize *system* svg-text) (let [svg-text (get-sobject-content sid)
svg-data (-> (csvg/parse svg-text) svg-text (svgo/optimize *system* svg-text)]
(assoc :name (:name mobj)) (-> (csvg/parse svg-text)
(collect-and-persist-images file-id))] (assoc :name (:name mobj)))))
sid (resolve-sobject-id id)
svg-data (if (cache/cache? *cache*)
(cache/get *cache* sid get-svg)
(get-svg sid))
svg-data (collect-and-persist-images file-id)]
(sbuilder/create-svg-shapes svg-data position objects frame-id frame-id #{} false))) (sbuilder/create-svg-shapes svg-data position objects frame-id frame-id #{} false)))
@ -717,42 +724,34 @@
(defn- create-media-grid (defn- create-media-grid
[fdata page-id frame-id grid media-group] [fdata page-id frame-id grid media-group]
(let [process (fn [mobj position] (letfn [(process [fdata mobj position]
(let [position (gpt/add position (gpt/point grid-gap grid-gap)) (let [position (gpt/add position (gpt/point grid-gap grid-gap))
tp1 (dt/tpoint)] tp (dt/tpoint)]
(try (try
(process-media-object fdata page-id frame-id mobj position) (let [changes (process-media-object fdata page-id frame-id mobj position)]
(catch Throwable cause (cp/process-changes fdata changes false))
(l/wrn :hint "unable to process file media object (skiping)" (catch Throwable cause
:file-id (str (:id fdata)) (if *skip-on-graphic-error*
:id (str (:id mobj)) (l/wrn :hint "unable to process file media object (skiping)"
:cause cause) :file-id (str (:id fdata))
(if-not *skip-on-error* :id (str (:id mobj))
(throw cause) :cause cause)
nil)) (throw cause))
(finally nil)
(l/trc :hint "graphic processed" (finally
:file-id (str (:id fdata)) (let [elapsed (tp)]
:media-id (str (:id mobj)) (l/trc :hint "graphic processed"
:elapsed (dt/format-duration (tp1)))))))] :file-id (str (:id fdata))
:media-id (str (:id mobj))
:elapsed (dt/format-duration elapsed)))))))]
(->> (d/zip media-group grid) (->> (d/zip media-group grid)
(partition-all (or *max-procs* 1)) (reduce (fn [fdata [mobj position]]
(mapcat (fn [partition] (sse/tap {:type :migration-progress
(->> partition :section :graphics
(map (fn [[mobj position]] :name (:name mobj)})
(sse/tap {:type :migration-progress (or (process fdata mobj position) fdata))
:section :graphics (assoc-in fdata [:options :components-v2] true)))))
:name (:name mobj)})
(p/vthread (process mobj position))))
(doall)
(map deref)
(doall))))
(filter some?)
(reduce (fn [fdata changes]
(-> (assoc-in fdata [:options :components-v2] true)
(cp/process-changes changes false)))
fdata))))
(defn- migrate-graphics (defn- migrate-graphics
[fdata] [fdata]
@ -832,17 +831,12 @@
(decode-row))) (decode-row)))
(defn- validate-file! (defn- validate-file!
[file libs throw-on-validate?] [file libs]
(try (cfv/validate-file! file libs)
(cfv/validate-file! file libs) (cfv/validate-file-schema! file))
(cfv/validate-file-schema! file)
(catch Throwable cause
(if throw-on-validate?
(throw cause)
(l/wrn :hint "migrate:file:validation-error" :cause cause)))))
(defn- process-file (defn- process-file
[{:keys [::db/conn] :as system} id & {:keys [validate? throw-on-validate?]}] [{:keys [::db/conn] :as system} id & {:keys [validate?]}]
(let [file (get-file system id) (let [file (get-file system id)
libs (->> (files/get-file-libraries conn id) libs (->> (files/get-file-libraries conn id)
@ -855,7 +849,7 @@
(update :features conj "components/v2")) (update :features conj "components/v2"))
_ (when validate? _ (when validate?
(validate-file! file libs throw-on-validate?)) (validate-file! file libs))
file (if (contains? (:features file) "fdata/objects-map") file (if (contains? (:features file) "fdata/objects-map")
(fdata/enable-objects-map file) (fdata/enable-objects-map file)
@ -901,10 +895,10 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn migrate-file! (defn migrate-file!
[system file-id & {:keys [validate? throw-on-validate? max-procs]}] [system file-id & {:keys [validate? skip-on-graphic-error?]}]
(let [tpoint (dt/tpoint)] (let [tpoint (dt/tpoint)]
(binding [*file-stats* (atom {}) (binding [*file-stats* (atom {})
*max-procs* max-procs] *skip-on-graphic-error* skip-on-graphic-error?]
(try (try
(l/dbg :hint "migrate:file:start" :file-id (str file-id)) (l/dbg :hint "migrate:file:start" :file-id (str file-id))
@ -913,9 +907,7 @@
(fn [system] (fn [system]
(binding [*system* system] (binding [*system* system]
(fsnap/take-file-snapshot! system {:file-id file-id :label "migration/components-v2"}) (fsnap/take-file-snapshot! system {:file-id file-id :label "migration/components-v2"})
(process-file system file-id (process-file system file-id :validate? validate?)))))
:validate? validate?
:throw-on-validate? throw-on-validate?)))))
(finally (finally
(let [elapsed (tpoint) (let [elapsed (tpoint)
components (get @*file-stats* :processed/components 0) components (get @*file-stats* :processed/components 0)
@ -931,7 +923,7 @@
(some-> *team-stats* (swap! update :processed/files (fnil inc 0))))))))) (some-> *team-stats* (swap! update :processed/files (fnil inc 0)))))))))
(defn migrate-team! (defn migrate-team!
[system team-id & {:keys [validate? throw-on-validate? max-procs]}] [system team-id & {:keys [validate? skip-on-graphic-error?]}]
(l/dbg :hint "migrate:team:start" (l/dbg :hint "migrate:team:start"
:team-id (dm/str team-id)) :team-id (dm/str team-id))
@ -941,9 +933,8 @@
migrate-file migrate-file
(fn [system file-id] (fn [system file-id]
(migrate-file! system file-id (migrate-file! system file-id
:max-procs max-procs
:validate? validate? :validate? validate?
:throw-on-validate? throw-on-validate?)) :skip-on-graphics-error? skip-on-graphic-error?))
migrate-team migrate-team
(fn [{:keys [::db/conn] :as system} {:keys [id features] :as team}] (fn [{:keys [::db/conn] :as system} {:keys [id features] :as team}]
(let [features (-> features (let [features (-> features

View file

@ -411,8 +411,7 @@
::migrations (ig/ref :app.migrations/migrations)} ::migrations (ig/ref :app.migrations/migrations)}
::svgo/optimizer ::svgo/optimizer
{::wrk/executor (ig/ref ::wrk/executor) {}
::svgo/max-procs (cf/get :svgo-max-procs)}
::audit.tasks/archive ::audit.tasks/archive
{::props (ig/ref ::setup/props) {::props (ig/ref ::setup/props)

View file

@ -664,9 +664,7 @@
(case feature (case feature
"components/v2" "components/v2"
(feat.compv2/migrate-file! options file-id (feat.compv2/migrate-file! options file-id
:max-procs 2 :validate? validate?)
:validate? validate?
:throw-on-validate? true)
"fdata/shape-data-type" "fdata/shape-data-type"
nil nil

View file

@ -10,6 +10,7 @@
[app.common.pprint :as pp] [app.common.pprint :as pp]
[app.db :as db] [app.db :as db]
[app.features.components-v2 :as feat] [app.features.components-v2 :as feat]
[app.svgo :as svgo]
[app.util.time :as dt] [app.util.time :as dt]
[cuerdas.core :as str] [cuerdas.core :as str]
[promesa.core :as p] [promesa.core :as p]
@ -35,14 +36,10 @@
(fn [_ _ oldv newv] (fn [_ _ oldv newv]
(when (not= (:processed/files oldv) (when (not= (:processed/files oldv)
(:processed/files newv)) (:processed/files newv))
(let [total (:total/files newv) (let [completed (:processed/files newv)
completed (:processed/files newv)
progress (/ (* completed 100.0) total)
elapsed (tpoint)] elapsed (tpoint)]
(l/dbg :hint "progress" (l/dbg :hint "progress"
:completed (:processed/files newv) :completed (:processed/files newv)
:total (:total/files newv)
:progress (str (int progress) "%")
:elapsed (dt/format-duration elapsed)))))) :elapsed (dt/format-duration elapsed))))))
(defn- report-progress-teams (defn- report-progress-teams
@ -50,21 +47,13 @@
(fn [_ _ oldv newv] (fn [_ _ oldv newv]
(when (not= (:processed/teams oldv) (when (not= (:processed/teams oldv)
(:processed/teams newv)) (:processed/teams newv))
(let [total (:total/teams newv) (let [completed (:processed/teams newv)
completed (:processed/teams newv)
progress (/ (* completed 100.0) total)
progress (str (int progress) "%")
elapsed (dt/format-duration (tpoint))] elapsed (dt/format-duration (tpoint))]
(when (fn? on-progress) (when (fn? on-progress)
(on-progress {:total total (on-progress {:elapsed elapsed
:elapsed elapsed :completed completed}))
:completed completed
:progress progress}))
(l/dbg :hint "progress" (l/dbg :hint "progress"
:completed completed :completed completed
:progress progress
:elapsed elapsed))))) :elapsed elapsed)))))
(defn- get-total-files (defn- get-total-files
@ -92,7 +81,6 @@
res (db/exec-one! pool [sql])] res (db/exec-one! pool [sql])]
(:count res))) (:count res)))
(defn- mark-team-migration! (defn- mark-team-migration!
[{:keys [::db/pool]} team-id] [{:keys [::db/pool]} team-id]
;; We execute this out of transaction because we want this ;; We execute this out of transaction because we want this
@ -113,24 +101,68 @@
" WHERE id = ?")] " WHERE id = ?")]
(db/exec-one! pool [sql team-id]))) (db/exec-one! pool [sql team-id])))
;; (def ^:private sql:get-teams
;; "SELECT id, features
;; FROM team
;; WHERE deleted_at IS NULL
;; ORDER BY created_at DESC")
;; (def ^:private sql:get-teams
;; "SELECT t.id, t.features,
;; (SELECT count(*)
;; FROM file_media_object AS fmo
;; JOIN file AS f ON (f.id = fmo.file_id)
;; JOIN project AS p ON (p.id = f.project_id)
;; WHERE p.team_id = t.id
;; AND fmo.mtype = 'image/svg+xml'
;; AND fmo.is_local = false) AS graphics
;; FROM team AS t
;; ORDER BY t.created_at DESC")
(def ^:private sql:get-teams (def ^:private sql:get-teams
"SELECT id, features "WITH teams AS (
FROM team SELECT t.id, t.features,
WHERE deleted_at IS NULL (SELECT count(*)
ORDER BY created_at ASC") FROM file_media_object AS fmo
JOIN file AS f ON (f.id = fmo.file_id)
JOIN project AS p ON (p.id = f.project_id)
WHERE p.team_id = t.id
AND fmo.mtype = 'image/svg+xml'
AND fmo.is_local = false) AS graphics
FROM team AS t
ORDER BY 3 ASC
)
SELECT * FROM teams ")
(defn- read-pred
[[op val field]]
(let [field (name field)]
(case op
:lt [(str/ffmt "WHERE % < ?" field) val]
:lte [(str/ffmt "WHERE % <= ?" field) val]
:gt [(str/ffmt "WHERE % > ?" field) val]
:gte [(str/ffmt "WHERE % >= ?" field) val]
:eq [(str/ffmt "WHERE % = ?" field) val]
[""])))
(defn- get-teams (defn- get-teams
[conn] [conn pred]
(->> (db/cursor conn sql:get-teams) (let [[sql & params] (read-pred pred)]
(map feat/decode-row))) (->> (db/cursor conn (apply vector (str sql:get-teams sql) params))
(map feat/decode-row)
(remove (fn [{:keys [features]}]
(or (contains? features "ephimeral/v2-migration")
(contains? features "components/v2"))))
(map :id))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API ;; PUBLIC API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn migrate-file! (defn migrate-file!
[system file-id & {:keys [rollback? max-procs] [system file-id & {:keys [rollback?] :or {rollback? true}}]
:or {rollback? true}}]
(l/dbg :hint "migrate:start" :rollback rollback?) (l/dbg :hint "migrate:start" :rollback rollback?)
(let [tpoint (dt/tpoint) (let [tpoint (dt/tpoint)
@ -140,7 +172,7 @@
(binding [feat/*stats* (atom {})] (binding [feat/*stats* (atom {})]
(try (try
(-> (assoc system ::db/rollback rollback?) (-> (assoc system ::db/rollback rollback?)
(feat/migrate-file! file-id :max-procs max-procs)) (feat/migrate-file! file-id))
(-> (deref feat/*stats*) (-> (deref feat/*stats*)
(assoc :elapsed (dt/format-duration (tpoint)))) (assoc :elapsed (dt/format-duration (tpoint))))
@ -153,11 +185,11 @@
(l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed))))))) (l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed)))))))
(defn migrate-team! (defn migrate-team!
[{:keys [::db/pool] :as system} team-id & {:keys [rollback? skip-on-error validate? max-procs] [{:keys [::db/pool] :as system} team-id & {:keys [rollback? skip-on-graphic-error? validate? skip-mark?]
:or {rollback? true :or {rollback? true
skip-on-error true validate? true
validate? false skip-on-graphic-error? false
max-procs 1} skip-mark? false}
:as opts}] :as opts}]
(l/dbg :hint "migrate:start" :rollback rollback?) (l/dbg :hint "migrate:start" :rollback rollback?)
@ -165,34 +197,30 @@
(let [team-id (if (string? team-id) (let [team-id (if (string? team-id)
(parse-uuid team-id) (parse-uuid team-id)
team-id) team-id)
total (get-total-files pool :team-id team-id) stats (atom {})
stats (atom {:total/files total})
tpoint (dt/tpoint)] tpoint (dt/tpoint)]
(add-watch stats :progress-report (report-progress-files tpoint)) (add-watch stats :progress-report (report-progress-files tpoint))
(binding [feat/*stats* stats (binding [feat/*stats* stats]
feat/*skip-on-error* skip-on-error]
(try (try
(mark-team-migration! system team-id) (when-not skip-mark?
(mark-team-migration! system team-id))
(-> (assoc system ::db/rollback rollback?) (-> (assoc system ::db/rollback rollback?)
(feat/migrate-team! team-id (feat/migrate-team! team-id
:max-procs max-procs
:validate? validate? :validate? validate?
:throw-on-validate? (not skip-on-error))) :skip-on-graphics-error? skip-on-graphic-error?))
(print-stats! (print-stats!
(-> (deref feat/*stats*) (-> (deref feat/*stats*)
(dissoc :total/files)
(assoc :elapsed (dt/format-duration (tpoint))))) (assoc :elapsed (dt/format-duration (tpoint)))))
(catch Throwable cause (catch Throwable cause
(l/dbg :hint "migrate:error" :cause cause)) (l/dbg :hint "migrate:error" :cause cause))
(finally (finally
(unmark-team-migration! system team-id) (when-not skip-mark?
(unmark-team-migration! system team-id))
(let [elapsed (dt/format-duration (tpoint))] (let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed))))))) (l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed)))))))
@ -202,100 +230,78 @@
This function starts multiple concurrent team migration processes This function starts multiple concurrent team migration processes
until thw maximum number of jobs is reached which by default has the until thw maximum number of jobs is reached which by default has the
value of `1`. This is controled with the `:max-jobs` option. value of `1`. This is controled with the `:max-jobs` option."
Each tram migration process also can start multiple procs for [{:keys [::db/pool] :as system} & {:keys [max-jobs max-items max-time
graphics migration, the total of that procs is controled with the
`:max-procs` option.
Internally, the graphics migration process uses SVGO module which by
default has a limited number of maximum concurent
operations (globally), ensure setting up correct number with
PENPOT_SVGO_MAX_PROCS environment variable."
[{:keys [::db/pool] :as system} & {:keys [max-jobs max-procs max-items
rollback? validate? preset rollback? validate? preset
skip-on-error max-time pred max-procs skip-mark?
on-start on-progress on-error on-end] on-start on-progress on-error on-end]
:or {validate? false :or {validate? true
rollback? true rollback? true
skip-on-error true
preset :shutdown-on-failure preset :shutdown-on-failure
skip-mark? true
max-jobs 1 max-jobs 1
max-procs 10
max-items Long/MAX_VALUE} max-items Long/MAX_VALUE}
:as opts}] :as opts}]
(let [total (get-total-teams pool) (let [stats (atom {})
stats (atom {:total/teams (min total max-items)}) tpoint (dt/tpoint)
mtime (some-> max-time dt/duration)
tpoint (dt/tpoint) factory (px/thread-factory :virtual false :prefix "penpot/migration/compv2/")
mtime (some-> max-time dt/duration) executor (px/cached-executor :factory factory)
max-procs (or max-procs max-jobs)
scope (px/structured-task-scope :preset preset :factory :virtual) sjobs (ps/create :permits max-jobs)
sjobs (ps/create :permits max-jobs) sprocs (ps/create :permits max-procs)
migrate-team migrate-team
(fn [{:keys [id features] :as team}] (fn [team-id]
(ps/acquire! sjobs) (ps/acquire! sjobs)
(let [ts (tpoint)] (let [ts (tpoint)]
(cond (if (and mtime (neg? (compare mtime ts)))
(and mtime (neg? (compare mtime ts)))
(do (do
(l/inf :hint "max time constraint reached" (l/inf :hint "max time constraint reached"
:team-id (str id) :team-id (str team-id)
:elapsed (dt/format-duration ts)) :elapsed (dt/format-duration ts))
(ps/release! sjobs) (ps/release! sjobs)
(reduced nil)) (reduced nil))
(or (contains? features "ephimeral/v2-migration") (px/run! executor (fn []
(contains? features "components/v2"))
(do
(l/dbg :hint "skip team" :team-id (str id))
(ps/release! sjobs))
:else
(px/submit! scope (fn []
(try (try
(mark-team-migration! system id) (when-not skip-mark?
(mark-team-migration! system team-id))
(-> (assoc system ::db/rollback rollback?) (-> (assoc system ::db/rollback rollback?)
(feat/migrate-team! id (feat/migrate-team! team-id :validate? validate?))
:max-procs max-procs
:validate? validate?
:throw-on-validate? (not skip-on-error)))
(catch Throwable cause (catch Throwable cause
(l/err :hint "unexpected error on processing team" (l/err :hint "unexpected error on processing team (skiping)"
:team-id (str id) :team-id (str team-id)
:cause cause)) :cause cause))
(finally (finally
(ps/release! sjobs) (ps/release! sjobs)
(unmark-team-migration! system id))))))))] (when-not skip-mark?
(unmark-team-migration! system team-id)))))))))]
(l/dbg :hint "migrate:start" (l/dbg :hint "migrate:start"
:rollback rollback? :rollback rollback?
:total total
:max-jobs max-jobs :max-jobs max-jobs
:max-procs max-procs
:max-items max-items) :max-items max-items)
(add-watch stats :progress-report (report-progress-teams tpoint on-progress)) (add-watch stats :progress-report (report-progress-teams tpoint on-progress))
(binding [feat/*stats* stats (binding [feat/*stats* stats
feat/*skip-on-error* skip-on-error] svgo/*semaphore* sprocs]
(try (try
(when (fn? on-start) (when (fn? on-start)
(on-start {:total total :rollback rollback?})) (on-start {:rollback rollback?}))
(db/tx-run! system (db/tx-run! system
(fn [{:keys [::db/conn]}] (fn [{:keys [::db/conn]}]
(run! (partial migrate-team) (run! (partial migrate-team)
(->> (get-teams conn) (->> (get-teams conn pred)
(take max-items))))) (take max-items)))))
(try
(p/await! scope)
(finally
(pu/close! scope)))
;; Close and await tasks
(pu/close! executor)
(if (fn? on-end) (if (fn? on-end)
(-> (deref stats) (-> (deref stats)

View file

@ -7,16 +7,10 @@
(ns app.svgo (ns app.svgo
"A SVG Optimizer service" "A SVG Optimizer service"
(:require (:require
[app.common.data :as d]
[app.common.data.macros :as dm]
[app.common.jsrt :as jsrt] [app.common.jsrt :as jsrt]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.spec :as us]
[app.worker :as-alias wrk] [app.worker :as-alias wrk]
[clojure.spec.alpha :as s]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.exec :as px]
[promesa.exec.bulkhead :as bh]
[promesa.exec.semaphore :as ps] [promesa.exec.semaphore :as ps]
[promesa.util :as pu])) [promesa.util :as pu]))
@ -26,40 +20,23 @@
nil) nil)
(defn optimize (defn optimize
[system data] [{pool ::optimizer} data]
(dm/assert! "expect data to be a string" (string? data)) (try
(some-> *semaphore* ps/acquire!)
(letfn [(optimize-fn [pool] (jsrt/run! pool
(jsrt/run! pool (fn [context]
(fn [context] (jsrt/set! context "svgData" data)
(jsrt/set! context "svgData" data) (jsrt/eval! context "penpotSvgo.optimize(svgData, {plugins: ['safeAndFastPreset']})")))
(jsrt/eval! context "penpotSvgo.optimize(svgData, {plugins: ['safeAndFastPreset']})"))))] (finally
(try (some-> *semaphore* ps/release!))))
(some-> *semaphore* ps/acquire!)
(let [{:keys [::jsrt/pool ::wrk/executor]} (::optimizer system)]
(dm/assert! "expect optimizer instance" (jsrt/pool? pool))
(px/invoke! executor (partial optimize-fn pool)))
(finally
(some-> *semaphore* ps/release!)))))
(s/def ::max-procs (s/nilable ::us/integer))
(defmethod ig/pre-init-spec ::optimizer [_]
(s/keys :req [::wrk/executor ::max-procs]))
(defmethod ig/prep-key ::optimizer
[_ cfg]
(merge {::max-procs 20} (d/without-nils cfg)))
(defmethod ig/init-key ::optimizer (defmethod ig/init-key ::optimizer
[_ {:keys [::wrk/executor ::max-procs]}] [_ _]
(l/inf :hint "initializing svg optimizer pool" :max-procs max-procs) (l/inf :hint "initializing svg optimizer pool")
(let [init (jsrt/resource->source "app/common/svg/optimizer.js") (let [init (jsrt/resource->source "app/common/svg/optimizer.js")]
executor (bh/create :type :executor :executor executor :permits max-procs)] (jsrt/pool :init init)))
{::jsrt/pool (jsrt/pool :init init)
::wrk/executor executor}))
(defmethod ig/halt-key! ::optimizer (defmethod ig/halt-key! ::optimizer
[_ {:keys [::jsrt/pool]}] [_ pool]
(l/info :hint "stopping svg optimizer pool") (l/info :hint "stopping svg optimizer pool")
(pu/close! pool)) (pu/close! pool))