Add proper file iteration on srepl helpers

This commit is contained in:
Andrey Antukh 2024-01-05 11:45:05 +01:00
parent a6e802ba2a
commit 2698944ec7
2 changed files with 103 additions and 123 deletions

View file

@ -24,7 +24,7 @@
[app.db :as db] [app.db :as db]
[app.db.sql :as sql] [app.db.sql :as sql]
[app.features.fdata :as feat.fdata] [app.features.fdata :as feat.fdata]
[app.main :refer [system]] [app.main :as main]
[app.rpc.commands.files :as files] [app.rpc.commands.files :as files]
[app.rpc.commands.files-update :as files-update] [app.rpc.commands.files-update :as files-update]
[app.util.blob :as blob] [app.util.blob :as blob]
@ -55,16 +55,17 @@
(defn reset-file-data! (defn reset-file-data!
"Hardcode replace of the data of one file." "Hardcode replace of the data of one file."
[system id data] [id data]
(db/tx-run! system (fn [system] (db/tx-run! main/system
(db/update! system :file (fn [system]
{:data data} (db/update! system :file
{:id id})))) {:data data}
{:id id}))))
(defn get-file (defn get-file
"Get the migrated data of one file." "Get the migrated data of one file."
[system id & {:keys [migrate?] :or {migrate? true}}] [id & {:keys [migrate?] :or {migrate? true}}]
(db/run! system (db/run! main/system
(fn [system] (fn [system]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(-> (files/get-file system id :migrate? migrate?) (-> (files/get-file system id :migrate? migrate?)
@ -73,8 +74,8 @@
(defn validate (defn validate
"Validate structure, referencial integrity and semantic coherence of "Validate structure, referencial integrity and semantic coherence of
all contents of a file. Returns a list of errors." all contents of a file. Returns a list of errors."
[system id] [id]
(db/tx-run! system (db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(let [id (if (string? id) (parse-uuid id) id) (let [id (if (string? id) (parse-uuid id) id)
@ -90,8 +91,8 @@
(defn repair! (defn repair!
"Repair the list of errors detected by validation." "Repair the list of errors detected by validation."
[system id] [id]
(db/tx-run! system (db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
(binding [pmap/*tracked* (pmap/create-tracked) (binding [pmap/*tracked* (pmap/create-tracked)
pmap/*load-fn* (partial feat.fdata/load-pointer system id)] pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
@ -127,8 +128,8 @@
(defn update-file! (defn update-file!
"Apply a function to the data of one file. Optionally save the changes or not. "Apply a function to the data of one file. Optionally save the changes or not.
The function receives the decoded and migrated file data." The function receives the decoded and migrated file data."
[system & {:keys [update-fn id rollback? migrate? inc-revn?] [& {:keys [update-fn id rollback? migrate? inc-revn?]
:or {rollback? true migrate? true inc-revn? true}}] :or {rollback? true migrate? true inc-revn? true}}]
(letfn [(process-file [{:keys [::db/conn] :as system} {:keys [features] :as file}] (letfn [(process-file [{:keys [::db/conn] :as system} {:keys [features] :as file}]
(binding [pmap/*tracked* (pmap/create-tracked) (binding [pmap/*tracked* (pmap/create-tracked)
pmap/*load-fn* (partial feat.fdata/load-pointer system id) pmap/*load-fn* (partial feat.fdata/load-pointer system id)
@ -153,7 +154,7 @@
(dissoc file :data)))] (dissoc file :data)))]
(db/tx-run! system (db/tx-run! main/system
(fn [system] (fn [system]
(binding [*system* system] (binding [*system* system]
(try (try
@ -163,6 +164,12 @@
(when rollback? (when rollback?
(db/rollback! system))))))))) (db/rollback! system)))))))))
(def ^:private sql:get-file-ids
"SELECT id FROM file
WHERE created_at < ? AND deleted_at is NULL
ORDER BY created_at DESC")
(defn analyze-files (defn analyze-files
"Apply a function to all files in the database, reading them in "Apply a function to all files in the database, reading them in
batches. Do not change data. batches. Do not change data.
@ -171,21 +178,11 @@
and the previous state and returns the new state. and the previous state and returns the new state.
Emits rollback at the end of operation." Emits rollback at the end of operation."
[system & {:keys [chunk-size max-items start-at on-file on-error on-end on-init with-libraries?] [& {:keys [max-items start-at on-file on-error on-end on-init with-libraries?]}]
:or {chunk-size 10 max-items Long/MAX_VALUE}}] (letfn [(get-candidates [conn]
(letfn [(get-chunk [conn cursor] (cond->> (db/cursor conn [sql:get-file-ids (or start-at (dt/now))])
(let [sql (str "SELECT id, created_at FROM file " (some? max-items)
" WHERE created_at < ? AND deleted_at is NULL " (take max-items)))
" ORDER BY created_at desc LIMIT ?")
rows (db/exec! conn [sql cursor chunk-size])]
[(some->> rows peek :created-at) (map :id rows)]))
(get-candidates [conn]
(->> (d/iteration (partial get-chunk conn)
:vf second
:kf first
:initk (or start-at (dt/now)))
(take max-items)))
(on-error* [cause file] (on-error* [cause file]
(println "unexpected exception happened on processing file: " (:id file)) (println "unexpected exception happened on processing file: " (:id file))
@ -210,12 +207,13 @@
(catch Throwable cause (catch Throwable cause
((or on-error on-error*) cause file)))))] ((or on-error on-error*) cause file)))))]
(db/tx-run! system (db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
(try (try
(binding [*system* system] (binding [*system* system]
(when (fn? on-init) (on-init)) (when (fn? on-init) (on-init))
(run! (partial process-file system) (get-candidates conn))) (run! (partial process-file system)
(get-candidates conn)))
(finally (finally
(when (fn? on-end) (when (fn? on-end)
(ex/ignoring (on-end))) (ex/ignoring (on-end)))
@ -224,33 +222,20 @@
(defn process-files! (defn process-files!
"Apply a function to all files in the database, reading them in "Apply a function to all files in the database, reading them in
batches." batches."
[& {:keys [max-items
[system & {:keys [chunk-size workers
max-items start-at
workers on-file
start-at on-error
on-file on-end
on-error on-init
on-end rollback?]
on-init :or {workers 1
rollback?] rollback? true}}]
:or {chunk-size 10 (letfn [(get-candidates [conn]
max-items Long/MAX_VALUE (cond->> (db/cursor conn [sql:get-file-ids (or start-at (dt/now))])
workers 1 (some? max-items)
rollback? true}}] (take max-items)))
(letfn [(get-chunk [conn cursor]
(let [sql (str "SELECT id, created_at FROM file "
" WHERE created_at < ? AND deleted_at is NULL "
" ORDER BY created_at desc LIMIT ?")
rows (db/exec! conn [sql cursor chunk-size])]
[(some->> rows peek :created-at) (map :id rows)]))
(get-candidates [conn]
(->> (d/iteration (partial get-chunk conn)
:vf second
:kf first
:initk (or start-at (dt/now)))
(take max-items)))
(on-error* [cause file] (on-error* [cause file]
(println! "unexpected exception happened on processing file: " (:id file)) (println! "unexpected exception happened on processing file: " (:id file))
@ -275,7 +260,7 @@
((or on-error on-error*) cause file-id)))) ((or on-error on-error*) cause file-id))))
(run-worker [in index] (run-worker [in index]
(db/tx-run! system (db/tx-run! main/system
(fn [system] (fn [system]
(binding [*system* system] (binding [*system* system]
(loop [i 0] (loop [i 0]
@ -288,15 +273,16 @@
(db/rollback! system))))) (db/rollback! system)))))
(run-producer [input] (run-producer [input]
(db/tx-run! system (fn [{:keys [::db/conn]}] (db/tx-run! main/system
(doseq [file-id (get-candidates conn)] (fn [{:keys [::db/conn]}]
(println! "=> producer:" file-id "|" (px/get-name)) (doseq [file-id (get-candidates conn)]
(sp/put! input file-id)) (println! "=> producer:" file-id "|" (px/get-name))
(sp/close! input))))] (sp/put! input file-id))
(sp/close! input))))]
(when (fn? on-init) (on-init)) (when (fn? on-init) (on-init))
(let [input (sp/chan :buf chunk-size) (let [input (sp/chan :buf 25)
producer (px/thread producer (px/thread
{:name "penpot/srepl/producer"} {:name "penpot/srepl/producer"}
(run-producer input)) (run-producer input))

View file

@ -38,52 +38,45 @@
[cuerdas.core :as str])) [cuerdas.core :as str]))
(defn print-available-tasks (defn print-available-tasks
[system] []
(let [tasks (:app.worker/registry system)] (let [tasks (:app.worker/registry main/system)]
(p/pprint (keys tasks) :level 200))) (p/pprint (keys tasks) :level 200)))
(defn run-task! (defn run-task!
([system name] ([tname]
(run-task! system name {})) (run-task! tname {}))
([system name params] ([tname params]
(let [tasks (:app.worker/registry system)] (let [tasks (:app.worker/registry main/system)
(if-let [task-fn (get tasks name)] tname (if (keyword? tname) (name tname) name)]
(if-let [task-fn (get tasks tname)]
(task-fn params) (task-fn params)
(println (format "no task '%s' found" name)))))) (println (format "no task '%s' found" tname))))))
(defn schedule-task! (defn schedule-task!
([system name] ([name]
(schedule-task! system name {})) (schedule-task! name {}))
([system name props] ([name props]
(let [pool (:app.db/pool system)] (let [pool (:app.db/pool main/system)]
(wrk/submit! (wrk/submit!
::wrk/conn pool ::wrk/conn pool
::wrk/task name ::wrk/task name
::wrk/props props)))) ::wrk/props props))))
(defn send-test-email! (defn send-test-email!
[system destination] [destination]
(us/verify!
:expr (some? system)
:hint "system should be provided")
(us/verify! (us/verify!
:expr (string? destination) :expr (string? destination)
:hint "destination should be provided") :hint "destination should be provided")
(let [handler (:app.email/sendmail system)] (let [handler (:app.email/sendmail main/system)]
(handler {:body "test email" (handler {:body "test email"
:subject "test email" :subject "test email"
:to [destination]}))) :to [destination]})))
(defn resend-email-verification-email! (defn resend-email-verification-email!
[system email] [email]
(us/verify! (let [sprops (:app.setup/props main/system)
:expr (some? system) pool (:app.db/pool main/system)
:hint "system should be provided")
(let [sprops (:app.setup/props system)
pool (:app.db/pool system)
profile (profile/get-profile-by-email pool email)] profile (profile/get-profile-by-email pool email)]
(auth/send-email-verification! pool sprops profile) (auth/send-email-verification! pool sprops profile)
@ -92,8 +85,8 @@
(defn mark-profile-as-active! (defn mark-profile-as-active!
"Mark the profile blocked and removes all the http sessiones "Mark the profile blocked and removes all the http sessiones
associated with the profile-id." associated with the profile-id."
[system email] [email]
(db/with-atomic [conn (:app.db/pool system)] (db/with-atomic [conn (:app.db/pool main/system)]
(when-let [profile (db/get* conn :profile (when-let [profile (db/get* conn :profile
{:email (str/lower email)} {:email (str/lower email)}
{:columns [:id :email]})] {:columns [:id :email]})]
@ -104,8 +97,8 @@
(defn mark-profile-as-blocked! (defn mark-profile-as-blocked!
"Mark the profile blocked and removes all the http sessiones "Mark the profile blocked and removes all the http sessiones
associated with the profile-id." associated with the profile-id."
[system email] [email]
(db/with-atomic [conn (:app.db/pool system)] (db/with-atomic [conn (:app.db/pool main/system)]
(when-let [profile (db/get* conn :profile (when-let [profile (db/get* conn :profile
{:email (str/lower email)} {:email (str/lower email)}
{:columns [:id :email]})] {:columns [:id :email]})]
@ -117,9 +110,9 @@
(defn reset-password! (defn reset-password!
"Reset a password to a specific one for a concrete user or all users "Reset a password to a specific one for a concrete user or all users
if email is `:all` keyword." if email is `:all` keyword."
[system & {:keys [email password] :or {password "123123"} :as params}] [& {:keys [email password] :or {password "123123"} :as params}]
(us/verify! (contains? params :email) "`email` parameter is mandatory") (us/verify! (contains? params :email) "`email` parameter is mandatory")
(db/with-atomic [conn (:app.db/pool system)] (db/with-atomic [conn (:app.db/pool main/system)]
(let [password (derive-password password)] (let [password (derive-password password)]
(if (= email :all) (if (= email :all)
(db/exec! conn ["update profile set password=?" password]) (db/exec! conn ["update profile set password=?" password])
@ -127,21 +120,21 @@
(db/exec! conn ["update profile set password=? where email=?" password email])))))) (db/exec! conn ["update profile set password=? where email=?" password email]))))))
(defn enable-objects-map-feature-on-file! (defn enable-objects-map-feature-on-file!
[system & {:keys [save? id]}] [& {:keys [save? id]}]
(h/update-file! system (h/update-file! main/system
:id id :id id
:update-fn features.fdata/enable-objects-map :update-fn features.fdata/enable-objects-map
:save? save?)) :save? save?))
(defn enable-pointer-map-feature-on-file! (defn enable-pointer-map-feature-on-file!
[system & {:keys [save? id]}] [& {:keys [save? id]}]
(h/update-file! system (h/update-file! main/system
:id id :id id
:update-fn features.fdata/enable-pointer-map :update-fn features.fdata/enable-pointer-map
:save? save?)) :save? save?))
(defn enable-team-feature! (defn enable-team-feature!
[system team-id feature] [team-id feature]
(dm/verify! (dm/verify!
"feature should be supported" "feature should be supported"
(contains? cfeat/supported-features feature)) (contains? cfeat/supported-features feature))
@ -149,7 +142,7 @@
(let [team-id (if (string? team-id) (let [team-id (if (string? team-id)
(parse-uuid team-id) (parse-uuid team-id)
team-id)] team-id)]
(db/tx-run! system (db/tx-run! main/system
(fn [{:keys [::db/conn]}] (fn [{:keys [::db/conn]}]
(let [team (-> (db/get conn :team {:id team-id}) (let [team (-> (db/get conn :team {:id team-id})
(update :features db/decode-pgarray #{})) (update :features db/decode-pgarray #{}))
@ -161,7 +154,7 @@
:enabled)))))) :enabled))))))
(defn disable-team-feature! (defn disable-team-feature!
[system team-id feature] [team-id feature]
(dm/verify! (dm/verify!
"feature should be supported" "feature should be supported"
(contains? cfeat/supported-features feature)) (contains? cfeat/supported-features feature))
@ -169,7 +162,7 @@
(let [team-id (if (string? team-id) (let [team-id (if (string? team-id)
(parse-uuid team-id) (parse-uuid team-id)
team-id)] team-id)]
(db/tx-run! system (db/tx-run! main/system
(fn [{:keys [::db/conn]}] (fn [{:keys [::db/conn]}]
(let [team (-> (db/get conn :team {:id team-id}) (let [team (-> (db/get conn :team {:id team-id})
(update :features db/decode-pgarray #{})) (update :features db/decode-pgarray #{}))
@ -181,9 +174,9 @@
:disabled)))))) :disabled))))))
(defn enable-storage-features-on-file! (defn enable-storage-features-on-file!
[system & {:as params}] [& {:as params}]
(enable-objects-map-feature-on-file! system params) (enable-objects-map-feature-on-file! main/system params)
(enable-pointer-map-feature-on-file! system params)) (enable-pointer-map-feature-on-file! main/system params))
(defn instrument-var (defn instrument-var
[var] [var]
@ -207,13 +200,13 @@
(defn take-file-snapshot! (defn take-file-snapshot!
"An internal helper that persist the file snapshot using non-gc "An internal helper that persist the file snapshot using non-gc
collectable file-changes entry." collectable file-changes entry."
[system & {:keys [file-id label]}] [& {:keys [file-id label]}]
(let [file-id (h/parse-uuid file-id)] (let [file-id (h/parse-uuid file-id)]
(db/tx-run! system fsnap/take-file-snapshot! {:file-id file-id :label label}))) (db/tx-run! main/system fsnap/take-file-snapshot! {:file-id file-id :label label})))
(defn restore-file-snapshot! (defn restore-file-snapshot!
[system & {:keys [file-id id]}] [& {:keys [file-id id]}]
(db/tx-run! system (db/tx-run! main/system
(fn [cfg] (fn [cfg]
(let [file-id (h/parse-uuid file-id) (let [file-id (h/parse-uuid file-id)
id (h/parse-uuid id)] id (h/parse-uuid id)]
@ -224,12 +217,13 @@
(defn list-file-snapshots! (defn list-file-snapshots!
[system & {:keys [file-id limit]}] [& {:keys [file-id limit]}]
(db/tx-run! system (fn [system] (db/tx-run! main/system
(let [params {:file-id (h/parse-uuid file-id) (fn [system]
:limit limit}] (let [params {:file-id (h/parse-uuid file-id)
(->> (fsnap/get-file-snapshots system (d/without-nils params)) :limit limit}]
(print-table [:id :revn :created-at :label])))))) (->> (fsnap/get-file-snapshots system (d/without-nils params))
(print-table [:id :revn :created-at :label]))))))
(defn notify! (defn notify!
[{:keys [::mbus/msgbus ::db/pool]} & {:keys [dest code message level] [{:keys [::mbus/msgbus ::db/pool]} & {:keys [dest code message level]
@ -334,12 +328,12 @@
(into #{}) (into #{})
(run! send)))) (run! send))))
(defn duplicate-team (defn duplicate-team
[system team-id & {:keys [name]}] [team-id & {:keys [name]}]
(let [team-id (if (string? team-id) (parse-uuid team-id) team-id) (let [team-id (if (string? team-id) (parse-uuid team-id) team-id)
name (or name (fn [prev-name] name (or name (fn [prev-name]
(str/ffmt "Cloned: % (%)" prev-name (dt/format-instant (dt/now)))))] (str/ffmt "Cloned: % (%)" prev-name (dt/format-instant (dt/now)))))]
(db/tx-run! system (fn [cfg] (db/tx-run! main/system
(db/exec-one! cfg ["SET CONSTRAINTS ALL DEFERRED"]) (fn [cfg]
(mgmt/duplicate-team cfg :team-id team-id :name name))))) (db/exec-one! cfg ["SET CONSTRAINTS ALL DEFERRED"])
(mgmt/duplicate-team cfg :team-id team-id :name name)))))