Reorganize fdata/pointer-map feature helpers

Mainly move all pointer-map related helpers from app.rpc.commands.files
to the the app.features.fdata namespace and normalizes codestile around
feature handling on all affected code.

This commit also comes with several features related bugifxes on the
components-v2 migration code:

- properly migrate legacy feature names on apply components-v2 migration
- start using new fdata feature related functions
- prevent generation of a ephimeral pointer on each graphic migration
  operation; on large files this caused a very noticiable overhead of
  creating a big number of completly unused pointer maps
- do persistence after validation and not before
This commit is contained in:
Andrey Antukh 2023-12-07 11:51:52 +01:00 committed by Andrés Moya
parent 5669bfc260
commit 417366d998
13 changed files with 649 additions and 610 deletions

View file

@ -23,6 +23,7 @@
[app.config :as cfg]
[app.db :as db]
[app.db.sql :as sql]
[app.features.fdata :as feat.fdata]
[app.main :refer [system]]
[app.rpc.commands.files :as files]
[app.rpc.commands.files-update :as files-update]
@ -39,7 +40,6 @@
[promesa.exec :as px]
[promesa.exec.csp :as sp]))
(def ^:dynamic *conn* nil)
(def ^:dynamic *system* nil)
(defn println!
@ -63,71 +63,75 @@
(defn get-file
"Get the migrated data of one file."
[system id]
[system id & {:keys [migrate?] :or {migrate? true}}]
(db/run! system
(fn [{:keys [::db/conn]}]
(binding [pmap/*load-fn* (partial files/load-pointer conn id)]
(-> (files/get-file conn id)
(files/process-pointers deref))))))
(fn [system]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(-> (files/get-file system id :migrate? migrate?)
(update :data feat.fdata/process-pointers deref))))))
(defn validate
"Validate structure, referencial integrity and semantic coherence of
all contents of a file. Returns a list of errors."
[system id]
(db/with-atomic [conn (:app.db/pool system)]
(binding [pmap/*load-fn* (partial files/load-pointer conn id)]
(let [file (files/get-file conn id)
libraries (->> (files/get-file-libraries conn id)
(into [file] (map (fn [{:keys [id]}]
(binding [pmap/*load-fn* (partial files/load-pointer conn id)]
(-> (db/get conn :file {:id id})
(files/decode-row)
(files/process-pointers deref) ; ensure all pointers resolved
(pmg/migrate-file))))))
(d/index-by :id))]
(validate/validate-file file libraries)))))
(db/tx-run! system
(fn [{:keys [::db/conn] :as system}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(let [id (if (string? id) (parse-uuid id) id)
file (files/get-file system id)
libs (->> (files/get-file-libraries conn id)
(into [file] (map (fn [{:keys [id]}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(-> (files/get-file system id :migrate? false)
(feat.fdata/process-pointers deref)
(pmg/migrate-file))))))
(d/index-by :id))]
(validate/validate-file file libs))))))
(defn repair
(defn repair!
"Repair the list of errors detected by validation."
[system id]
(db/with-atomic [conn (:app.db/pool system)]
(let [file (files/get-file conn id)]
(binding [*conn* conn
pmap/*tracked* (atom {})
pmap/*load-fn* (partial files/load-pointer conn id)]
(let [libraries (->> (files/get-file-libraries conn id)
(into [file] (map (fn [{:keys [id]}]
(binding [pmap/*load-fn* (partial files/load-pointer conn id)]
(-> (db/get conn :file {:id id})
(files/decode-row)
(files/process-pointers deref) ; ensure all pointers resolved
(pmg/migrate-file))))))
(d/index-by :id))
errors (validate/validate-file file libraries)
changes (-> (repair/repair-file (:data file) libraries errors)
(get :redo-changes))
file (-> file
(update :revn inc)
(update :data cpc/process-changes changes)
(update :data blob/encode))]
(db/tx-run! system
(fn [{:keys [::db/conn] :as system}]
(binding [pmap/*tracked* (pmap/create-tracked)
pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(let [id (if (string? id) (parse-uuid id) id)
file (files/get-file system id)
libs (->> (files/get-file-libraries conn id)
(into [file] (map (fn [{:keys [id]}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(-> (files/get-file system id :migrate? false)
(feat.fdata/process-pointers deref)
(pmg/migrate-file))))))
(d/index-by :id))
errors (validate/validate-file file libs)
changes (-> (repair/repair-file (:data file) libs errors) :redo-changes)
(files/persist-pointers! conn id)
(db/update! conn :file
{:revn (:revn file)
:data (:data file)
:data-backend nil
:modified-at (dt/now)
:has-media-trimmed false}
{:id (:id file)}))))))
file (-> file
(update :revn inc)
(update :data cpc/process-changes changes)
(update :data blob/encode))]
(when (contains? (:features file) "fdata/pointer-map")
(feat.fdata/persist-pointers! system id))
(db/update! conn :file
{:revn (:revn file)
:data (:data file)
:data-backend nil
:modified-at (dt/now)
:has-media-trimmed false}
{:id (:id file)})
:repaired)))))
(defn update-file!
"Apply a function to the data of one file. Optionally save the changes or not.
The function receives the decoded and migrated file data."
[system & {:keys [update-fn id rollback? migrate? inc-revn?]
:or {rollback? true migrate? true inc-revn? true}}]
(letfn [(process-file [conn {:keys [features] :as file}]
(binding [pmap/*tracked* (atom {})
pmap/*load-fn* (partial files/load-pointer conn id)
(letfn [(process-file [{:keys [::db/conn] :as system} {:keys [features] :as file}]
(binding [pmap/*tracked* (pmap/create-tracked)
pmap/*load-fn* (partial feat.fdata/load-pointer system id)
cfeat/*wrap-with-pointer-map-fn*
(if (contains? features "fdata/pointer-map") pmap/wrap identity)
cfeat/*wrap-with-objects-map-fn*
@ -145,19 +149,19 @@
{:id id}))
(when (contains? (:features file) "fdata/pointer-map")
(files/persist-pointers! conn id))
(feat.fdata/persist-pointers! system id))
(dissoc file :data)))]
(db/tx-run! system
(fn [{:keys [::db/conn] :as system}]
(binding [*conn* conn *system* system]
(fn [system]
(binding [*system* system]
(try
(->> (files/get-file conn id :migrate? migrate?)
(process-file conn))
(->> (files/get-file system id :migrate? migrate?)
(process-file system))
(finally
(when rollback?
(db/rollback! conn)))))))))
(db/rollback! system)))))))))
(defn analyze-files
"Apply a function to all files in the database, reading them in
@ -187,17 +191,17 @@
(println "unexpected exception happened on processing file: " (:id file))
(strace/print-stack-trace cause))
(process-file [conn file-id]
(let [file (binding [pmap/*load-fn* (partial files/load-pointer conn file-id)]
(-> (files/get-file conn file-id)
(files/process-pointers deref)))
(process-file [{:keys [::db/conn] :as system} file-id]
(let [file (binding [pmap/*load-fn* (partial feat.fdata/load-pointer system file-id)]
(-> (files/get-file system file-id)
(update :data feat.fdata/process-pointers deref)))
libs (when with-libraries?
(->> (files/get-file-libraries conn file-id)
(into [file] (map (fn [{:keys [id]}]
(binding [pmap/*load-fn* (partial files/load-pointer conn id)]
(-> (files/get-file conn id)
(files/process-pointers deref))))))
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(-> (files/get-file system id)
(update :data feat.fdata/process-pointers deref))))))
(d/index-by :id)))]
(try
(if with-libraries?
@ -209,31 +213,31 @@
(db/tx-run! system
(fn [{:keys [::db/conn] :as system}]
(try
(binding [*conn* conn *system* system]
(binding [*system* system]
(when (fn? on-init) (on-init))
(run! (partial process-file conn) (get-candidates conn)))
(run! (partial process-file system) (get-candidates conn)))
(finally
(when (fn? on-end)
(ex/ignoring (on-end)))
(db/rollback! conn)))))))
(db/rollback! system)))))))
(defn process-files!
"Apply a function to all files in the database, reading them in
batches."
[{:keys [::db/pool] :as system} & {:keys [chunk-size
max-items
workers
start-at
on-file
on-error
on-end
on-init
rollback?]
:or {chunk-size 10
max-items Long/MAX_VALUE
workers 1
rollback? true}}]
[system & {:keys [chunk-size
max-items
workers
start-at
on-file
on-error
on-end
on-init
rollback?]
:or {chunk-size 10
max-items Long/MAX_VALUE
workers 1
rollback? true}}]
(letfn [(get-chunk [conn cursor]
(let [sql (str "SELECT id, created_at FROM file "
" WHERE created_at < ? AND deleted_at is NULL "
@ -252,11 +256,11 @@
(println! "unexpected exception happened on processing file: " (:id file))
(strace/print-stack-trace cause))
(process-file [conn file-id]
(process-file [system file-id]
(try
(let [{:keys [features] :as file} (files/get-file conn file-id)]
(binding [pmap/*tracked* (atom {})
pmap/*load-fn* (partial files/load-pointer conn file-id)
(let [{:keys [features] :as file} (files/get-file system file-id)]
(binding [pmap/*tracked* (pmap/create-tracked)
pmap/*load-fn* (partial feat.fdata/load-pointer system file-id)
cfeat/*wrap-with-pointer-map-fn*
(if (contains? features "fdata/pointer-map") pmap/wrap identity)
cfeat/*wrap-with-objects-map-fn*
@ -265,30 +269,30 @@
(on-file file)
(when (contains? features "fdata/pointer-map")
(files/persist-pointers! conn file-id))))
(feat.fdata/persist-pointers! system file-id))))
(catch Throwable cause
((or on-error on-error*) cause file-id))))
(run-worker [in index]
(db/tx-run! system
(fn [{:keys [::db/conn] :as system}]
(binding [*conn* conn *system* system]
(fn [system]
(binding [*system* system]
(loop [i 0]
(when-let [file-id (sp/take! in)]
(println! "=> worker: index:" index "| loop:" i "| file:" (str file-id) "|" (px/get-name))
(process-file conn file-id)
(process-file system file-id)
(recur (inc i)))))
(when rollback?
(db/rollback! conn)))))
(db/rollback! system)))))
(run-producer [input]
(db/with-atomic [conn pool]
(doseq [file-id (get-candidates conn)]
(println! "=> producer:" file-id "|" (px/get-name))
(sp/put! input file-id))
(sp/close! input)))]
(db/tx-run! system (fn [{:keys [::db/conn]}]
(doseq [file-id (get-candidates conn)]
(println! "=> producer:" file-id "|" (px/get-name))
(sp/put! input file-id))
(sp/close! input))))]
(when (fn? on-init) (on-init))