Add support for reporting and partitions on comp-v2 migration code

This commit is contained in:
Andrey Antukh 2024-01-20 22:25:05 +01:00
parent db21525485
commit 0d33779c95
3 changed files with 182 additions and 64 deletions

View file

@ -1104,7 +1104,7 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn migrate-file! (defn migrate-file!
[system file-id & {:keys [validate? skip-on-graphic-error?]}] [system file-id & {:keys [validate? skip-on-graphic-error? label]}]
(let [tpoint (dt/tpoint)] (let [tpoint (dt/tpoint)]
(binding [*file-stats* (atom {}) (binding [*file-stats* (atom {})
*skip-on-graphic-error* skip-on-graphic-error?] *skip-on-graphic-error* skip-on-graphic-error?]
@ -1119,7 +1119,9 @@
(fn [system] (fn [system]
(try (try
(binding [*system* system] (binding [*system* system]
(fsnap/take-file-snapshot! system {:file-id file-id :label "migration/components-v2"}) (when (string? label)
(fsnap/take-file-snapshot! system {:file-id file-id
:label (str "migration/" label)}))
(process-file system file-id :validate? validate?)) (process-file system file-id :validate? validate?))
(catch Throwable cause (catch Throwable cause
@ -1145,7 +1147,7 @@
(some-> *team-stats* (swap! update :processed/files (fnil inc 0))))))))) (some-> *team-stats* (swap! update :processed/files (fnil inc 0)))))))))
(defn migrate-team! (defn migrate-team!
[system team-id & {:keys [validate? skip-on-graphic-error?]}] [system team-id & {:keys [validate? skip-on-graphic-error? label]}]
(l/dbg :hint "migrate:team:start" (l/dbg :hint "migrate:team:start"
:team-id (dm/str team-id)) :team-id (dm/str team-id))
@ -1156,30 +1158,30 @@
migrate-file migrate-file
(fn [system file-id] (fn [system file-id]
(migrate-file! system file-id (migrate-file! system file-id
:label label
:validate? validate? :validate? validate?
:skip-on-graphics-error? skip-on-graphic-error?)) :skip-on-graphics-error? skip-on-graphic-error?))
migrate-team migrate-team
(fn [{:keys [::db/conn] :as system} {:keys [id features] :as team}] (fn [{:keys [::db/conn] :as system} team-id]
(let [features (-> features (let [{:keys [id features]} (get-team system team-id)]
(disj "ephimeral/v2-migration") (if (contains? features "components/v2")
(conj "components/v2") (l/inf :hint "team already migrated")
(conj "layout/grid") (let [features (-> features
(conj "styles/v2"))] (disj "ephimeral/v2-migration")
(conj "components/v2")
(conj "layout/grid")
(conj "styles/v2"))]
(run! (partial migrate-file system) (run! (partial migrate-file system)
(get-and-lock-files conn id)) (get-and-lock-files conn id))
(update-team-features! conn id features)))] (update-team-features! conn id features)))))]
(binding [*team-stats* (atom {}) (binding [*team-stats* (atom {})
*team-id* team-id] *team-id* team-id]
(try (try
(db/tx-run! system (fn [system] (db/tx-run! system migrate-team team-id)
(db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"])
(let [team (get-team system team-id)]
(if (contains? (:features team) "components/v2")
(l/inf :hint "team already migrated")
(migrate-team system team)))))
(catch Throwable cause (catch Throwable cause
(vreset! err true) (vreset! err true)
(throw cause)) (throw cause))

View file

@ -6,8 +6,10 @@
(ns app.srepl.components-v2 (ns app.srepl.components-v2
(:require (:require
[app.common.data :as d]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.pprint :as pp] [app.common.pprint :as pp]
[app.common.uuid :as uuid]
[app.db :as db] [app.db :as db]
[app.features.components-v2 :as feat] [app.features.components-v2 :as feat]
[app.main :as main] [app.main :as main]
@ -57,13 +59,15 @@
:completed completed :completed completed
:elapsed elapsed))))) :elapsed elapsed)))))
(def ^:private sql:get-teams-1 (def ^:private sql:get-teams-by-created-at
"SELECT id, features "WITH teams AS (
FROM team SELECT id, features
WHERE deleted_at IS NULL FROM team
ORDER BY created_at DESC") WHERE deleted_at IS NULL
ORDER BY created_at DESC
) SELECT * FROM TEAMS %(pred)s")
(def ^:private sql:get-teams-2 (def ^:private sql:get-teams-by-graphics
"WITH teams AS ( "WITH teams AS (
SELECT t.id, t.features, SELECT t.id, t.features,
(SELECT count(*) (SELECT count(*)
@ -74,9 +78,37 @@
AND fmo.mtype = 'image/svg+xml' AND fmo.mtype = 'image/svg+xml'
AND fmo.is_local = false) AS graphics AND fmo.is_local = false) AS graphics
FROM team AS t FROM team AS t
WHERE t.deleted_at IS NULL
ORDER BY 3 ASC ORDER BY 3 ASC
) )
SELECT * FROM teams ") SELECT * FROM teams %(pred)s")
(def ^:private sql:get-teams-by-activity
"WITH teams AS (
SELECT t.id, t.features,
(SELECT coalesce(max(date_trunc('month', f.modified_at)), date_trunc('month', t.modified_at))
FROM file AS f
JOIN project AS p ON (f.project_id = p.id)
WHERE p.team_id = t.id) AS updated_at,
(SELECT coalesce(count(*), 0)
FROM file AS f
JOIN project AS p ON (f.project_id = p.id)
WHERE p.team_id = t.id) AS total_files
FROM team AS t
WHERE t.deleted_at IS NULL
ORDER BY 3 DESC, 4 DESC
)
SELECT * FROM teams %(pred)s")
(def ^:private sql:get-teams-by-report
"WITH teams AS (
SELECT t.id t.features, mr.name
FROM migration_report AS mr
JOIN team AS t ON (t.id = mr.team_id)
WHERE t.deleted_at IS NULL
AND mr.error IS NOT NULL
ORDER BY mr.created_at
) SELECT id, features FROM teams %(pred)s")
(defn- read-pred (defn- read-pred
[entries] [entries]
@ -103,26 +135,62 @@
(apply vector sql params)))))) (apply vector sql params))))))
(defn- get-teams (defn- get-teams
[conn pred] [conn query pred]
(let [sql (if pred (let [query (d/nilv query :created-at)
(let [[sql & params] (read-pred pred)] sql (case query
(apply vector (str sql:get-teams-2 sql) params)) :created-at sql:get-teams-by-created-at
[sql:get-teams-1])] :activity sql:get-teams-by-activity
:graphics sql:get-teams-by-graphics
:report sql:get-teams-by-report)
(->> (db/cursor conn sql) sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)]
(apply vector
(str/format sql {:pred pred-sql})
pred-params))
[(str/format sql {:pred ""})])]
(->> (db/cursor conn sql {:chunk-size 500})
(map feat/decode-row) (map feat/decode-row)
(remove (fn [{:keys [features]}] (remove (fn [{:keys [features]}]
(or (contains? features "ephimeral/v2-migration") (contains? features "components/v2")))
(contains? features "components/v2"))))
(map :id)))) (map :id))))
(def ^:private sql:report-table
"CREATE UNLOGGED TABLE IF NOT EXISTS migration_report (
id bigserial NOT NULL,
label text NOT NULL,
team_id UUID NOT NULL,
error text NULL,
created_at timestamptz NOT NULL DEFAULT now(),
elapsed bigint NOT NULL,
PRIMARY KEY (label, created_at, id)
)")
(defn- create-report-table!
[system]
(db/exec-one! system [sql:report-table]))
(defn- clean-reports!
[system label]
(db/delete! system :migration-report {:label label}))
(defn- report!
[system team-id label elapsed error]
(db/insert! system :migration-report
{:label label
:team-id team-id
:elapsed (inst-ms elapsed)
:error error}
{::db/return-keys false}))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API ;; PUBLIC API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn migrate-file! (defn migrate-file!
[file-id & {:keys [rollback? validate?] :or {rollback? true validate? false}}] [file-id & {:keys [rollback? validate? label] :or {rollback? true validate? false}}]
(l/dbg :hint "migrate:start" :rollback rollback?) (l/dbg :hint "migrate:start" :rollback rollback?)
(let [tpoint (dt/tpoint) (let [tpoint (dt/tpoint)
file-id (if (string? file-id) file-id (if (string? file-id)
@ -131,7 +199,9 @@
(binding [feat/*stats* (atom {})] (binding [feat/*stats* (atom {})]
(try (try
(-> (assoc main/system ::db/rollback rollback?) (-> (assoc main/system ::db/rollback rollback?)
(feat/migrate-file! file-id :validate? validate?)) (feat/migrate-file! file-id
:validate? validate?
:label label))
(-> (deref feat/*stats*) (-> (deref feat/*stats*)
(assoc :elapsed (dt/format-duration (tpoint)))) (assoc :elapsed (dt/format-duration (tpoint))))
@ -144,7 +214,7 @@
(l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed))))))) (l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed)))))))
(defn migrate-team! (defn migrate-team!
[team-id & {:keys [rollback? skip-on-graphic-error? validate?] [team-id & {:keys [rollback? skip-on-graphic-error? validate? label]
:or {rollback? true :or {rollback? true
validate? true validate? true
skip-on-graphic-error? false}}] skip-on-graphic-error? false}}]
@ -163,6 +233,7 @@
(try (try
(-> (assoc main/system ::db/rollback rollback?) (-> (assoc main/system ::db/rollback rollback?)
(feat/migrate-team! team-id (feat/migrate-team! team-id
:label label
:validate? validate? :validate? validate?
:skip-on-graphics-error? skip-on-graphic-error?)) :skip-on-graphics-error? skip-on-graphic-error?))
(print-stats! (print-stats!
@ -181,17 +252,29 @@
This function starts multiple concurrent team migration processes This function starts multiple concurrent team migration processes
until thw maximum number of jobs is reached which by default has the until thw maximum number of jobs is reached which by default has the
value of `1`. This is controled with the `:max-jobs` option." value of `1`. This is controled with the `:max-jobs` option.
[& {:keys [max-jobs max-items max-time rollback? validate? If you want to run this on multiple machines you will need to specify
the total number of partitions and the current partition.
In order to get the report table populated, you will need to provide
a correct `:label`. That label is also used for persist a file
snaphot before continue with the migration."
[& {:keys [max-jobs max-items max-time rollback? validate? query
pred max-procs cache on-start on-progress on-error on-end pred max-procs cache on-start on-progress on-error on-end
skip-on-graphic-error?] skip-on-graphic-error? label partitions current-partition]
:or {validate? false :or {validate? false
rollback? true rollback? true
max-jobs 1 max-jobs 1
skip-on-graphic-error? true skip-on-graphic-error? true
max-items Long/MAX_VALUE}}] max-items Long/MAX_VALUE}}]
(when (int? partitions)
(when-not (int? current-partition)
(throw (IllegalArgumentException. "missing `current-partition` parameter")))
(when-not (<= 0 current-partition partitions)
(throw (IllegalArgumentException. "invalid value on `current-partition` parameter"))))
(let [stats (atom {}) (let [stats (atom {})
tpoint (dt/tpoint) tpoint (dt/tpoint)
mtime (some-> max-time dt/duration) mtime (some-> max-time dt/duration)
@ -207,8 +290,32 @@
(cache/create :executor :same-thread (cache/create :executor :same-thread
:max-items cache) :max-items cache)
nil) nil)
migrate-team migrate-team
(fn [team-id]
(let [tpoint (dt/tpoint)]
(try
(db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [system]
(db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"])
(feat/migrate-team! system team-id
:label label
:validate? validate?
:skip-on-graphics-error? skip-on-graphic-error?)))
(when (string? label)
(report! main/system label team-id (tpoint) nil))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing team (skiping)"
:team-id (str team-id)
:cause cause)
(when (string? label)
(report! main/system label team-id (tpoint) (ex-message cause))))
(finally
(ps/release! sjobs)))))
process-team
(fn [team-id] (fn [team-id]
(ps/acquire! sjobs) (ps/acquire! sjobs)
(let [ts (tpoint)] (let [ts (tpoint)]
@ -220,25 +327,15 @@
(ps/release! sjobs) (ps/release! sjobs)
(reduced nil)) (reduced nil))
(px/run! executor (fn [] (px/run! executor (partial migrate-team team-id)))))]
(try
(-> (assoc main/system ::db/rollback rollback?)
(feat/migrate-team! team-id
:validate? validate?
:skip-on-graphics-error? skip-on-graphic-error?))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing team (skiping)"
:team-id (str team-id)
:cause cause))
(finally
(ps/release! sjobs))))))))]
(l/dbg :hint "migrate:start" (l/dbg :hint "migrate:start"
:label label
:rollback rollback? :rollback rollback?
:max-jobs max-jobs :max-jobs max-jobs
:max-items max-items) :max-items max-items
:partitions partitions
:current-partition current-partition)
(add-watch stats :progress-report (report-progress-teams tpoint on-progress)) (add-watch stats :progress-report (report-progress-teams tpoint on-progress))
@ -249,16 +346,26 @@
(when (fn? on-start) (when (fn? on-start)
(on-start {:rollback rollback?})) (on-start {:rollback rollback?}))
(db/tx-run! main/system (when (string? label)
(fn [{:keys [::db/conn]}] (create-report-table! main/system)
(db/exec! conn ["SET statement_timeout = 0;"]) (clean-reports! main/system label))
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0;"])
(run! migrate-team
(->> (get-teams conn pred)
(take max-items)))))
;; Close and await tasks (db/tx-run! main/system
(pu/close! executor) (fn [{:keys [::db/conn] :as system}]
(db/exec! conn ["SET statement_timeout = 0"])
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0"])
(run! process-team
(->> (get-teams conn query pred)
(take max-items)
(filter (fn [team-id]
(if (and (int? current-partition) (int? partitions))
(let [partition (mod (uuid/hash-int team-id) partitions)]
(= (dec current-partition) partition))
true)))))
;; Close and await tasks
(pu/close! executor)))
(if (fn? on-end) (if (fn? on-end)
(-> (deref stats) (-> (deref stats)

View file

@ -75,3 +75,12 @@
with base62. It is only safe to use with uuid v4 and penpot custom v8" with base62. It is only safe to use with uuid v4 and penpot custom v8"
[id] [id]
(impl/short-v8 (dm/str id)))) (impl/short-v8 (dm/str id))))
#?(:clj
(defn hash-int
[id]
(let [a (.getMostSignificantBits ^UUID id)
b (.getLeastSignificantBits ^UUID id)]
(+ (clojure.lang.Murmur3/hashLong a)
(clojure.lang.Murmur3/hashLong b)))))