mirror of
https://github.com/penpot/penpot.git
synced 2025-08-07 14:38:33 +02:00
♻️ Refactor srepl helpers
This commit is contained in:
parent
76a6f077a6
commit
f3e9efa6fe
3 changed files with 135 additions and 541 deletions
|
@ -9,7 +9,6 @@
|
|||
(:refer-clojure :exclude [parse-uuid])
|
||||
#_:clj-kondo/ignore
|
||||
(:require
|
||||
[app.auth :refer [derive-password]]
|
||||
[app.common.data :as d]
|
||||
[app.common.exceptions :as ex]
|
||||
[app.common.features :as cfeat]
|
||||
|
@ -41,6 +40,7 @@
|
|||
[promesa.exec.csp :as sp]))
|
||||
|
||||
(def ^:dynamic *conn* nil)
|
||||
(def ^:dynamic *system* nil)
|
||||
|
||||
(defn println!
|
||||
[& params]
|
||||
|
@ -53,44 +53,22 @@
|
|||
v
|
||||
(d/parse-uuid v)))
|
||||
|
||||
(defn resolve-connectable
|
||||
[o]
|
||||
(if (db/connection? o)
|
||||
o
|
||||
(if (db/pool? o)
|
||||
o
|
||||
(or (::db/conn o)
|
||||
(::db/pool o)))))
|
||||
|
||||
(defn reset-password!
|
||||
"Reset a password to a specific one for a concrete user or all users
|
||||
if email is `:all` keyword."
|
||||
[system & {:keys [email password] :or {password "123123"} :as params}]
|
||||
(us/verify! (contains? params :email) "`email` parameter is mandatory")
|
||||
(db/with-atomic [conn (:app.db/pool system)]
|
||||
(let [password (derive-password password)]
|
||||
(if (= email :all)
|
||||
(db/exec! conn ["update profile set password=?" password])
|
||||
(let [email (str/lower email)]
|
||||
(db/exec! conn ["update profile set password=? where email=?" password email]))))))
|
||||
|
||||
(defn reset-file-data!
|
||||
"Hardcode replace of the data of one file."
|
||||
[system id data]
|
||||
(db/with-atomic [conn (:app.db/pool system)]
|
||||
(db/update! conn :file
|
||||
{:data data}
|
||||
{:id id})))
|
||||
(db/tx-run! system (fn [system]
|
||||
(db/update! system :file
|
||||
{:data data}
|
||||
{:id id}))))
|
||||
|
||||
(defn get-file
|
||||
"Get the migrated data of one file."
|
||||
[system id]
|
||||
(db/with-atomic [conn (:app.db/pool system)]
|
||||
(binding [pmap/*load-fn* (partial files/load-pointer conn id)]
|
||||
(-> (db/get-by-id conn :file id)
|
||||
(update :data blob/decode)
|
||||
(update :data pmg/migrate-data)
|
||||
(files/process-pointers deref)))))
|
||||
(db/run! system
|
||||
(fn [{:keys [::db/conn]}]
|
||||
(binding [pmap/*load-fn* (partial files/load-pointer conn id)]
|
||||
(-> (files/get-file conn id)
|
||||
(files/process-pointers deref))))))
|
||||
|
||||
(defn validate
|
||||
"Validate structure, referencial integrity and semantic coherence of
|
||||
|
@ -145,95 +123,99 @@
|
|||
(defn update-file!
|
||||
"Apply a function to the data of one file. Optionally save the changes or not.
|
||||
The function receives the decoded and migrated file data."
|
||||
[system & {:keys [update-fn id save? migrate? inc-revn?]
|
||||
:or {save? false migrate? true inc-revn? true}}]
|
||||
(db/with-atomic [conn (:app.db/pool system)]
|
||||
(let [file (-> (db/get-by-id conn :file id {::db/for-update? true})
|
||||
(update :features db/decode-pgarray #{}))]
|
||||
(binding [*conn* conn
|
||||
pmap/*tracked* (atom {})
|
||||
pmap/*load-fn* (partial files/load-pointer conn id)
|
||||
cfeat/*wrap-with-pointer-map-fn*
|
||||
(if (contains? (:features file) "fdata/pointer-map") pmap/wrap identity)
|
||||
cfeat/*wrap-with-objects-map-fn*
|
||||
(if (contains? (:features file) "fdata/objectd-map") omap/wrap identity)]
|
||||
(let [file (-> file
|
||||
(update :data blob/decode)
|
||||
(cond-> migrate? (update :data pmg/migrate-data))
|
||||
(update-fn)
|
||||
(cond-> inc-revn? (update :revn inc)))]
|
||||
(when save?
|
||||
(let [features (db/create-array conn "text" (:features file))
|
||||
data (blob/encode (:data file))]
|
||||
(db/update! conn :file
|
||||
{:data data
|
||||
:revn (:revn file)
|
||||
:features features}
|
||||
{:id id})
|
||||
[system & {:keys [update-fn id rollback? migrate? inc-revn?]
|
||||
:or {rollback? true migrate? true inc-revn? true}}]
|
||||
(letfn [(process-file [conn {:keys [features] :as file}]
|
||||
(binding [pmap/*tracked* (atom {})
|
||||
pmap/*load-fn* (partial files/load-pointer conn id)
|
||||
cfeat/*wrap-with-pointer-map-fn*
|
||||
(if (contains? features "fdata/pointer-map") pmap/wrap identity)
|
||||
cfeat/*wrap-with-objects-map-fn*
|
||||
(if (contains? features "fdata/objectd-map") omap/wrap identity)]
|
||||
|
||||
(let [file (cond-> (update-fn file)
|
||||
inc-revn? (update :revn inc))
|
||||
features (db/create-array conn "text" (:features file))
|
||||
data (blob/encode (:data file))]
|
||||
|
||||
(db/update! conn :file
|
||||
{:data data
|
||||
:revn (:revn file)
|
||||
:features features}
|
||||
{:id id}))
|
||||
|
||||
(when (contains? (:features file) "fdata/pointer-map")
|
||||
(files/persist-pointers! conn id))))
|
||||
(files/persist-pointers! conn id))
|
||||
|
||||
(dissoc file :data))))))
|
||||
(dissoc file :data)))]
|
||||
|
||||
(def ^:private sql:retrieve-files-chunk
|
||||
"SELECT id, name, features, created_at, revn, data FROM file
|
||||
WHERE created_at < ? AND deleted_at is NULL
|
||||
ORDER BY created_at desc LIMIT ?")
|
||||
(db/tx-run! system
|
||||
(fn [{:keys [::db/conn] :as system}]
|
||||
(binding [*conn* conn *system* system]
|
||||
(try
|
||||
(->> (files/get-file conn id :migrate? migrate?)
|
||||
(process-file conn))
|
||||
(finally
|
||||
(when rollback?
|
||||
(db/rollback! conn)))))))))
|
||||
|
||||
(defn analyze-files
|
||||
"Apply a function to all files in the database, reading them in
|
||||
batches. Do not change data.
|
||||
|
||||
The `on-file` parameter should be a function that receives the file
|
||||
and the previous state and returns the new state."
|
||||
and the previous state and returns the new state.
|
||||
|
||||
Emits rollback at the end of operation."
|
||||
[system & {:keys [chunk-size max-items start-at on-file on-error on-end on-init with-libraries?]
|
||||
:or {chunk-size 10 max-items Long/MAX_VALUE}}]
|
||||
(letfn [(get-chunk [conn cursor]
|
||||
(let [rows (db/exec! conn [sql:retrieve-files-chunk cursor chunk-size])]
|
||||
[(some->> rows peek :created-at) (seq rows)]))
|
||||
(let [sql (str "SELECT id, created_at FROM file "
|
||||
" WHERE created_at < ? AND deleted_at is NULL "
|
||||
" ORDER BY created_at desc LIMIT ?")
|
||||
rows (db/exec! conn [sql cursor chunk-size])]
|
||||
[(some->> rows peek :created-at) (map :id rows)]))
|
||||
|
||||
(get-candidates [conn]
|
||||
(->> (d/iteration (partial get-chunk conn)
|
||||
:vf second
|
||||
:kf first
|
||||
:initk (or start-at (dt/now)))
|
||||
(take max-items)
|
||||
(map #(-> %
|
||||
(update :data blob/decode)
|
||||
(update :features db/decode-pgarray #{})))))
|
||||
(take max-items)))
|
||||
|
||||
(on-error* [cause file]
|
||||
(println "unexpected exception happened on processing file: " (:id file))
|
||||
(strace/print-stack-trace cause))]
|
||||
(strace/print-stack-trace cause))
|
||||
|
||||
(when (fn? on-init) (on-init))
|
||||
(process-file [conn file-id]
|
||||
(let [file (binding [pmap/*load-fn* (partial files/load-pointer conn file-id)]
|
||||
(-> (files/get-file conn file-id)
|
||||
(files/process-pointers deref)))
|
||||
|
||||
(db/with-atomic [conn (:app.db/pool system)]
|
||||
(doseq [file (get-candidates conn)]
|
||||
(binding [*conn* conn
|
||||
pmap/*tracked* (atom {})
|
||||
pmap/*load-fn* (partial files/load-pointer conn (:id file))
|
||||
cfeat/*wrap-with-pointer-map-fn*
|
||||
(if (contains? (:features file) "fdata/pointer-map") pmap/wrap identity)
|
||||
cfeat/*wrap-with-objects-map-fn*
|
||||
(if (contains? (:features file) "fdata/objects-map") omap/wrap identity)]
|
||||
(let [libraries (when with-libraries?
|
||||
(->> (files/get-file-libraries conn (:id file))
|
||||
(into [file] (map (fn [{:keys [id]}]
|
||||
(binding [pmap/*load-fn* (partial files/load-pointer conn id)]
|
||||
(-> (files-update/get-file conn id)
|
||||
(update :data blob/decode)
|
||||
(files/process-pointers deref)))))) ; ensure all pointers resolved
|
||||
(d/index-by :id)))]
|
||||
(try
|
||||
(if with-libraries?
|
||||
(on-file file libraries)
|
||||
(on-file file))
|
||||
(catch Throwable cause
|
||||
((or on-error on-error*) cause file)))))))
|
||||
libs (when with-libraries?
|
||||
(->> (files/get-file-libraries conn file-id)
|
||||
(into [file] (map (fn [{:keys [id]}]
|
||||
(binding [pmap/*load-fn* (partial files/load-pointer conn id)]
|
||||
(-> (files/get-file conn id)
|
||||
(files/process-pointers deref))))))
|
||||
(d/index-by :id)))]
|
||||
(try
|
||||
(if with-libraries?
|
||||
(on-file file libs)
|
||||
(on-file file))
|
||||
(catch Throwable cause
|
||||
((or on-error on-error*) cause file)))))]
|
||||
|
||||
(when (fn? on-end) (on-end))))
|
||||
(db/tx-run! system
|
||||
(fn [{:keys [::db/conn] :as system}]
|
||||
(try
|
||||
(binding [*conn* conn *system* system]
|
||||
(when (fn? on-init) (on-init))
|
||||
(run! (partial process-file conn) (get-candidates conn)))
|
||||
(finally
|
||||
(when (fn? on-end)
|
||||
(ex/ignoring (on-end)))
|
||||
(db/rollback! conn)))))))
|
||||
|
||||
(defn process-files!
|
||||
"Apply a function to all files in the database, reading them in
|
||||
|
@ -246,15 +228,18 @@
|
|||
on-file
|
||||
on-error
|
||||
on-end
|
||||
on-init]
|
||||
on-init
|
||||
rollback?]
|
||||
:or {chunk-size 10
|
||||
max-items Long/MAX_VALUE
|
||||
workers 1}}]
|
||||
|
||||
workers 1
|
||||
rollback? true}}]
|
||||
(letfn [(get-chunk [conn cursor]
|
||||
(let [rows (db/exec! conn [sql:retrieve-files-chunk cursor chunk-size])]
|
||||
[(some->> rows peek :created-at)
|
||||
(map #(update % :features db/decode-pgarray #{}) rows)]))
|
||||
(let [sql (str "SELECT id, created_at FROM file "
|
||||
" WHERE created_at < ? AND deleted_at is NULL "
|
||||
" ORDER BY created_at desc LIMIT ?")
|
||||
rows (db/exec! conn [sql cursor chunk-size])]
|
||||
[(some->> rows peek :created-at) (map :id rows)]))
|
||||
|
||||
(get-candidates [conn]
|
||||
(->> (d/iteration (partial get-chunk conn)
|
||||
|
@ -267,38 +252,43 @@
|
|||
(println! "unexpected exception happened on processing file: " (:id file))
|
||||
(strace/print-stack-trace cause))
|
||||
|
||||
(process-file [conn file]
|
||||
(process-file [conn file-id]
|
||||
(try
|
||||
(binding [*conn* conn
|
||||
pmap/*tracked* (atom {})
|
||||
pmap/*load-fn* (partial files/load-pointer conn (:id file))
|
||||
cfeat/*wrap-with-pointer-map-fn*
|
||||
(if (contains? (:features file) "fdata/pointer-map") pmap/wrap identity)
|
||||
cfeat/*wrap-with-objects-map-fn*
|
||||
(if (contains? (:features file) "fdata/objectd-map") omap/wrap identity)]
|
||||
(on-file file))
|
||||
(let [{:keys [features] :as file} (files/get-file conn file-id)]
|
||||
(binding [pmap/*tracked* (atom {})
|
||||
pmap/*load-fn* (partial files/load-pointer conn file-id)
|
||||
cfeat/*wrap-with-pointer-map-fn*
|
||||
(if (contains? features "fdata/pointer-map") pmap/wrap identity)
|
||||
cfeat/*wrap-with-objects-map-fn*
|
||||
(if (contains? features "fdata/objectd-map") omap/wrap identity)]
|
||||
|
||||
(on-file file)
|
||||
|
||||
(when (contains? features "fdata/pointer-map")
|
||||
(files/persist-pointers! conn file-id))))
|
||||
|
||||
(catch Throwable cause
|
||||
((or on-error on-error*) cause file))))
|
||||
((or on-error on-error*) cause file-id))))
|
||||
|
||||
(run-worker [in index]
|
||||
(db/with-atomic [conn pool]
|
||||
(loop [i 0]
|
||||
(when-let [file (sp/take! in)]
|
||||
(println! "=> worker: index:" index "| loop:" i "| file:" (:id file) "|" (px/get-name))
|
||||
(process-file conn file)
|
||||
(recur (inc i))))))
|
||||
(db/tx-run! system
|
||||
(fn [{:keys [::db/conn] :as system}]
|
||||
(binding [*conn* conn *system* system]
|
||||
(loop [i 0]
|
||||
(when-let [file-id (sp/take! in)]
|
||||
(println! "=> worker: index:" index "| loop:" i "| file:" (str file-id) "|" (px/get-name))
|
||||
(process-file conn file-id)
|
||||
(recur (inc i)))))
|
||||
|
||||
(when rollback?
|
||||
(db/rollback! conn)))))
|
||||
|
||||
(run-producer [input]
|
||||
(db/with-atomic [conn pool]
|
||||
(doseq [file (get-candidates conn)]
|
||||
(println! "=> producer:" (:id file) "|" (px/get-name))
|
||||
(sp/put! input file))
|
||||
(sp/close! input)))
|
||||
|
||||
(start-worker [input index]
|
||||
(px/thread
|
||||
{:name (str "penpot/srepl/worker/" index)}
|
||||
(run-worker input index)))]
|
||||
(doseq [file-id (get-candidates conn)]
|
||||
(println! "=> producer:" file-id "|" (px/get-name))
|
||||
(sp/put! input file-id))
|
||||
(sp/close! input)))]
|
||||
|
||||
(when (fn? on-init) (on-init))
|
||||
|
||||
|
@ -307,19 +297,12 @@
|
|||
{:name "penpot/srepl/producer"}
|
||||
(run-producer input))
|
||||
threads (->> (range workers)
|
||||
(map (partial start-worker input))
|
||||
(map (fn [index]
|
||||
(px/thread
|
||||
{:name (str "penpot/srepl/worker/" index)}
|
||||
(run-worker input index))))
|
||||
(cons producer)
|
||||
(doall))]
|
||||
|
||||
(run! p/await! threads)
|
||||
(when (fn? on-end) (on-end)))))
|
||||
|
||||
(defn update-pages
|
||||
"Apply a function to all pages of one file. The function receives a page and returns an updated page."
|
||||
[data f]
|
||||
(update data :pages-index update-vals f))
|
||||
|
||||
(defn update-shapes
|
||||
"Apply a function to all shapes of one page The function receives a shape and returns an updated shape"
|
||||
[page f]
|
||||
(update page :objects update-vals f))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue