Merge pull request #4082 from penpot/niwinz-staging-binfile-join

📎 Add helper to check for unreferenced media
This commit is contained in:
Alejandro 2024-01-31 07:27:05 +01:00 committed by GitHub
commit 4c815998f8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -9,12 +9,14 @@
(:refer-clojure :exclude [parse-uuid]) (:refer-clojure :exclude [parse-uuid])
#_:clj-kondo/ignore #_:clj-kondo/ignore
(:require (:require
[app.binfile.common :as bfc]
[app.common.data :as d] [app.common.data :as d]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.features :as cfeat] [app.common.features :as cfeat]
[app.common.files.changes :as cpc] [app.common.files.changes :as cpc]
[app.common.files.migrations :as pmg] [app.common.files.migrations :as fmg]
[app.common.files.repair :as repair] [app.common.files.repair :as repair]
[app.common.files.validate :as cfv]
[app.common.files.validate :as validate] [app.common.files.validate :as validate]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.pprint :refer [pprint]] [app.common.pprint :refer [pprint]]
@ -38,7 +40,8 @@
[expound.alpha :as expound] [expound.alpha :as expound]
[promesa.core :as p] [promesa.core :as p]
[promesa.exec :as px] [promesa.exec :as px]
[promesa.exec.csp :as sp])) [promesa.exec.semaphore :as ps]
[promesa.util :as pu]))
(def ^:dynamic *system* nil) (def ^:dynamic *system* nil)
@ -62,108 +65,111 @@
{:data data} {:data data}
{:id id})))) {:id id}))))
(defn get-file (defn- get-file*
"Get the migrated data of one file." "Get the migrated data of one file."
[id & {:keys [migrate?] :or {migrate? true}}] [system id]
(db/run! main/system (db/run! system
(fn [system] (fn [system]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(-> (files/get-file system id :migrate? migrate?) (-> (files/get-file system id :migrate? false)
(update :data feat.fdata/process-pointers deref) (update :data feat.fdata/process-pointers deref)
(update :data feat.fdata/process-objects (partial into {}))))))) (update :data feat.fdata/process-objects (partial into {}))
(fmg/migrate-file))))))
(defn get-file
"Get the migrated data of one file."
[id]
(get-file* main/system id))
(defn validate (defn validate
"Validate structure, referencial integrity and semantic coherence of "Validate structure, referencial integrity and semantic coherence of
all contents of a file. Returns a list of errors." all contents of a file. Returns a list of errors."
[id] [id]
(db/tx-run! main/system (db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] (let [id (if (string? id) (parse-uuid id) id)
(let [id (if (string? id) (parse-uuid id) id) file (get-file* system id)
file (files/get-file system id) libs (->> (files/get-file-libraries conn id)
libs (->> (files/get-file-libraries conn id) (into [file] (map (fn [{:keys [id]}]
(into [file] (map (fn [{:keys [id]}] (get-file* system id))))
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] (d/index-by :id))]
(-> (files/get-file system id :migrate? false) (validate/validate-file file libs)))))
(update :data feat.fdata/process-pointers deref)
(pmg/migrate-file))))))
(d/index-by :id))]
(validate/validate-file file libs))))))
(defn repair! (defn repair!
"Repair the list of errors detected by validation." "Repair the list of errors detected by validation."
[id] [id]
(db/tx-run! main/system (db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
(binding [pmap/*tracked* (pmap/create-tracked) (let [id (if (string? id) (parse-uuid id) id)
pmap/*load-fn* (partial feat.fdata/load-pointer system id)] file (get-file* system id)
(let [id (if (string? id) (parse-uuid id) id) libs (->> (files/get-file-libraries conn id)
file (files/get-file system id) (into [file] (map (fn [{:keys [id]}]
libs (->> (files/get-file-libraries conn id) (get-file* system id))))
(into [file] (map (fn [{:keys [id]}] (d/index-by :id))
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] errors (validate/validate-file file libs)
(-> (files/get-file system id :migrate? false) changes (repair/repair-file file libs errors)
(update :data feat.fdata/process-pointers deref)
(pmg/migrate-file))))))
(d/index-by :id))
errors (validate/validate-file file libs)
changes (repair/repair-file file libs errors)
file (-> file file (-> file
(update :revn inc) (update :revn inc)
(update :data cpc/process-changes changes) (update :data cpc/process-changes changes))
(update :data blob/encode))]
(when (contains? (:features file) "fdata/pointer-map") file (if (contains? (:features file) "fdata/objects-map")
(feat.fdata/persist-pointers! system id)) (feat.fdata/enable-objects-map file)
file)
file (if (contains? (:features file) "fdata/pointer-map")
(binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! system id)
file))
file)]
(db/update! conn :file
{:revn (:revn file)
:data (blob/encode (:data file))
:data-backend nil
:modified-at (dt/now)
:has-media-trimmed false}
{:id (:id file)})
:repaired))))
(db/update! conn :file
{:revn (:revn file)
:data (:data file)
:data-backend nil
:modified-at (dt/now)
:has-media-trimmed false}
{:id (:id file)})
:repaired)))))
(defn update-file! (defn update-file!
"Apply a function to the data of one file. Optionally save the changes or not. "Apply a function to the data of one file. Optionally save the changes or not.
The function receives the decoded and migrated file data." The function receives the decoded and migrated file data."
[& {:keys [update-fn id rollback? migrate? inc-revn?] [& {:keys [update-fn id rollback? inc-revn?]
:or {rollback? true migrate? true inc-revn? true}}] :or {rollback? true inc-revn? true}}]
(letfn [(process-file [{:keys [::db/conn] :as system} {:keys [features] :as file}] (letfn [(process-file [{:keys [::db/conn] :as system} file-id]
(binding [pmap/*tracked* (pmap/create-tracked) (let [file (get-file* system file-id)
pmap/*load-fn* (partial feat.fdata/load-pointer system id) file (cond-> (update-fn file)
cfeat/*wrap-with-pointer-map-fn* inc-revn? (update :revn inc))
(if (contains? features "fdata/pointer-map") pmap/wrap identity)
cfeat/*wrap-with-objects-map-fn*
(if (contains? features "fdata/objectd-map") omap/wrap identity)]
(let [file (cond-> (update-fn file) _ (cfv/validate-file-schema! file)
inc-revn? (update :revn inc))
features (db/create-array conn "text" (:features file))
data (blob/encode (:data file))]
(db/update! conn :file file (if (contains? (:features file) "fdata/objects-map")
{:data data (feat.fdata/enable-objects-map file)
:revn (:revn file) file)
:features features}
{:id id}))
(when (contains? (:features file) "fdata/pointer-map") file (if (contains? (:features file) "fdata/pointer-map")
(feat.fdata/persist-pointers! system id)) (binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! system id)
file))
file)]
(db/update! conn :file
{:data (blob/encode (:data file))
:features (db/create-array conn "text" (:features file))
:revn (:revn file)}
{:id (:id file)})
(dissoc file :data)))] (dissoc file :data)))]
(db/tx-run! (or *system* main/system) (db/tx-run! (or *system* (assoc main/system ::db/rollback rollback?))
(fn [system] (fn [system]
(binding [*system* system] (binding [*system* system]
(try (process-file system id))))))
(->> (files/get-file system id :migrate? migrate?)
(process-file system))
(finally
(when rollback?
(db/rollback! system)))))))))
(def ^:private sql:get-file-ids (def ^:private sql:get-file-ids
@ -190,16 +196,11 @@
(strace/print-stack-trace cause)) (strace/print-stack-trace cause))
(process-file [{:keys [::db/conn] :as system} file-id] (process-file [{:keys [::db/conn] :as system} file-id]
(let [file (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system file-id)] (let [file (get-file* system file-id)
(-> (files/get-file system file-id)
(update :data feat.fdata/process-pointers deref)))
libs (when with-libraries? libs (when with-libraries?
(->> (files/get-file-libraries conn file-id) (->> (files/get-file-libraries conn file-id)
(into [file] (map (fn [{:keys [id]}] (into [file] (map (fn [{:keys [id]}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)] (get-file* system id))))
(-> (files/get-file system id)
(update :data feat.fdata/process-pointers deref))))))
(d/index-by :id)))] (d/index-by :id)))]
(try (try
(if with-libraries? (if with-libraries?
@ -208,7 +209,7 @@
(catch Throwable cause (catch Throwable cause
((or on-error on-error*) cause file)))))] ((or on-error on-error*) cause file)))))]
(db/tx-run! main/system (db/tx-run! (assoc main/system ::db/rollback true)
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
(try (try
(binding [*system* system] (binding [*system* system]
@ -217,83 +218,125 @@
(get-candidates conn))) (get-candidates conn)))
(finally (finally
(when (fn? on-end) (when (fn? on-end)
(ex/ignoring (on-end))) (ex/ignoring (on-end)))))))))
(db/rollback! system)))))))
(defn repair-file-media
  "Give the file its own copies of media objects that its data uses
  but that are registered under a different file.

  Reads the dynamically bound `*system*` to obtain a connection, looks
  up every media id returned by `bfc/collect-used-media` in the
  `file_media_object` table and, for each row whose `:file-id` differs
  from the file `id`, logs a warning and inserts a copy of that row
  with a fresh id and this file as owner. The old-id -> new-id mapping
  is then exposed through `bfc/*state*` while `:pages-index`,
  `:components` and `:media` of the file data are passed through the
  `bfc` relink helpers (presumably rewriting references according to
  that mapping -- confirm against `app.binfile.common`).

  Returns the file with the updated `:data`, or nil when no media
  object had to be copied."
  [{:keys [id data] :as file}]
  (letfn [;; Reducer step: copies one foreign media row under this file
          ;; and records the id translation; rows already owned by the
          ;; file leave the accumulator untouched.
          (clone-foreign [acc {media-file-id :file-id old-id :id :as media}]
            (if (= media-file-id id)
              acc
              (let [new-id (uuid/next)]
                (l/wrn :hint "found not referenced media"
                       :file-id (str id)
                       :media-id (str old-id))
                (db/insert! *system* :file-media-object
                            (assoc media :file-id id :id new-id))
                (assoc acc old-id new-id))))

          ;; Runs the relink helpers over the three sections of the
          ;; file data and strips nil entries afterwards.
          (relink [fdata]
            (-> fdata
                (update :pages-index #'bfc/relink-shapes)
                (update :components #'bfc/relink-shapes)
                (update :media #'bfc/relink-media)
                (d/without-nils)))]

    (let [conn  (db/get-connection *system*)
          refs  (bfc/collect-used-media data)
          param (db/create-array conn "uuid" refs)
          rows  (db/exec! conn ["SELECT * FROM file_media_object WHERE id = ANY(?)" param])
          index (reduce clone-foreign {} rows)]
      ;; An empty index means there is nothing to relink; nil is
      ;; returned in that case.
      (when (seq index)
        (binding [bfc/*state* (atom {:index index})]
          (update file :data relink))))))
(defn process-files! (defn process-files!
"Apply a function to all files in the database, reading them in "Apply a function to all files in the database"
batches."
[& {:keys [max-items [& {:keys [max-items
workers max-jobs
start-at start-at
on-file on-file
on-error
on-end
on-init
rollback?] rollback?]
:or {workers 1 :or {max-jobs 1
rollback? true}}] rollback? true}}]
(letfn [(get-candidates [conn]
(cond->> (db/cursor conn [sql:get-file-ids (or start-at (dt/now))])
(some? max-items)
(take max-items)))
(on-error* [cause file] (l/dbg :hint "process:start"
(println! "unexpected exception happened on processing file: " (:id file)) :rollback rollback?
(strace/print-stack-trace cause)) :max-jobs max-jobs
:max-items max-items)
(process-file [system file-id] (let [tpoint (dt/tpoint)
(try factory (px/thread-factory :virtual false :prefix "penpot/file-process/")
(let [{:keys [features] :as file} (files/get-file system file-id)] executor (px/cached-executor :factory factory)
(binding [pmap/*tracked* (pmap/create-tracked) sjobs (ps/create :permits max-jobs)
pmap/*load-fn* (partial feat.fdata/load-pointer system file-id)
cfeat/*wrap-with-pointer-map-fn*
(if (contains? features "fdata/pointer-map") pmap/wrap identity)
cfeat/*wrap-with-objects-map-fn*
(if (contains? features "fdata/objectd-map") omap/wrap identity)]
(on-file file) process-file
(fn [file-id tpoint]
(try
(l/trc :hint "process:file:start" :file-id (str file-id))
(db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [{:keys [::db/conn] :as system}]
(let [file' (get-file* system file-id)
file (binding [*system* system]
(on-file file'))]
(when (contains? features "fdata/pointer-map") (when (and (some? file)
(feat.fdata/persist-pointers! system file-id)))) (not (identical? file file')))
(catch Throwable cause (cfv/validate-file-schema! file)
((or on-error on-error*) cause file-id))))
(run-worker [in index] (let [file (if (contains? (:features file) "fdata/objects-map")
(db/tx-run! main/system (feat.fdata/enable-objects-map file)
(fn [system] file)
(binding [*system* system]
(loop [i 0]
(when-let [file-id (sp/take! in)]
(println! "=> worker: index:" index "| loop:" i "| file:" (str file-id) "|" (px/get-name))
(process-file system file-id)
(recur (inc i)))))
(when rollback? file (if (contains? (:features file) "fdata/pointer-map")
(db/rollback! system))))) (binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! system file-id)
file))
file)]
(run-producer [input] (db/update! conn :file
(db/tx-run! main/system {:data (blob/encode (:data file))
(fn [{:keys [::db/conn]}] :features (db/create-array conn "text" (:features file))
(doseq [file-id (get-candidates conn)] :revn (:revn file)}
(println! "=> producer:" file-id "|" (px/get-name)) {:id file-id}))))))
(sp/put! input file-id)) (catch Throwable cause
(sp/close! input))))] (l/wrn :hint "unexpected error on processing file (skiping)"
:file-id (str file-id)
:cause cause))
(finally
(ps/release! sjobs)
(let [elapsed (dt/format-duration (tpoint))]
(l/trc :hint "process:file:end"
:file-id (str file-id)
:elapsed elapsed)))))]
(when (fn? on-init) (on-init))
(let [input (sp/chan :buf 25) (try
producer (px/thread (db/tx-run! main/system
{:name "penpot/srepl/producer"} (fn [{:keys [::db/conn] :as system}]
(run-producer input)) (db/exec! conn ["SET statement_timeout = 0"])
threads (->> (range workers) (db/exec! conn ["SET idle_in_transaction_session_timeout = 0"])
(map (fn [index]
(px/thread (run! (fn [file-id]
{:name (str "penpot/srepl/worker/" index)} (ps/acquire! sjobs)
(run-worker input index)))) (px/run! executor (partial process-file file-id (dt/tpoint))))
(cons producer) (->> (db/cursor conn [sql:get-file-ids (or start-at (dt/now))])
(doall))] (take max-items)
(map :id)))
;; Close and await tasks
(pu/close! executor)))
(catch Throwable cause
(l/dbg :hint "process:error" :cause cause))
(finally
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "process:end"
:rollback rollback?
:elapsed elapsed))))))
(run! p/await! threads)
(when (fn? on-end) (on-end)))))