Improve snapshot related internal API

This commit also adds the ability to take snapshot of all files
of the team in a single run/transaction.
This commit is contained in:
Andrey Antukh 2024-02-08 15:21:31 +01:00 committed by Andrés Moya
parent aaf457a792
commit 6d35cb2eb4
7 changed files with 433 additions and 286 deletions

View file

@ -1497,7 +1497,7 @@
(-> (db/get system :team {:id team-id} (-> (db/get system :team {:id team-id}
{::db/remove-deleted false {::db/remove-deleted false
::db/check-deleted false}) ::db/check-deleted false})
(decode-row))) (update :features db/decode-pgarray #{})))
(defn- validate-file! (defn- validate-file!
[file libs] [file libs]
@ -1546,19 +1546,21 @@
AND f.deleted_at IS NULL AND f.deleted_at IS NULL
FOR UPDATE") FOR UPDATE")
(defn get-and-lock-files (defn get-and-lock-team-files
[conn team-id] [conn team-id]
(->> (db/cursor conn [sql:get-and-lock-team-files team-id]) (->> (db/cursor conn [sql:get-and-lock-team-files team-id])
(map :id))) (map :id)))
(defn update-team! (defn update-team!
[conn team] [system {:keys [id] :as team}]
(let [params (-> team (let [conn (db/get-connection system)
params (-> team
(update :features db/encode-pgarray conn "text") (update :features db/encode-pgarray conn "text")
(dissoc :id))] (dissoc :id))]
(db/update! conn :team (db/update! conn :team
params params
{:id (:id team)}))) {:id id})
team))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API ;; PUBLIC API
@ -1657,7 +1659,7 @@
:id id}) :id id})
(run! (partial migrate-file system) (run! (partial migrate-file system)
(get-and-lock-files conn id)) (get-and-lock-team-files conn id))
(->> (assoc team :features features) (->> (assoc team :features features)
(update-team! conn))))))] (update-team! conn))))))]

View file

@ -414,12 +414,12 @@
:hint "provided invalid version")) :hint "provided invalid version"))
(binding [srepl/*system* cfg] (binding [srepl/*system* cfg]
(srepl/update-file! :id file-id (srepl/process-file! :id file-id
:update-fn (fn [file] :update-fn (fn [file]
(update file :data assoc :version version)) (update file :data assoc :version version))
:migrate? false :migrate? false
:inc-revn? false :inc-revn? false
:save? true)) :save? true))
{::rres/status 200 {::rres/status 200
::rres/headers {"content-type" "text/plain"} ::rres/headers {"content-type" "text/plain"}
::rres/body "OK"})) ::rres/body "OK"}))

View file

@ -285,8 +285,9 @@
file))) file)))
(defn get-minimal-file (defn get-minimal-file
[{:keys [::db/pool] :as cfg} id] [cfg id & {:as opts}]
(db/get pool :file {:id id} {:columns [:id :modified-at :revn]})) (let [opts (assoc opts ::sql/columns [:id :modified-at :revn])]
(db/get cfg :file {:id id} opts)))
(defn get-file-etag (defn get-file-etag
[{:keys [::rpc/profile-id]} {:keys [modified-at revn]}] [{:keys [::rpc/profile-id]} {:keys [modified-at revn]}]

View file

@ -12,14 +12,17 @@
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.db.sql :as-alias sql]
[app.main :as-alias main] [app.main :as-alias main]
[app.media :as media] [app.media :as media]
[app.rpc :as-alias rpc] [app.rpc :as-alias rpc]
[app.rpc.commands.files :as files]
[app.rpc.commands.profile :as profile] [app.rpc.commands.profile :as profile]
[app.rpc.doc :as-alias doc] [app.rpc.doc :as-alias doc]
[app.storage :as sto] [app.storage :as sto]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt])) [app.util.time :as dt]
[cuerdas.core :as str]))
(defn check-authorized! (defn check-authorized!
[{:keys [::db/pool]} profile-id] [{:keys [::db/pool]} profile-id]
@ -57,76 +60,119 @@
::sm/params schema:get-file-snapshots} ::sm/params schema:get-file-snapshots}
[cfg {:keys [::rpc/profile-id] :as params}] [cfg {:keys [::rpc/profile-id] :as params}]
(check-authorized! cfg profile-id) (check-authorized! cfg profile-id)
(db/run! cfg #(get-file-snapshots % params))) (db/run! cfg get-file-snapshots params))
(defn restore-file-snapshot! (defn restore-file-snapshot!
[{:keys [::db/conn ::sto/storage] :as cfg} {:keys [file-id id]}] [{:keys [::db/conn ::sto/storage] :as cfg} {:keys [file-id id]}]
(let [storage (media/configure-assets-storage storage conn) (let [storage (media/configure-assets-storage storage conn)
params {:id id :file-id file-id} file (files/get-minimal-file conn file-id {::db/for-update true})
options {:columns [:id :data :revn :features]} snapshot (db/get* conn :file-change
snapshot (db/get* conn :file-change params options)] {:file-id file-id
:id id}
{::db/for-share true})]
(when (and (some? snapshot) (when-not snapshot
(some? (:data snapshot))) (ex/raise :type :not-found
:code :snapshot-not-found
:hint "unable to find snapshot with the provided label"
:id id
:file-id file-id))
(l/dbg :hint "restoring snapshot" (when-not (:data snapshot)
:file-id (str file-id) (ex/raise :type :precondition
:snapshot-id (str (:id snapshot))) :code :snapshot-without-data
:hint "snapshot has no data"
:label (:label snapshot)
:file-id file-id))
(db/update! conn :file (l/dbg :hint "restoring snapshot"
{:data (:data snapshot) :file-id (str file-id)
:revn (:revn snapshot) :label (:label snapshot)
:features (:features snapshot)} :snapshot-id (str (:id snapshot)))
{:id file-id})
;; clean object thumbnails (db/update! conn :file
(let [sql (str "update file_tagged_object_thumbnail " {:data (:data snapshot)
" set deleted_at = now() " :revn (inc (:revn file))
" where file_id=? returning media_id") :features (:features snapshot)}
res (db/exec! conn [sql file-id])] {:id file-id})
(doseq [media-id (into #{} (keep :media-id) res)] ;; clean object thumbnails
(sto/touch-object! storage media-id))) (let [sql (str "update file_tagged_object_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
;; clean object thumbnails (doseq [media-id (into #{} (keep :media-id) res)]
(let [sql (str "update file_thumbnail " (sto/touch-object! storage media-id)))
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
{:id (:id snapshot)}))) ;; clean object thumbnails
(let [sql (str "update file_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
(def ^:private schema:restore-file-snapshot {:id (:id snapshot)
[:map :label (:label snapshot)}))
[:file-id ::sm/uuid]
[:id ::sm/uuid]]) (defn- resolve-snapshot-by-label
[conn file-id label]
(->> (db/query conn :file-change
{:file-id file-id
:label label}
{::sql/order-by [[:created-at :desc]]
::sql/columns [:file-id :id :label]})
(first)))
(def ^:private
schema:restore-file-snapshot
[:and
[:map
[:file-id ::sm/uuid]
[:id {:optional true} ::sm/uuid]
[:label {:optional true} :string]]
[::sm/contains-any #{:id :label}]])
(sv/defmethod ::restore-file-snapshot (sv/defmethod ::restore-file-snapshot
{::doc/added "1.20" {::doc/added "1.20"
::doc/skip true ::doc/skip true
::sm/params schema:restore-file-snapshot} ::sm/params schema:restore-file-snapshot}
[cfg {:keys [::rpc/profile-id] :as params}] [cfg {:keys [::rpc/profile-id file-id id label] :as params}]
(check-authorized! cfg profile-id) (check-authorized! cfg profile-id)
(db/tx-run! cfg #(restore-file-snapshot! % params))) (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(let [params (cond-> params
(and (not id) (string? label))
(merge (resolve-snapshot-by-label conn file-id label)))]
(restore-file-snapshot! cfg params)))))
(defn take-file-snapshot! (defn take-file-snapshot!
[{:keys [::db/conn]} {:keys [file-id label]}] [{:keys [::db/conn]} {:keys [file-id label]}]
(when-let [file (db/get* conn :file {:id file-id})] (let [file (db/get conn :file {:id file-id})
(let [id (uuid/next) id (uuid/next)]
label (or label (str "Snapshot at " (dt/format-instant (dt/now) :rfc1123)))]
(l/debug :hint "persisting file snapshot" (l/debug :hint "creating file snapshot"
:file-id (str file-id) :file-id (str file-id)
:label label) :label label)
(db/insert! conn :file-change
{:id id (db/insert! conn :file-change
:revn (:revn file) {:id id
:data (:data file) :revn (:revn file)
:features (:features file) :data (:data file)
:file-id (:id file) :features (:features file)
:label label}) :file-id (:id file)
{:id id}))) :label label}
{::db/return-keys false})
{:id id :label label}))
(defn generate-snapshot-label
[]
(let [ts (-> (dt/now)
(dt/format-instant)
(str/replace #"[T:\.]" "-")
(str/rtrim "Z"))]
(str "snapshot-" ts)))
(def ^:private schema:take-file-snapshot (def ^:private schema:take-file-snapshot
[:map [:file-id ::sm/uuid]]) [:map [:file-id ::sm/uuid]])
@ -137,5 +183,8 @@
::sm/params schema:take-file-snapshot} ::sm/params schema:take-file-snapshot}
[cfg {:keys [::rpc/profile-id] :as params}] [cfg {:keys [::rpc/profile-id] :as params}]
(check-authorized! cfg profile-id) (check-authorized! cfg profile-id)
(db/tx-run! cfg #(take-file-snapshot! % params))) (db/tx-run! cfg (fn [cfg]
(let [params (update params :label (fn [label]
(or label (generate-snapshot-label))))]
(take-file-snapshot! cfg params)))))

View file

@ -12,7 +12,6 @@
[app.db :as db] [app.db :as db]
[app.features.components-v2 :as feat] [app.features.components-v2 :as feat]
[app.main :as main] [app.main :as main]
[app.rpc.commands.files-snapshot :as rpc]
[app.srepl.helpers :as h] [app.srepl.helpers :as h]
[app.svgo :as svgo] [app.svgo :as svgo]
[app.util.cache :as cache] [app.util.cache :as cache]
@ -634,42 +633,3 @@
:file-name (:name file)) :file-name (:name file))
(assoc file :deleted-at (dt/now))) (assoc file :deleted-at (dt/now)))
file)) file))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; RESTORE SNAPSHOT
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def ^:private sql:snapshots-with-file
"SELECT f.id AS file_id,
fc.id AS id
FROM file AS f
JOIN file_change AS fc ON (fc.file_id = f.id)
WHERE fc.label = ? AND f.id = ANY(?)")
(defn restore-team!
[team-id label & {:keys [rollback?] :or {rollback? true}}]
(let [team-id (h/parse-uuid team-id)
get-file-snapshots
(fn [conn ids]
(let [label (str "migration/" label)]
(db/exec! conn [sql:snapshots-with-file label
(db/create-array conn "uuid" ids)])))
restore-snapshot
(fn [{:keys [::db/conn] :as system}]
(let [ids (into #{} (feat/get-and-lock-files conn team-id))
snap (get-file-snapshots conn ids)
ids' (into #{} (map :file-id) snap)
team (-> (feat/get-team conn team-id)
(update :features disj "components/v2"))]
(when (not= ids ids')
(throw (RuntimeException. "no uniform snapshot available")))
(feat/update-team! conn team)
(run! (partial rpc/restore-file-snapshot! system) snap)))]
(-> (assoc main/system ::db/rollback rollback?)
(db/tx-run! restore-snapshot))))

View file

@ -56,6 +56,75 @@
(d/parse-uuid v) (d/parse-uuid v)
v)) v))
;; (def ^:private sql:get-and-lock-team-files
;; "SELECT f.id
;; FROM file AS f
;; JOIN project AS p ON (p.id = f.project_id)
;; WHERE p.team_id = ?
;; FOR UPDATE")
;; (defn get-and-lock-team-files
;; [conn team-id]
;; (->> (db/exec! conn [sql:get-and-lock-team-files team-id])
;; (into #{} (map :id))))
;; (defn get-team
;; [system team-id]
;; (-> (db/get system :team {:id team-id}
;; {::db/remove-deleted false
;; ::db/check-deleted false})
;; (update :features db/decode-pgarray #{})))
(defn get-file
"Get the migrated data of one file."
([id] (get-file (or *system* main/system) id))
([system id]
(db/run! system
(fn [system]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(-> (files/get-file system id :migrate? false)
(update :data feat.fdata/process-pointers deref)
(update :data feat.fdata/process-objects (partial into {}))
(fmg/migrate-file)))))))
(defn update-file!
[system {:keys [id] :as file}]
(let [conn (db/get-connection system)
file (if (contains? (:features file) "fdata/objects-map")
(feat.fdata/enable-objects-map file)
file)
file (if (contains? (:features file) "fdata/pointer-map")
(binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! system id)
file))
file)
file (-> file
(update :features db/encode-pgarray conn "text")
(update :data blob/encode))]
(db/update! conn :file
{:revn (:revn file)
:data (:data file)
:features (:features file)
:data-backend nil
:modified-at (dt/now)
:has-media-trimmed false}
{:id (:id file)})))
(defn update-team!
[system {:keys [id] :as team}]
(let [conn (db/get-connection system)
params (-> team
(update :features db/encode-pgarray conn "text")
(dissoc :id))]
(db/update! conn :team
params
{:id id})
team))
(defn reset-file-data! (defn reset-file-data!
"Hardcode replace of the data of one file." "Hardcode replace of the data of one file."
[id data] [id data]
@ -65,111 +134,90 @@
{:data data} {:data data}
{:id id})))) {:id id}))))
(defn- get-file* (defn validate-file
"Get the migrated data of one file."
[system id]
(db/run! system
(fn [system]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer system id)]
(-> (files/get-file system id :migrate? false)
(update :data feat.fdata/process-pointers deref)
(update :data feat.fdata/process-objects (partial into {}))
(fmg/migrate-file))))))
(defn get-file
"Get the migrated data of one file."
[id]
(get-file* main/system id))
(defn validate
"Validate structure, referencial integrity and semantic coherence of "Validate structure, referencial integrity and semantic coherence of
all contents of a file. Returns a list of errors." all contents of a file. Returns a list of errors."
[id] [id]
(db/tx-run! main/system (db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
(let [id (if (string? id) (parse-uuid id) id) (let [id (if (string? id) (parse-uuid id) id)
file (get-file* system id) file (get-file system id)
libs (->> (files/get-file-libraries conn id) libs (->> (files/get-file-libraries conn id)
(into [file] (map (fn [{:keys [id]}] (into [file] (map (fn [{:keys [id]}]
(get-file* system id)))) (get-file system id))))
(d/index-by :id))] (d/index-by :id))]
(validate/validate-file file libs))))) (validate/validate-file file libs)))))
(defn repair! (defn repair-file*
"Internal helper for validate and repair the file. The operation is
applied multiple times untile file is fixed or max iteration counter
is reached (default 10)"
[system id & {:keys [max-iterations] :or {max-iterations 10}}]
(let [id (parse-uuid id)
validate-and-repair
(fn [file libs iteration]
(when-let [errors (not-empty (validate/validate-file file libs))]
(l/trc :hint "repairing file"
:file-id (str id)
:iteration iteration
:errors (count errors))
(let [changes (repair/repair-file file libs errors)]
(-> file
(update :revn inc)
(update :data cpc/process-changes changes)))))
process-file
(fn [file libs]
(loop [file file
iteration 0]
(if (> iteration max-iterations)
(do
(l/wrn :hint "max retry num reached on repairing file"
:file-id (str id)
:iteration iteration)
file)
(if-let [file (validate-and-repair file libs iteration)]
(recur file (inc iteration))
file))))]
(db/tx-run! system
(fn [{:keys [::db/conn] :as system}]
(let [file (get-file system id)
libs (->> (files/get-file-libraries conn id)
(into [file] (map (fn [{:keys [id]}]
(get-file system id))))
(d/index-by :id))
file (process-file file libs)]
(update-file! system file))))))
(defn repair-file!
"Repair the list of errors detected by validation." "Repair the list of errors detected by validation."
[id] [id & {:keys [rollback?] :or {rollback? true} :as opts}]
(db/tx-run! main/system (let [system (or *system* (assoc main/system ::db/rollback rollback?))]
(fn [{:keys [::db/conn] :as system}] (repair-file* system id (dissoc opts :rollback?))))
(let [id (if (string? id) (parse-uuid id) id)
file (get-file* system id)
libs (->> (files/get-file-libraries conn id)
(into [file] (map (fn [{:keys [id]}]
(get-file* system id))))
(d/index-by :id))
errors (validate/validate-file file libs)
changes (repair/repair-file file libs errors)
file (-> file (defn process-file*
(update :revn inc) [system file-id update-fn]
(update :data cpc/process-changes changes)) (let [file (get-file system file-id)
file (-> (update-fn file)
(update :revn inc))]
file (if (contains? (:features file) "fdata/objects-map") (cfv/validate-file-schema! file)
(feat.fdata/enable-objects-map file) (update-file! system file)
file) (dissoc file :data)))
file (if (contains? (:features file) "fdata/pointer-map") (defn process-file!
(binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! system id)
file))
file)]
(db/update! conn :file
{:revn (:revn file)
:data (blob/encode (:data file))
:data-backend nil
:modified-at (dt/now)
:has-media-trimmed false}
{:id (:id file)})
:repaired))))
(defn update-file!
"Apply a function to the data of one file. Optionally save the changes or not. "Apply a function to the data of one file. Optionally save the changes or not.
The function receives the decoded and migrated file data." The function receives the decoded and migrated file data."
[& {:keys [update-fn id rollback? inc-revn?] [& {:keys [update-fn id rollback?]
:or {rollback? true inc-revn? true}}] :or {rollback? true}}]
(letfn [(process-file [{:keys [::db/conn] :as system} file-id]
(let [file (get-file* system file-id)
file (cond-> (update-fn file)
inc-revn? (update :revn inc))
_ (cfv/validate-file-schema! file) (let [system (or *system* (assoc main/system ::db/rollback rollback?))]
(db/tx-run! system
file (if (contains? (:features file) "fdata/objects-map")
(feat.fdata/enable-objects-map file)
file)
file (if (contains? (:features file) "fdata/pointer-map")
(binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! system id)
file))
file)]
(db/update! conn :file
{:data (blob/encode (:data file))
:features (db/create-array conn "text" (:features file))
:revn (:revn file)}
{:id (:id file)})
(dissoc file :data)))]
(db/tx-run! (or *system* (assoc main/system ::db/rollback rollback?))
(fn [system] (fn [system]
(binding [*system* system] (binding [*system* system]
(process-file system id)))))) (process-file* system id update-fn))))))
(def ^:private sql:get-file-ids (def ^:private sql:get-file-ids
@ -196,11 +244,11 @@
(strace/print-stack-trace cause)) (strace/print-stack-trace cause))
(process-file [{:keys [::db/conn] :as system} file-id] (process-file [{:keys [::db/conn] :as system} file-id]
(let [file (get-file* system file-id) (let [file (get-file system file-id)
libs (when with-libraries? libs (when with-libraries?
(->> (files/get-file-libraries conn file-id) (->> (files/get-file-libraries conn file-id)
(into [file] (map (fn [{:keys [id]}] (into [file] (map (fn [{:keys [id]}]
(get-file* system id)))) (get-file system id))))
(d/index-by :id)))] (d/index-by :id)))]
(try (try
(if with-libraries? (if with-libraries?
@ -220,38 +268,6 @@
(when (fn? on-end) (when (fn? on-end)
(ex/ignoring (on-end))))))))) (ex/ignoring (on-end)))))))))
(defn repair-file-media
[{:keys [id data] :as file}]
(let [conn (db/get-connection *system*)
used (bfc/collect-used-media data)
ids (db/create-array conn "uuid" used)
sql (str "SELECT * FROM file_media_object WHERE id = ANY(?)")
rows (db/exec! conn [sql ids])
index (reduce (fn [index media]
(if (not= (:file-id media) id)
(let [media-id (uuid/next)]
(l/wrn :hint "found not referenced media"
:file-id (str id)
:media-id (str (:id media)))
(db/insert! *system* :file-media-object
(-> media
(assoc :file-id id)
(assoc :id media-id)))
(assoc index (:id media) media-id))
index))
{}
rows)]
(when (seq index)
(binding [bfc/*state* (atom {:index index})]
(update file :data (fn [fdata]
(-> fdata
(update :pages-index #'bfc/relink-shapes)
(update :components #'bfc/relink-shapes)
(update :media #'bfc/relink-media)
(d/without-nils))))))))
(defn process-files! (defn process-files!
"Apply a function to all files in the database" "Apply a function to all files in the database"
[& {:keys [max-items [& {:keys [max-items
@ -281,7 +297,7 @@
(l/trc :hint "process:file:start" :file-id (str file-id) :index idx) (l/trc :hint "process:file:start" :file-id (str file-id) :index idx)
(db/tx-run! (assoc main/system ::db/rollback rollback?) (db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [{:keys [::db/conn] :as system}] (fn [{:keys [::db/conn] :as system}]
(let [file' (get-file* system file-id) (let [file' (get-file system file-id)
file (binding [*system* system] file (binding [*system* system]
(on-file file'))] (on-file file'))]
@ -350,3 +366,37 @@
:rollback rollback? :rollback rollback?
:elapsed elapsed)))))) :elapsed elapsed))))))
(defn repair-file-media
"A helper intended to be used with `process-files!` that fixes all
not propertly referenced file-media-object for a file"
[{:keys [id data] :as file}]
(let [conn (db/get-connection *system*)
used (bfc/collect-used-media data)
ids (db/create-array conn "uuid" used)
sql (str "SELECT * FROM file_media_object WHERE id = ANY(?)")
rows (db/exec! conn [sql ids])
index (reduce (fn [index media]
(if (not= (:file-id media) id)
(let [media-id (uuid/next)]
(l/wrn :hint "found not referenced media"
:file-id (str id)
:media-id (str (:id media)))
(db/insert! *system* :file-media-object
(-> media
(assoc :file-id id)
(assoc :id media-id)))
(assoc index (:id media) media-id))
index))
{}
rows)]
(when (seq index)
(binding [bfc/*state* (atom {:index index})]
(update file :data (fn [fdata]
(-> fdata
(update :pages-index #'bfc/relink-shapes)
(update :components #'bfc/relink-shapes)
(update :media #'bfc/relink-media)
(d/without-nils))))))))

View file

@ -19,7 +19,8 @@
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.features.fdata :as features.fdata] [app.features.components-v2 :as feat.comp-v2]
[app.features.fdata :as feat.fdata]
[app.main :as main] [app.main :as main]
[app.msgbus :as mbus] [app.msgbus :as mbus]
[app.rpc.commands.auth :as auth] [app.rpc.commands.auth :as auth]
@ -38,7 +39,11 @@
[clojure.tools.namespace.repl :as repl] [clojure.tools.namespace.repl :as repl]
[cuerdas.core :as str])) [cuerdas.core :as str]))
(defn print-available-tasks ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; TASKS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn print-tasks
[] []
(let [tasks (:app.worker/registry main/system)] (let [tasks (:app.worker/registry main/system)]
(p/pprint (keys tasks) :level 200))) (p/pprint (keys tasks) :level 200)))
@ -84,6 +89,10 @@
(auth/send-email-verification! pool sprops profile) (auth/send-email-verification! pool sprops profile)
:email-sent)) :email-sent))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PROFILES MANAGEMENT
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn mark-profile-as-active! (defn mark-profile-as-active!
"Mark the profile blocked and removes all the http sessiones "Mark the profile blocked and removes all the http sessiones
associated with the profile-id." associated with the profile-id."
@ -121,19 +130,28 @@
(let [email (str/lower email)] (let [email (str/lower email)]
(db/exec! conn ["update profile set password=? where email=?" password email])))))) (db/exec! conn ["update profile set password=? where email=?" password email]))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; FEATURES
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn enable-objects-map-feature-on-file! (defn enable-objects-map-feature-on-file!
[& {:keys [save? id]}] [& {:keys [save? id]}]
(h/update-file! main/system (h/process-file! main/system
:id id :id id
:update-fn features.fdata/enable-objects-map :update-fn feat.fdata/enable-objects-map
:save? save?)) :save? save?))
(defn enable-pointer-map-feature-on-file! (defn enable-pointer-map-feature-on-file!
[& {:keys [save? id]}] [& {:keys [save? id]}]
(h/update-file! main/system (h/process-file! main/system
:id id :id id
:update-fn features.fdata/enable-pointer-map :update-fn feat.fdata/enable-pointer-map
:save? save?)) :save? save?))
(defn enable-storage-features-on-file!
[& {:as params}]
(enable-objects-map-feature-on-file! main/system params)
(enable-pointer-map-feature-on-file! main/system params))
(defn enable-team-feature! (defn enable-team-feature!
[team-id feature] [team-id feature]
@ -171,57 +189,10 @@
{:id team-id}) {:id team-id})
:disabled)))))) :disabled))))))
(defn enable-storage-features-on-file! ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
[& {:as params}] ;; NOTIFICATIONS
(enable-objects-map-feature-on-file! main/system params) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(enable-pointer-map-feature-on-file! main/system params))
(defn instrument-var
[var]
(alter-var-root var (fn [f]
(let [mf (meta f)]
(if (::original mf)
f
(with-meta
(fn [& params]
(tap> params)
(let [result (apply f params)]
(tap> result)
result))
{::original f}))))))
(defn uninstrument-var
[var]
(alter-var-root var (fn [f]
(or (::original (meta f)) f))))
(defn take-file-snapshot!
"An internal helper that persist the file snapshot using non-gc
collectable file-changes entry."
[& {:keys [file-id label]}]
(let [file-id (h/parse-uuid file-id)]
(db/tx-run! main/system fsnap/take-file-snapshot! {:file-id file-id :label label})))
(defn restore-file-snapshot!
[& {:keys [file-id id]}]
(db/tx-run! main/system
(fn [cfg]
(let [file-id (h/parse-uuid file-id)
id (h/parse-uuid id)]
(if (and (uuid? id) (uuid? file-id))
(fsnap/restore-file-snapshot! cfg {:id id :file-id file-id})
(println "=> invalid parameters"))))))
(defn list-file-snapshots!
[& {:keys [file-id limit]}]
(db/tx-run! main/system
(fn [system]
(let [params {:file-id (h/parse-uuid file-id)
:limit limit}]
(->> (fsnap/get-file-snapshots system (d/without-nils params))
(print-table [:id :revn :created-at :label]))))))
(defn notify! (defn notify!
[{:keys [::mbus/msgbus ::db/pool]} & {:keys [dest code message level] [{:keys [::mbus/msgbus ::db/pool]} & {:keys [dest code message level]
@ -321,6 +292,121 @@
(into #{}) (into #{})
(run! send)))) (run! send))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SNAPSHOTS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn take-file-snapshot!
"An internal helper that persist the file snapshot using non-gc
collectable file-changes entry."
[& {:keys [file-id label]}]
(let [file-id (h/parse-uuid file-id)]
(db/tx-run! main/system fsnap/take-file-snapshot! {:file-id file-id :label label})))
(defn restore-file-snapshot!
[& {:keys [file-id id]}]
(db/tx-run! main/system
(fn [cfg]
(let [file-id (h/parse-uuid file-id)
id (h/parse-uuid id)]
(if (and (uuid? id) (uuid? file-id))
(fsnap/restore-file-snapshot! cfg {:id id :file-id file-id})
(println "=> invalid parameters"))))))
(defn list-file-snapshots!
[& {:keys [file-id limit]}]
(db/tx-run! main/system
(fn [system]
(let [params {:file-id (h/parse-uuid file-id)
:limit limit}]
(->> (fsnap/get-file-snapshots system (d/without-nils params))
(print-table [:id :revn :created-at :label]))))))
(defn take-team-snapshot!
[& {:keys [team-id label rollback?]
:or {rollback? true}}]
(let [team-id (h/parse-uuid team-id)
label (or label (fsnap/generate-snapshot-label))
take-snapshot
(fn [{:keys [::db/conn] :as system}]
(->> (feat.comp-v2/get-and-lock-team-files conn team-id)
(map (fn [file-id]
{:file-id file-id
:label label}))
(run! (partial fsnap/take-file-snapshot! system))))]
(-> (assoc main/system ::db/rollback rollback?)
(db/tx-run! take-snapshot))))
(def ^:private sql:snapshots-with-file
"WITH files AS (
SELECT f.id AS file_id,
(SELECT fc.id
FROM file_change AS fc
WHERE fc.label = ?
AND fc.file_id = f.id
ORDER BY fc.created_at DESC
LIMIT 1) AS id
FROM file AS f
) SELECT * FROM files
WHERE file_id = ANY(?)
AND id IS NOT NULL")
(defn restore-team-snapshot!
"Restore a snapshot on all files of the team. The snapshot should
exists for all files; if is not the case, an exception is raised."
[& {:keys [team-id label rollback?] :or {rollback? true}}]
(let [team-id (h/parse-uuid team-id)
get-file-snapshots
(fn [conn ids]
(db/exec! conn [sql:snapshots-with-file label
(db/create-array conn "uuid" ids)]))
restore-snapshot
(fn [{:keys [::db/conn] :as system}]
(let [ids (->> (feat.comp-v2/get-and-lock-team-files conn team-id)
(into #{}))
snap (get-file-snapshots conn ids)
ids' (into #{} (map :file-id) snap)
team (-> (feat.comp-v2/get-team conn team-id)
(update :features disj "components/v2"))]
(when (not= ids ids')
(throw (RuntimeException. "no uniform snapshot available")))
(feat.comp-v2/update-team! conn team)
(run! (partial fsnap/restore-file-snapshot! system) snap)))]
(-> (assoc main/system ::db/rollback rollback?)
(db/tx-run! restore-snapshot))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; MISC
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn instrument-var
[var]
(alter-var-root var (fn [f]
(let [mf (meta f)]
(if (::original mf)
f
(with-meta
(fn [& params]
(tap> params)
(let [result (apply f params)]
(tap> result)
result))
{::original f}))))))
(defn uninstrument-var
[var]
(alter-var-root var (fn [f]
(or (::original (meta f)) f))))
(defn duplicate-team (defn duplicate-team
[team-id & {:keys [name]}] [team-id & {:keys [name]}]
(let [team-id (h/parse-uuid team-id)] (let [team-id (h/parse-uuid team-id)]
@ -337,4 +423,3 @@
(assoc :team-id (:id team)))] (assoc :team-id (:id team)))]
(db/insert! conn :team-profile-rel params (db/insert! conn :team-profile-rel params
{::db/return-keys false})))))))) {::db/return-keys false}))))))))