mirror of
https://github.com/penpot/penpot.git
synced 2025-05-15 19:16:38 +02:00
📎 Add srepl helper for process files
This commit is contained in:
parent
1bb3a3a084
commit
804addfa66
3 changed files with 98 additions and 5 deletions
|
@ -19,10 +19,11 @@
|
||||||
[app.common.schema.generators :as sg]
|
[app.common.schema.generators :as sg]
|
||||||
[app.common.spec :as us]
|
[app.common.spec :as us]
|
||||||
[app.common.transit :as t]
|
[app.common.transit :as t]
|
||||||
|
[app.common.types.file :as ctf]
|
||||||
[app.common.uuid :as uuid]
|
[app.common.uuid :as uuid]
|
||||||
[app.config :as cfg]
|
[app.config :as cfg]
|
||||||
[app.main :as main]
|
[app.main :as main]
|
||||||
[app.srepl.helpers]
|
[app.srepl.helpers :as srepl.helpers]
|
||||||
[app.srepl.main :as srepl]
|
[app.srepl.main :as srepl]
|
||||||
[app.util.blob :as blob]
|
[app.util.blob :as blob]
|
||||||
[app.util.json :as json]
|
[app.util.json :as json]
|
||||||
|
@ -48,7 +49,8 @@
|
||||||
[malli.generator :as mg]
|
[malli.generator :as mg]
|
||||||
[malli.registry :as mr]
|
[malli.registry :as mr]
|
||||||
[malli.transform :as mt]
|
[malli.transform :as mt]
|
||||||
[malli.util :as mu]))
|
[malli.util :as mu]
|
||||||
|
[promesa.exec :as px]))
|
||||||
|
|
||||||
(repl/disable-reload! (find-ns 'integrant.core))
|
(repl/disable-reload! (find-ns 'integrant.core))
|
||||||
(set! *warn-on-reflection* true)
|
(set! *warn-on-reflection* true)
|
||||||
|
@ -176,4 +178,3 @@
|
||||||
[:map
|
[:map
|
||||||
[:type [:= :b]]
|
[:type [:= :b]]
|
||||||
[:b :int]]]]]]]])
|
[:b :int]]]]]]]])
|
||||||
|
|
||||||
|
|
|
@ -299,6 +299,10 @@
|
||||||
:hint "database object not found"))
|
:hint "database object not found"))
|
||||||
row))
|
row))
|
||||||
|
|
||||||
|
(defn plan
|
||||||
|
[ds sql]
|
||||||
|
(jdbc/plan ds sql sql/default-opts))
|
||||||
|
|
||||||
(defn get-by-id
|
(defn get-by-id
|
||||||
[ds table id & {:as opts}]
|
[ds table id & {:as opts}]
|
||||||
(get ds table {:id id} opts))
|
(get ds table {:id id} opts))
|
||||||
|
|
|
@ -12,9 +12,9 @@
|
||||||
[app.common.data :as d]
|
[app.common.data :as d]
|
||||||
[app.common.exceptions :as ex]
|
[app.common.exceptions :as ex]
|
||||||
[app.common.files.features :as ffeat]
|
[app.common.files.features :as ffeat]
|
||||||
|
[app.common.files.migrations :as pmg]
|
||||||
[app.common.logging :as l]
|
[app.common.logging :as l]
|
||||||
[app.common.pages :as cp]
|
[app.common.pages :as cp]
|
||||||
[app.common.files.migrations :as pmg]
|
|
||||||
[app.common.pprint :refer [pprint]]
|
[app.common.pprint :refer [pprint]]
|
||||||
[app.common.spec :as us]
|
[app.common.spec :as us]
|
||||||
[app.common.uuid :as uuid]
|
[app.common.uuid :as uuid]
|
||||||
|
@ -31,9 +31,13 @@
|
||||||
[clojure.stacktrace :as strace]
|
[clojure.stacktrace :as strace]
|
||||||
[clojure.walk :as walk]
|
[clojure.walk :as walk]
|
||||||
[cuerdas.core :as str]
|
[cuerdas.core :as str]
|
||||||
[expound.alpha :as expound]))
|
[expound.alpha :as expound]
|
||||||
|
[promesa.core :as p]
|
||||||
|
[promesa.exec :as px]
|
||||||
|
[promesa.exec.csp :as sp]))
|
||||||
|
|
||||||
(def ^:dynamic *conn*)
|
(def ^:dynamic *conn*)
|
||||||
|
(def ^:dynamic *pool*)
|
||||||
|
|
||||||
(defn reset-password!
|
(defn reset-password!
|
||||||
"Reset a password to a specific one for a concrete user or all users
|
"Reset a password to a specific one for a concrete user or all users
|
||||||
|
@ -146,6 +150,90 @@
|
||||||
|
|
||||||
(when (fn? on-end) (on-end))))
|
(when (fn? on-end) (on-end))))
|
||||||
|
|
||||||
|
(defn- println!
|
||||||
|
[& params]
|
||||||
|
(locking println
|
||||||
|
(apply println params)))
|
||||||
|
|
||||||
|
(defn process-files!
|
||||||
|
"Apply a function to all files in the database, reading them in
|
||||||
|
batches."
|
||||||
|
|
||||||
|
[{:keys [::db/pool] :as system} & {:keys [chunk-size
|
||||||
|
max-items
|
||||||
|
workers
|
||||||
|
start-at
|
||||||
|
on-file
|
||||||
|
on-error
|
||||||
|
on-end
|
||||||
|
on-init]
|
||||||
|
:or {chunk-size 10
|
||||||
|
workers 1}}]
|
||||||
|
|
||||||
|
(letfn [(get-chunk [conn cursor]
|
||||||
|
(let [rows (db/exec! conn [sql:retrieve-files-chunk cursor chunk-size])]
|
||||||
|
[(some->> rows peek :created-at) (seq rows)]))
|
||||||
|
|
||||||
|
(get-candidates [conn]
|
||||||
|
(->> (d/iteration (partial get-chunk conn)
|
||||||
|
:vf second
|
||||||
|
:kf first
|
||||||
|
:initk (or start-at (dt/now)))
|
||||||
|
(take max-items)))
|
||||||
|
|
||||||
|
|
||||||
|
(on-error* [cause file]
|
||||||
|
(println! "unexpected exception happened on processing file: " (:id file))
|
||||||
|
(strace/print-stack-trace cause))
|
||||||
|
|
||||||
|
(process-file [conn file]
|
||||||
|
(try
|
||||||
|
(binding [*conn* conn
|
||||||
|
pmap/*tracked* (atom {})
|
||||||
|
pmap/*load-fn* (partial files/load-pointer conn (:id file))
|
||||||
|
ffeat/*wrap-with-pointer-map-fn*
|
||||||
|
(if (contains? (:features file) "storage/pointer-map") pmap/wrap identity)
|
||||||
|
ffeat/*wrap-with-objects-map-fn*
|
||||||
|
(if (contains? (:features file) "storage/objectd-map") omap/wrap identity)]
|
||||||
|
(on-file file))
|
||||||
|
(catch Throwable cause
|
||||||
|
((or on-error on-error*) cause file))))
|
||||||
|
|
||||||
|
(run-worker [in index]
|
||||||
|
(db/with-atomic [conn pool]
|
||||||
|
(loop [i 0]
|
||||||
|
(when-let [file (sp/take! in)]
|
||||||
|
(println! "=> worker: index:" index "| loop:" i "| file:" (:id file) "|" (px/get-name))
|
||||||
|
(process-file conn file)
|
||||||
|
(recur (inc i))))))
|
||||||
|
|
||||||
|
(run-producer [input]
|
||||||
|
(db/with-atomic [conn pool]
|
||||||
|
(doseq [file (get-candidates conn)]
|
||||||
|
(println! "=> producer:" (:id file) "|" (px/get-name))
|
||||||
|
(sp/put! input file))
|
||||||
|
(sp/close! input)))
|
||||||
|
|
||||||
|
(start-worker [input index]
|
||||||
|
(px/thread
|
||||||
|
{:name (str "penpot/srepl/worker/" index)}
|
||||||
|
(run-worker input index)))
|
||||||
|
]
|
||||||
|
|
||||||
|
(when (fn? on-init) (on-init))
|
||||||
|
|
||||||
|
(let [input (sp/chan :buf chunk-size)
|
||||||
|
producer (px/thread
|
||||||
|
{:name "penpot/srepl/producer"}
|
||||||
|
(run-producer input))
|
||||||
|
threads (->> (range workers)
|
||||||
|
(map (partial start-worker input))
|
||||||
|
(cons producer)
|
||||||
|
(doall))]
|
||||||
|
|
||||||
|
(run! p/await! threads)
|
||||||
|
(when (fn? on-end) (on-end)))))
|
||||||
|
|
||||||
(defn update-pages
|
(defn update-pages
|
||||||
"Apply a function to all pages of one file. The function receives a page and returns an updated page."
|
"Apply a function to all pages of one file. The function receives a page and returns an updated page."
|
||||||
[data f]
|
[data f]
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue