Improve partitioning mechanism on compv2 migration

This commit is contained in:
Andrey Antukh 2024-04-04 10:38:35 +02:00
parent 29332b67f9
commit fed9346ec6

View file

@ -68,7 +68,8 @@
(def ^:private sql:get-teams-by-created-at (def ^:private sql:get-teams-by-created-at
"WITH teams AS ( "WITH teams AS (
SELECT id, features SELECT id, features,
row_number() OVER (ORDER BY created_at) AS rown
FROM team FROM team
WHERE deleted_at IS NULL WHERE deleted_at IS NULL
ORDER BY created_at DESC ORDER BY created_at DESC
@ -77,6 +78,7 @@
(def ^:private sql:get-teams-by-graphics (def ^:private sql:get-teams-by-graphics
"WITH teams AS ( "WITH teams AS (
SELECT t.id, t.features, SELECT t.id, t.features,
row_number() OVER (ORDER BY t.created_at) AS rown,
(SELECT count(*) (SELECT count(*)
FROM file_media_object AS fmo FROM file_media_object AS fmo
JOIN file AS f ON (f.id = fmo.file_id) JOIN file AS f ON (f.id = fmo.file_id)
@ -93,6 +95,7 @@
(def ^:private sql:get-teams-by-activity (def ^:private sql:get-teams-by-activity
"WITH teams AS ( "WITH teams AS (
SELECT t.id, t.features, SELECT t.id, t.features,
row_number() OVER (ORDER BY t.created_at) AS rown,
(SELECT coalesce(max(date_trunc('month', f.modified_at)), date_trunc('month', t.modified_at)) (SELECT coalesce(max(date_trunc('month', f.modified_at)), date_trunc('month', t.modified_at))
FROM file AS f FROM file AS f
JOIN project AS p ON (f.project_id = p.id) JOIN project AS p ON (f.project_id = p.id)
@ -107,24 +110,16 @@
) )
SELECT * FROM teams %(pred)s") SELECT * FROM teams %(pred)s")
(def ^:private sql:get-teams-by-report
"WITH teams AS (
SELECT t.id t.features, mr.name
FROM migration_team_report AS mr
JOIN team AS t ON (t.id = mr.team_id)
WHERE t.deleted_at IS NULL
AND mr.error IS NOT NULL
ORDER BY mr.created_at
) SELECT id, features FROM teams %(pred)s")
(def ^:private sql:get-files-by-created-at (def ^:private sql:get-files-by-created-at
"SELECT id, features "SELECT id, features,
row_number() OVER (ORDER BY created_at) AS rown
FROM file FROM file
WHERE deleted_at IS NULL WHERE deleted_at IS NULL
ORDER BY created_at DESC") ORDER BY created_at DESC")
(def ^:private sql:get-files-by-modified-at (def ^:private sql:get-files-by-modified-at
"SELECT id, features "SELECT id, features
row_number() OVER (ORDER BY modified_at) AS rown
FROM file FROM file
WHERE deleted_at IS NULL WHERE deleted_at IS NULL
ORDER BY modified_at DESC") ORDER BY modified_at DESC")
@ -132,6 +127,7 @@
(def ^:private sql:get-files-by-graphics (def ^:private sql:get-files-by-graphics
"WITH files AS ( "WITH files AS (
SELECT f.id, f.features, SELECT f.id, f.features,
row_number() OVER (ORDER BY modified_at) AS rown,
(SELECT count(*) FROM file_media_object AS fmo (SELECT count(*) FROM file_media_object AS fmo
WHERE fmo.mtype = 'image/svg+xml' WHERE fmo.mtype = 'image/svg+xml'
AND fmo.is_local = false AND fmo.is_local = false
@ -141,16 +137,6 @@
ORDER BY 3 ASC ORDER BY 3 ASC
) SELECT * FROM files %(pred)s") ) SELECT * FROM files %(pred)s")
(def ^:private sql:get-files-by-report
"WITH files AS (
SELECT f.id, f.features, mr.label
FROM migration_file_report AS mr
JOIN file AS f ON (f.id = mr.file_id)
WHERE f.deleted_at IS NULL
AND mr.error IS NOT NULL
ORDER BY mr.created_at
) SELECT id, features FROM files %(pred)s")
(defn- read-pred (defn- read-pred
[entries] [entries]
(let [entries (if (and (vector? entries) (let [entries (if (and (vector? entries)
@ -181,8 +167,7 @@
sql (case query sql (case query
:created-at sql:get-teams-by-created-at :created-at sql:get-teams-by-created-at
:activity sql:get-teams-by-activity :activity sql:get-teams-by-activity
:graphics sql:get-teams-by-graphics :graphics sql:get-teams-by-graphics)
:report sql:get-teams-by-report)
sql (if pred sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)] (let [[pred-sql & pred-params] (read-pred pred)]
(apply vector (apply vector
@ -193,8 +178,7 @@
(->> (db/cursor conn sql {:chunk-size 500}) (->> (db/cursor conn sql {:chunk-size 500})
(map feat/decode-row) (map feat/decode-row)
(remove (fn [{:keys [features]}] (remove (fn [{:keys [features]}]
(contains? features "components/v2"))) (contains? features "components/v2"))))))
(map :id))))
(defn- get-files (defn- get-files
[conn query pred] [conn query pred]
@ -202,8 +186,7 @@
sql (case query sql (case query
:created-at sql:get-files-by-created-at :created-at sql:get-files-by-created-at
:modified-at sql:get-files-by-modified-at :modified-at sql:get-files-by-modified-at
:graphics sql:get-files-by-graphics :graphics sql:get-files-by-graphics)
:report sql:get-files-by-report)
sql (if pred sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)] (let [[pred-sql & pred-params] (read-pred pred)]
(apply vector (apply vector
@ -214,60 +197,7 @@
(->> (db/cursor conn sql {:chunk-size 500}) (->> (db/cursor conn sql {:chunk-size 500})
(map feat/decode-row) (map feat/decode-row)
(remove (fn [{:keys [features]}] (remove (fn [{:keys [features]}]
(contains? features "components/v2"))) (contains? features "components/v2"))))))
(map :id))))
(def ^:private sql:team-report-table
"CREATE UNLOGGED TABLE IF NOT EXISTS migration_team_report (
id bigserial NOT NULL,
label text NOT NULL,
team_id UUID NOT NULL,
error text NULL,
created_at timestamptz NOT NULL DEFAULT now(),
elapsed bigint NOT NULL,
PRIMARY KEY (label, created_at, id))")
(def ^:private sql:file-report-table
"CREATE UNLOGGED TABLE IF NOT EXISTS migration_file_report (
id bigserial NOT NULL,
label text NOT NULL,
file_id UUID NOT NULL,
error text NULL,
created_at timestamptz NOT NULL DEFAULT now(),
elapsed bigint NOT NULL,
PRIMARY KEY (label, created_at, id))")
(defn- create-report-tables!
[system]
(db/exec-one! system [sql:team-report-table])
(db/exec-one! system [sql:file-report-table]))
(defn- clean-team-reports!
[system label]
(db/delete! system :migration-team-report {:label label}))
(defn- team-report!
[system team-id label elapsed error]
(db/insert! system :migration-team-report
{:label label
:team-id team-id
:elapsed (inst-ms elapsed)
:error error}
{::db/return-keys false}))
(defn- clean-file-reports!
[system label]
(db/delete! system :migration-file-report {:label label}))
(defn- file-report!
[system file-id label elapsed error]
(db/insert! system :migration-file-report
{:label label
:file-id file-id
:elapsed (inst-ms elapsed)
:error error}
{::db/return-keys false}))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API ;; PUBLIC API
@ -347,7 +277,7 @@
"A REPL helper for migrate all teams. "A REPL helper for migrate all teams.
This function starts multiple concurrent team migration processes This function starts multiple concurrent team migration processes
until thw maximum number of jobs is reached which by default has the until the maximum number of jobs is reached which by default has the
value of `1`. This is controled with the `:max-jobs` option. value of `1`. This is controled with the `:max-jobs` option.
If you want to run this on multiple machines you will need to specify If you want to run this on multiple machines you will need to specify
@ -393,15 +323,12 @@
(try (try
(db/tx-run! (assoc main/system ::db/rollback rollback?) (db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [system] (fn [system]
(db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"]) (db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(feat/migrate-team! system team-id (feat/migrate-team! system team-id
:label label :label label
:validate? validate? :validate? validate?
:skip-on-graphic-error? skip-on-graphic-error?))) :skip-on-graphic-error? skip-on-graphic-error?)))
(when (string? label)
(team-report! main/system team-id label (tpoint) nil))
(catch Throwable cause (catch Throwable cause
(l/wrn :hint "unexpected error on processing team (skiping)" (l/wrn :hint "unexpected error on processing team (skiping)"
:team-id (str team-id)) :team-id (str team-id))
@ -411,10 +338,7 @@
{:team-id team-id} {:team-id team-id}
cause)) cause))
(swap! stats update :errors (fnil inc 0)) (swap! stats update :errors (fnil inc 0)))
(when (string? label)
(team-report! main/system team-id label (tpoint) (ex-message cause))))
(finally (finally
(ps/release! sjobs))))) (ps/release! sjobs)))))
@ -445,23 +369,18 @@
feat/*cache* cache feat/*cache* cache
svgo/*semaphore* sprocs] svgo/*semaphore* sprocs]
(try (try
(when (string? label)
(create-report-tables! main/system)
(clean-team-reports! main/system label))
(db/tx-run! main/system (db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
(db/exec! conn ["SET statement_timeout = 0"]) (db/exec! conn ["SET LOCAL statement_timeout = 0"])
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0"]) (db/exec! conn ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(run! process-team (run! process-team
(->> (get-teams conn query pred) (->> (get-teams conn query pred)
(filter (fn [team-id] (filter (fn [{:keys [rown]}]
(if (int? partitions) (if (int? partitions)
(= current-partition (-> (uuid/hash-int team-id) (= current-partition (inc (mod rown partitions)))
(mod partitions)
(inc)))
true))) true)))
(map :id)
(take max-items))) (take max-items)))
;; Close and await tasks ;; Close and await tasks
@ -480,7 +399,6 @@
:rollback rollback? :rollback rollback?
:elapsed elapsed))))))) :elapsed elapsed)))))))
(defn migrate-files! (defn migrate-files!
"A REPL helper for migrate all files. "A REPL helper for migrate all files.
@ -532,15 +450,12 @@
(try (try
(db/tx-run! (assoc main/system ::db/rollback rollback?) (db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [system] (fn [system]
(db/exec-one! system ["SET idle_in_transaction_session_timeout = 0"]) (db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(feat/migrate-file! system file-id (feat/migrate-file! system file-id
:label label :label label
:validate? validate? :validate? validate?
:skip-on-graphic-error? skip-on-graphic-error?))) :skip-on-graphic-error? skip-on-graphic-error?)))
(when (string? label)
(file-report! main/system file-id label (tpoint) nil))
(catch Throwable cause (catch Throwable cause
(l/wrn :hint "unexpected error on processing file (skiping)" (l/wrn :hint "unexpected error on processing file (skiping)"
:file-id (str file-id)) :file-id (str file-id))
@ -550,10 +465,7 @@
{:file-id file-id} {:file-id file-id}
cause)) cause))
(swap! stats update :errors (fnil inc 0)) (swap! stats update :errors (fnil inc 0)))
(when (string? label)
(file-report! main/system file-id label (tpoint) (ex-message cause))))
(finally (finally
(ps/release! sjobs))))) (ps/release! sjobs)))))
@ -584,23 +496,18 @@
feat/*cache* cache feat/*cache* cache
svgo/*semaphore* sprocs] svgo/*semaphore* sprocs]
(try (try
(when (string? label)
(create-report-tables! main/system)
(clean-file-reports! main/system label))
(db/tx-run! main/system (db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
(db/exec! conn ["SET statement_timeout = 0"]) (db/exec! conn ["SET LOCAL statement_timeout = 0"])
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0"]) (db/exec! conn ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(run! process-file (run! process-file
(->> (get-files conn query pred) (->> (get-files conn query pred)
(filter (fn [file-id] (filter (fn [{:keys [rown] :as row}]
(if (int? partitions) (if (int? partitions)
(= current-partition (-> (uuid/hash-int file-id) (= current-partition (inc (mod rown partitions)))
(mod partitions)
(inc)))
true))) true)))
(map :id)
(take max-items))) (take max-items)))
;; Close and await tasks ;; Close and await tasks
@ -619,7 +526,6 @@
:rollback rollback? :rollback rollback?
:elapsed elapsed))))))) :elapsed elapsed)))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; FILE PROCESS HELPERS ;; FILE PROCESS HELPERS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;