♻️ Refactor srepl helpers

This commit is contained in:
Andrey Antukh 2024-02-09 14:49:22 +01:00 committed by Andrés Moya
parent dc67056a8c
commit f4ac607958
5 changed files with 370 additions and 393 deletions

View file

@ -7,39 +7,18 @@
(ns app.srepl.helpers
"A main namespace for server repl."
(:refer-clojure :exclude [parse-uuid])
#_:clj-kondo/ignore
(:require
[app.binfile.common :as bfc]
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.features :as cfeat]
[app.common.files.changes :as cpc]
[app.common.files.migrations :as fmg]
[app.common.files.validate :as cfv]
[app.common.logging :as l]
[app.common.pprint :refer [pprint]]
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.config :as cfg]
[app.db :as db]
[app.db.sql :as sql]
[app.features.components-v2 :as feat.comp-v2]
[app.features.fdata :as feat.fdata]
[app.main :as main]
[app.rpc.commands.files :as files]
[app.rpc.commands.files-update :as files-update]
[app.rpc.commands.files-snapshot :as fsnap]
[app.util.blob :as blob]
[app.util.objects-map :as omap]
[app.util.pointer-map :as pmap]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[clojure.stacktrace :as strace]
[clojure.walk :as walk]
[cuerdas.core :as str]
[expound.alpha :as expound]
[promesa.core :as p]
[promesa.exec :as px]
[promesa.exec.semaphore :as ps]
[promesa.util :as pu]))
[app.util.pointer-map :as pmap]))
(def ^:dynamic *system* nil)
@ -54,25 +33,6 @@
(d/parse-uuid v)
v))
;; (def ^:private sql:get-and-lock-team-files
;; "SELECT f.id
;; FROM file AS f
;; JOIN project AS p ON (p.id = f.project_id)
;; WHERE p.team_id = ?
;; FOR UPDATE")
;; (defn get-and-lock-team-files
;; [conn team-id]
;; (->> (db/exec! conn [sql:get-and-lock-team-files team-id])
;; (into #{} (map :id))))
;; (defn get-team
;; [system team-id]
;; (-> (db/get system :team {:id team-id}
;; {::db/remove-deleted false
;; ::db/check-deleted false})
;; (update :features db/decode-pgarray #{})))
(defn get-file
"Get the migrated data of one file."
([id] (get-file (or *system* main/system) id))
@ -107,8 +67,10 @@
{:revn (:revn file)
:data (:data file)
:features (:features file)
:deleted-at (:deleted-at file)
:created-at (:created-at file)
:modified-at (:modified-at file)
:data-backend nil
:modified-at (dt/now)
:has-media-trimmed false}
{:id (:id file)})))
@ -125,213 +87,90 @@
(defn reset-file-data!
"Hardcode replace of the data of one file."
[id data]
(db/tx-run! main/system
[system id data]
(db/tx-run! system
(fn [system]
(db/update! system :file
{:data data}
{:id id}))))
(defn process-file*
[system file-id update-fn]
(let [file (get-file system file-id)
file (-> (update-fn file)
(update :revn inc))]
(cfv/validate-file-schema! file)
(update-file! system file)
(dissoc file :data)))
(def ^:private sql:snapshots-with-file
"WITH files AS (
SELECT f.id AS file_id,
(SELECT fc.id
FROM file_change AS fc
WHERE fc.label = ?
AND fc.file_id = f.id
ORDER BY fc.created_at DESC
LIMIT 1) AS id
FROM file AS f
) SELECT * FROM files
WHERE file_id = ANY(?)
AND id IS NOT NULL")
(defn get-file-snapshots
"Get a seq parirs of file-id and snapshot-id for a set of files
and specified label"
[conn label ids]
(db/exec! conn [sql:snapshots-with-file label
(db/create-array conn "uuid" ids)]))
(defn take-team-snapshot!
[system team-id label]
(let [conn (db/get-connection system)]
(->> (feat.comp-v2/get-and-lock-team-files conn team-id)
(map (fn [file-id]
{:file-id file-id
:label label}))
(reduce (fn [result params]
(fsnap/take-file-snapshot! conn params)
(inc result))
0))))
(defn restore-team-snapshot!
[system team-id label]
(let [conn (db/get-connection system)
ids (->> (feat.comp-v2/get-and-lock-team-files conn team-id)
(into #{}))
snap (get-file-snapshots conn label ids)
ids' (into #{} (map :file-id) snap)
team (-> (feat.comp-v2/get-team conn team-id)
(update :features disj "components/v2"))]
(when (not= ids ids')
(throw (RuntimeException. "no uniform snapshot available")))
(feat.comp-v2/update-team! conn team)
(reduce (fn [result params]
(fsnap/restore-file-snapshot! conn params)
(inc result))
0
snap)))
(defn process-file!
"Apply a function to the data of one file. Optionally save the changes or not.
The function receives the decoded and migrated file data."
[& {:keys [update-fn id rollback?]
:or {rollback? true}}]
[system file-id update-fn & {:keys [label validate? with-libraries?] :or {validate? true} :as opts}]
(let [system (or *system* (assoc main/system ::db/rollback rollback?))]
(db/tx-run! system
(fn [system]
(binding [*system* system]
(process-file* system id update-fn))))))
(when (string? label)
(fsnap/take-file-snapshot! system {:file-id file-id :label label}))
(let [conn (db/get-connection system)
file (get-file system file-id)
libs (when with-libraries?
(->> (files/get-file-libraries conn file-id)
(into [file] (map (fn [{:keys [id]}]
(get-file system id))))
(d/index-by :id)))
(def ^:private sql:get-file-ids
"SELECT id FROM file
WHERE created_at < ? AND deleted_at is NULL
ORDER BY created_at DESC")
file' (if with-libraries?
(update-fn file libs opts)
(update-fn file opts))]
(defn analyze-files
"Apply a function to all files in the database, reading them in
batches. Do not change data.
The `on-file` parameter should be a function that receives the file
and the previous state and returns the new state.
Emits rollback at the end of operation."
[& {:keys [max-items start-at on-file on-error on-end on-init with-libraries?]}]
(letfn [(get-candidates [conn]
(cond->> (db/cursor conn [sql:get-file-ids (or start-at (dt/now))])
(some? max-items)
(take max-items)))
(on-error* [cause file]
(println "unexpected exception happened on processing file: " (:id file))
(strace/print-stack-trace cause))
(process-file [{:keys [::db/conn] :as system} file-id]
(let [file (get-file system file-id)
libs (when with-libraries?
(->> (files/get-file-libraries conn file-id)
(into [file] (map (fn [{:keys [id]}]
(get-file system id))))
(d/index-by :id)))]
(try
(if with-libraries?
(on-file file libs)
(on-file file))
(catch Throwable cause
((or on-error on-error*) cause file)))))]
(db/tx-run! (assoc main/system ::db/rollback true)
(fn [{:keys [::db/conn] :as system}]
(try
(binding [*system* system]
(when (fn? on-init) (on-init))
(run! (partial process-file system)
(get-candidates conn)))
(finally
(when (fn? on-end)
(ex/ignoring (on-end)))))))))
(defn process-files!
"Apply a function to all files in the database"
[& {:keys [max-items
max-jobs
start-at
on-file
validate?
rollback?]
:or {max-jobs 1
max-items Long/MAX_VALUE
validate? true
rollback? true}}]
(l/dbg :hint "process:start"
:rollback rollback?
:max-jobs max-jobs
:max-items max-items)
(let [tpoint (dt/tpoint)
factory (px/thread-factory :virtual false :prefix "penpot/file-process/")
executor (px/cached-executor :factory factory)
sjobs (ps/create :permits max-jobs)
process-file
(fn [file-id idx tpoint]
(try
(l/trc :hint "process:file:start" :file-id (str file-id) :index idx)
(db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [{:keys [::db/conn] :as system}]
(let [file' (get-file system file-id)
file (binding [*system* system]
(on-file file'))]
(when (and (some? file) (not (identical? file file')))
(when validate?
(cfv/validate-file-schema! file))
(let [file (if (contains? (:features file) "fdata/objects-map")
(feat.fdata/enable-objects-map file)
file)
file (if (contains? (:features file) "fdata/pointer-map")
(binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! system file-id)
file))
file)]
(db/update! conn :file
{:data (blob/encode (:data file))
:deleted-at (:deleted-at file)
:created-at (:created-at file)
:modified-at (:modified-at file)
:features (db/create-array conn "text" (:features file))
:revn (:revn file)}
{:id file-id}))))))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing file (skiping)"
:file-id (str file-id)
:index idx
:cause cause))
(finally
(ps/release! sjobs)
(let [elapsed (dt/format-duration (tpoint))]
(l/trc :hint "process:file:end"
:file-id (str file-id)
:index idx
:elapsed elapsed)))))]
(try
(db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}]
(db/exec! conn ["SET statement_timeout = 0"])
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0"])
(try
(reduce (fn [idx file-id]
(ps/acquire! sjobs)
(px/run! executor (partial process-file file-id idx (dt/tpoint)))
(inc idx))
0
(->> (db/cursor conn [sql:get-file-ids (or start-at (dt/now))])
(take max-items)
(map :id)))
(finally
;; Close and await tasks
(pu/close! executor)))))
(catch Throwable cause
(l/dbg :hint "process:error" :cause cause))
(finally
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "process:end"
:rollback rollback?
:elapsed elapsed))))))
(defn repair-file-media
"A helper intended to be used with `process-files!` that fixes all
not propertly referenced file-media-object for a file"
[{:keys [id data] :as file}]
(let [conn (db/get-connection *system*)
used (bfc/collect-used-media data)
ids (db/create-array conn "uuid" used)
sql (str "SELECT * FROM file_media_object WHERE id = ANY(?)")
rows (db/exec! conn [sql ids])
index (reduce (fn [index media]
(if (not= (:file-id media) id)
(let [media-id (uuid/next)]
(l/wrn :hint "found not referenced media"
:file-id (str id)
:media-id (str (:id media)))
(db/insert! *system* :file-media-object
(-> media
(assoc :file-id id)
(assoc :id media-id)))
(assoc index (:id media) media-id))
index))
{}
rows)]
(when (seq index)
(binding [bfc/*state* (atom {:index index})]
(update file :data (fn [fdata]
(-> fdata
(update :pages-index #'bfc/relink-shapes)
(update :components #'bfc/relink-shapes)
(update :media #'bfc/relink-media)
(d/without-nils))))))))
(when (and (some? file)
(not (identical? file file')))
(when validate? (cfv/validate-file-schema! file'))
(let [file' (update file' :revn inc)]
(update-file! system file')
true))))