mirror of
https://github.com/penpot/penpot.git
synced 2025-08-07 14:38:33 +02:00
♻️ Simplify components-v2 migration functions impl
This commit is contained in:
parent
02d8208553
commit
0a5e15b916
2 changed files with 192 additions and 168 deletions
|
@ -10,10 +10,12 @@
|
|||
[app.common.pprint :as pp]
|
||||
[app.db :as db]
|
||||
[app.features.components-v2 :as feat]
|
||||
[app.main :as main]
|
||||
[app.svgo :as svgo]
|
||||
[app.util.cache :as cache]
|
||||
[app.util.time :as dt]
|
||||
[app.worker :as-alias wrk]
|
||||
[cuerdas.core :as str]
|
||||
[promesa.core :as p]
|
||||
[promesa.exec :as px]
|
||||
[promesa.exec.semaphore :as ps]
|
||||
[promesa.util :as pu]))
|
||||
|
@ -36,8 +38,7 @@
|
|||
(fn [_ _ oldv newv]
|
||||
(when (not= (:processed/files oldv)
|
||||
(:processed/files newv))
|
||||
(let [completed (:processed/files newv)
|
||||
elapsed (tpoint)]
|
||||
(let [elapsed (tpoint)]
|
||||
(l/dbg :hint "progress"
|
||||
:completed (:processed/files newv)
|
||||
:elapsed (dt/format-duration elapsed))))))
|
||||
|
@ -56,71 +57,13 @@
|
|||
:completed completed
|
||||
:elapsed elapsed)))))
|
||||
|
||||
(defn- get-total-files
|
||||
[pool & {:keys [team-id]}]
|
||||
(if (some? team-id)
|
||||
(let [sql (str/concat
|
||||
"SELECT count(f.id) AS count FROM file AS f "
|
||||
" JOIN project AS p ON (p.id = f.project_id) "
|
||||
" WHERE p.team_id = ? AND f.deleted_at IS NULL "
|
||||
" AND p.deleted_at IS NULL")
|
||||
res (db/exec-one! pool [sql team-id])]
|
||||
(:count res))
|
||||
(def ^:private sql:get-teams-1
|
||||
"SELECT id, features
|
||||
FROM team
|
||||
WHERE deleted_at IS NULL
|
||||
ORDER BY created_at DESC")
|
||||
|
||||
(let [sql (str/concat
|
||||
"SELECT count(id) AS count FROM file "
|
||||
" WHERE deleted_at IS NULL")
|
||||
res (db/exec-one! pool [sql])]
|
||||
(:count res))))
|
||||
|
||||
(defn- get-total-teams
|
||||
[pool]
|
||||
(let [sql (str/concat
|
||||
"SELECT count(id) AS count FROM team "
|
||||
" WHERE deleted_at IS NULL")
|
||||
res (db/exec-one! pool [sql])]
|
||||
(:count res)))
|
||||
|
||||
(defn- mark-team-migration!
|
||||
[{:keys [::db/pool]} team-id]
|
||||
;; We execute this out of transaction because we want this
|
||||
;; change to be visible to all other sessions before starting
|
||||
;; the migration
|
||||
(let [sql (str "UPDATE team SET features = "
|
||||
" array_append(features, 'ephimeral/v2-migration') "
|
||||
" WHERE id = ?")]
|
||||
(db/exec-one! pool [sql team-id])))
|
||||
|
||||
(defn- unmark-team-migration!
|
||||
[{:keys [::db/pool]} team-id]
|
||||
;; We execute this out of transaction because we want this
|
||||
;; change to be visible to all other sessions before starting
|
||||
;; the migration
|
||||
(let [sql (str "UPDATE team SET features = "
|
||||
" array_remove(features, 'ephimeral/v2-migration') "
|
||||
" WHERE id = ?")]
|
||||
(db/exec-one! pool [sql team-id])))
|
||||
|
||||
;; (def ^:private sql:get-teams
|
||||
;; "SELECT id, features
|
||||
;; FROM team
|
||||
;; WHERE deleted_at IS NULL
|
||||
;; ORDER BY created_at DESC")
|
||||
|
||||
;; (def ^:private sql:get-teams
|
||||
;; "SELECT t.id, t.features,
|
||||
;; (SELECT count(*)
|
||||
;; FROM file_media_object AS fmo
|
||||
;; JOIN file AS f ON (f.id = fmo.file_id)
|
||||
;; JOIN project AS p ON (p.id = f.project_id)
|
||||
;; WHERE p.team_id = t.id
|
||||
;; AND fmo.mtype = 'image/svg+xml'
|
||||
;; AND fmo.is_local = false) AS graphics
|
||||
;; FROM team AS t
|
||||
;; ORDER BY t.created_at DESC")
|
||||
|
||||
|
||||
(def ^:private sql:get-teams
|
||||
(def ^:private sql:get-teams-2
|
||||
"WITH teams AS (
|
||||
SELECT t.id, t.features,
|
||||
(SELECT count(*)
|
||||
|
@ -136,20 +79,37 @@
|
|||
SELECT * FROM teams ")
|
||||
|
||||
(defn- read-pred
|
||||
[[op val field]]
|
||||
(let [field (name field)]
|
||||
(case op
|
||||
:lt [(str/ffmt "WHERE % < ?" field) val]
|
||||
:lte [(str/ffmt "WHERE % <= ?" field) val]
|
||||
:gt [(str/ffmt "WHERE % > ?" field) val]
|
||||
:gte [(str/ffmt "WHERE % >= ?" field) val]
|
||||
:eq [(str/ffmt "WHERE % = ?" field) val]
|
||||
[""])))
|
||||
[entries]
|
||||
(let [entries (if (and (vector? entries)
|
||||
(keyword? (first entries)))
|
||||
[entries]
|
||||
entries)]
|
||||
(loop [params []
|
||||
queries []
|
||||
entries (seq entries)]
|
||||
(if-let [[op val field] (first entries)]
|
||||
(let [field (name field)
|
||||
cond (case op
|
||||
:lt (str/ffmt "% < ?" field)
|
||||
:lte (str/ffmt "% <= ?" field)
|
||||
:gt (str/ffmt "% > ?" field)
|
||||
:gte (str/ffmt "% >= ?" field)
|
||||
:eq (str/ffmt "% = ?" field))]
|
||||
(recur (conj params val)
|
||||
(conj queries cond)
|
||||
(rest entries)))
|
||||
|
||||
(let [sql (apply str "WHERE " (str/join " AND " queries))]
|
||||
(apply vector sql params))))))
|
||||
|
||||
(defn- get-teams
|
||||
[conn pred]
|
||||
(let [[sql & params] (read-pred pred)]
|
||||
(->> (db/cursor conn (apply vector (str sql:get-teams sql) params))
|
||||
(let [sql (if pred
|
||||
(let [[sql & params] (read-pred pred)]
|
||||
(apply vector (str sql:get-teams-2 sql) params))
|
||||
[sql:get-teams-1])]
|
||||
|
||||
(->> (db/cursor conn sql)
|
||||
(map feat/decode-row)
|
||||
(remove (fn [{:keys [features]}]
|
||||
(or (contains? features "ephimeral/v2-migration")
|
||||
|
@ -162,8 +122,7 @@
|
|||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(defn migrate-file!
|
||||
[system file-id & {:keys [rollback?] :or {rollback? true}}]
|
||||
|
||||
[file-id & {:keys [rollback? validate?] :or {rollback? true validate? false}}]
|
||||
(l/dbg :hint "migrate:start" :rollback rollback?)
|
||||
(let [tpoint (dt/tpoint)
|
||||
file-id (if (string? file-id)
|
||||
|
@ -171,8 +130,8 @@
|
|||
file-id)]
|
||||
(binding [feat/*stats* (atom {})]
|
||||
(try
|
||||
(-> (assoc system ::db/rollback rollback?)
|
||||
(feat/migrate-file! file-id))
|
||||
(-> (assoc main/system ::db/rollback rollback?)
|
||||
(feat/migrate-file! file-id :validate? validate?))
|
||||
|
||||
(-> (deref feat/*stats*)
|
||||
(assoc :elapsed (dt/format-duration (tpoint))))
|
||||
|
@ -185,12 +144,10 @@
|
|||
(l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed)))))))
|
||||
|
||||
(defn migrate-team!
|
||||
[{:keys [::db/pool] :as system} team-id & {:keys [rollback? skip-on-graphic-error? validate? skip-mark?]
|
||||
:or {rollback? true
|
||||
validate? true
|
||||
skip-on-graphic-error? false
|
||||
skip-mark? false}
|
||||
:as opts}]
|
||||
[team-id & {:keys [rollback? skip-on-graphic-error? validate?]
|
||||
:or {rollback? true
|
||||
validate? true
|
||||
skip-on-graphic-error? false}}]
|
||||
|
||||
(l/dbg :hint "migrate:start" :rollback rollback?)
|
||||
|
||||
|
@ -204,10 +161,7 @@
|
|||
|
||||
(binding [feat/*stats* stats]
|
||||
(try
|
||||
(when-not skip-mark?
|
||||
(mark-team-migration! system team-id))
|
||||
|
||||
(-> (assoc system ::db/rollback rollback?)
|
||||
(-> (assoc main/system ::db/rollback rollback?)
|
||||
(feat/migrate-team! team-id
|
||||
:validate? validate?
|
||||
:skip-on-graphics-error? skip-on-graphic-error?))
|
||||
|
@ -219,9 +173,6 @@
|
|||
(l/dbg :hint "migrate:error" :cause cause))
|
||||
|
||||
(finally
|
||||
(when-not skip-mark?
|
||||
(unmark-team-migration! system team-id))
|
||||
|
||||
(let [elapsed (dt/format-duration (tpoint))]
|
||||
(l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed)))))))
|
||||
|
||||
|
@ -232,28 +183,31 @@
|
|||
until thw maximum number of jobs is reached which by default has the
|
||||
value of `1`. This is controled with the `:max-jobs` option."
|
||||
|
||||
[{:keys [::db/pool] :as system} & {:keys [max-jobs max-items max-time
|
||||
rollback? validate? preset
|
||||
pred max-procs skip-mark?
|
||||
on-start on-progress on-error on-end]
|
||||
:or {validate? true
|
||||
rollback? true
|
||||
preset :shutdown-on-failure
|
||||
skip-mark? true
|
||||
max-jobs 1
|
||||
max-items Long/MAX_VALUE}
|
||||
:as opts}]
|
||||
[& {:keys [max-jobs max-items max-time rollback? validate?
|
||||
pred max-procs cache on-start on-progress on-error on-end
|
||||
skip-on-graphic-error?]
|
||||
:or {validate? false
|
||||
rollback? true
|
||||
max-jobs 1
|
||||
skip-on-graphic-error? true
|
||||
max-items Long/MAX_VALUE}}]
|
||||
|
||||
(let [stats (atom {})
|
||||
tpoint (dt/tpoint)
|
||||
mtime (some-> max-time dt/duration)
|
||||
|
||||
factory (px/thread-factory :virtual false :prefix "penpot/migration/compv2/")
|
||||
factory (px/thread-factory :virtual false :prefix "penpot/migration/")
|
||||
executor (px/cached-executor :factory factory)
|
||||
|
||||
max-procs (or max-procs max-jobs)
|
||||
sjobs (ps/create :permits max-jobs)
|
||||
sprocs (ps/create :permits max-procs)
|
||||
|
||||
cache (if (int? cache)
|
||||
(cache/create :executor (::wrk/executor main/system)
|
||||
:max-items cache)
|
||||
nil)
|
||||
|
||||
migrate-team
|
||||
(fn [team-id]
|
||||
(ps/acquire! sjobs)
|
||||
|
@ -268,18 +222,18 @@
|
|||
|
||||
(px/run! executor (fn []
|
||||
(try
|
||||
(when-not skip-mark?
|
||||
(mark-team-migration! system team-id))
|
||||
(-> (assoc system ::db/rollback rollback?)
|
||||
(feat/migrate-team! team-id :validate? validate?))
|
||||
(-> (assoc main/system ::db/rollback rollback?)
|
||||
(feat/migrate-team! team-id
|
||||
:validate? validate?
|
||||
:skip-on-graphics-error? skip-on-graphic-error?))
|
||||
|
||||
(catch Throwable cause
|
||||
(l/err :hint "unexpected error on processing team (skiping)"
|
||||
(l/wrn :hint "unexpected error on processing team (skiping)"
|
||||
:team-id (str team-id)
|
||||
:cause cause))
|
||||
|
||||
(finally
|
||||
(ps/release! sjobs)
|
||||
(when-not skip-mark?
|
||||
(unmark-team-migration! system team-id)))))))))]
|
||||
(ps/release! sjobs))))))))]
|
||||
|
||||
(l/dbg :hint "migrate:start"
|
||||
:rollback rollback?
|
||||
|
@ -289,14 +243,17 @@
|
|||
(add-watch stats :progress-report (report-progress-teams tpoint on-progress))
|
||||
|
||||
(binding [feat/*stats* stats
|
||||
feat/*cache* cache
|
||||
svgo/*semaphore* sprocs]
|
||||
(try
|
||||
(when (fn? on-start)
|
||||
(on-start {:rollback rollback?}))
|
||||
|
||||
(db/tx-run! system
|
||||
(db/tx-run! main/system
|
||||
(fn [{:keys [::db/conn]}]
|
||||
(run! (partial migrate-team)
|
||||
(db/exec! conn ["SET statement_timeout = 0;"])
|
||||
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0;"])
|
||||
(run! migrate-team
|
||||
(->> (get-teams conn pred)
|
||||
(take max-items)))))
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue