Add initial impl for migrate-components-v2 manage.py command

This commit is contained in:
Andrey Antukh 2023-11-17 14:06:18 +01:00 committed by Andrés Moya
parent c948f1a087
commit 08166bcebf
5 changed files with 100 additions and 35 deletions

View file

@ -41,21 +41,26 @@
:elapsed (dt/format-duration elapsed))))))
(defn- report-progress-teams
[tpoint]
[tpoint on-progress]
(fn [_ _ oldv newv]
(when (not= (:processed/teams oldv)
(:processed/teams newv))
(let [total (:total/teams newv)
completed (:processed/teams newv)
progress (/ (* completed 100.0) total)
elapsed (tpoint)]
progress (str (int progress) "%")
elapsed (dt/format-duration (tpoint))]
(when (fn? on-progress)
(on-progress {:total total
:elapsed elapsed
:completed completed
:progress progress}))
(l/dbg :hint "progress"
:completed-teams (:processed/teams newv)
:completed-files (:processed/files newv)
:completed-graphics (:processed/graphics newv)
:completed-components (:processed/components newv)
:progress (str (int progress) "%")
:elapsed (dt/format-duration elapsed))))))
:completed completed
:progress progress
:elapsed elapsed)))))
(defn- get-total-files
[pool & {:keys [team-id]}]
@ -191,13 +196,23 @@
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "migrate:end" :elapsed elapsed))))))
(defn default-on-end
[stats]
(print-stats!
(-> stats
(update :elapsed/total dt/format-duration)
(dissoc :total/teams))))
(defn migrate-teams!
[{:keys [::db/pool] :as system}
& {:keys [chunk-size max-jobs max-items start-at rollback? preset skip-on-error max-time validate?]
& {:keys [chunk-size max-jobs max-items start-at
rollback? validate? preset skip-on-error
max-time on-start on-progress on-error on-end]
:or {chunk-size 10000
validate? false
rollback? true
skip-on-error true
on-end default-on-end
preset :shutdown-on-failure
max-jobs Integer/MAX_VALUE
max-items Long/MAX_VALUE}}]
@ -242,7 +257,10 @@
tpoint (dt/tpoint)
mtime (some-> max-time dt/duration)]
(add-watch stats :progress-report (report-progress-teams tpoint))
(when (fn? on-start)
(on-start {:total total :rollback rollback?}))
(add-watch stats :progress-report (report-progress-teams tpoint on-progress))
(binding [feat/*stats* stats
feat/*semaphore* sem
@ -257,13 +275,15 @@
(p/await! scope))
(print-stats!
(-> (deref feat/*stats*)
(assoc :elapsed/total (dt/format-duration (tpoint)))
(dissoc :total/teams)))
(when (fn? on-end)
(-> (deref stats)
(assoc :elapsed/total (tpoint))
(on-end)))
(catch Throwable cause
(l/dbg :hint "migrate:error" :cause cause))
(l/dbg :hint "migrate:error" :cause cause)
(when (fn? on-error)
(on-error cause)))
(finally
(let [elapsed (dt/format-duration (tpoint))]