diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index 03f56b2bb1..13046c15c0 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -5,7 +5,7 @@ ;; Copyright (c) KALEIDOS INC (ns app.db - (:refer-clojure :exclude [get]) + (:refer-clojure :exclude [get run!]) (:require [app.common.data :as d] [app.common.exceptions :as ex] @@ -391,6 +391,52 @@ ([^Connection conn ^Savepoint sp] (.rollback conn sp))) +(defn tx-run! + [cfg f] + (cond + (connection? cfg) + (tx-run! {::conn cfg} f) + + (pool? cfg) + (tx-run! {::pool cfg} f) + + (::conn cfg) + (let [conn (::conn cfg) + sp (savepoint conn)] + (try + (let [result (f cfg)] + (release! conn sp) + result) + (catch Throwable cause + (rollback! sp) + (throw cause)))) + + (::pool cfg) + (with-atomic [conn (::pool cfg)] + (f (assoc cfg ::conn conn))) + + :else + (throw (IllegalArgumentException. "invalid arguments")))) + +(defn run! + [cfg f] + (cond + (connection? cfg) + (run! {::conn cfg} f) + + (pool? cfg) + (run! {::pool cfg} f) + + (::conn cfg) + (f cfg) + + (::pool cfg) + (with-open [^Connection conn (open (::pool cfg))] + (f (assoc cfg ::conn conn))) + + :else + (throw (IllegalArgumentException. "invalid arguments")))) + (defn interval [o] (cond diff --git a/backend/src/app/srepl/helpers.clj b/backend/src/app/srepl/helpers.clj index 96fbe6c4ca..5c53270b3f 100644 --- a/backend/src/app/srepl/helpers.clj +++ b/backend/src/app/srepl/helpers.clj @@ -6,6 +6,7 @@ (ns app.srepl.helpers "A main namespace for server repl." + (:refer-clojure :exclude [parse-uuid]) #_:clj-kondo/ignore (:require [app.auth :refer [derive-password]] @@ -39,6 +40,26 @@ (def ^:dynamic *conn*) (def ^:dynamic *pool*) +(defn println! + [& params] + (locking println + (apply println params))) + +(defn parse-uuid + [v] + (if (uuid? v) + v + (d/parse-uuid v))) + +(defn resolve-connectable + [o] + (if (db/connection? o) + o + (if (db/pool? o) + o + (or (::db/conn o) + (::db/pool o))))) + (defn reset-password! "Reset a password to a specific one for a concrete user or all users if email is `:all` keyword." @@ -104,7 +125,7 @@ (dissoc file :data)))))) (def ^:private sql:retrieve-files-chunk - "SELECT id, name, created_at, data FROM file + "SELECT id, name, created_at, revn, data FROM file WHERE created_at < ? AND deleted_at is NULL ORDER BY created_at desc LIMIT ?") @@ -150,11 +171,6 @@ (when (fn? on-end) (on-end)))) -(defn- println! - [& params] - (locking println - (apply println params))) - (defn process-files! "Apply a function to all files in the database, reading them in batches." diff --git a/backend/src/app/srepl/main.clj b/backend/src/app/srepl/main.clj index 77413995ba..6ce43b6858 100644 --- a/backend/src/app/srepl/main.clj +++ b/backend/src/app/srepl/main.clj @@ -8,20 +8,24 @@ "A collection of adhoc fixes scripts." #_:clj-kondo/ignore (:require + [app.common.data :as d] [app.common.logging :as l] [app.common.pprint :as p] [app.common.spec :as us] + [app.common.uuid :as uuid] [app.db :as db] + [app.media :as media] [app.rpc.commands.auth :as auth] [app.rpc.commands.profile :as profile] [app.srepl.fixes :as f] [app.srepl.helpers :as h] + [app.storage :as sto] [app.util.blob :as blob] [app.util.objects-map :as omap] [app.util.pointer-map :as pmap] [app.util.time :as dt] [app.worker :as wrk] - [clojure.pprint :refer [pprint]] + [clojure.pprint :refer [pprint print-table]] [cuerdas.core :as str])) (defn print-available-tasks @@ -164,3 +168,98 @@ (alter-var-root var (fn [f] (or (::original (meta f)) f)))) +(defn take-file-snapshot! + "An internal helper that persist the file snapshot using non-gc + collectable file-changes entry." + [system & {:keys [file-id label]}] + (let [label (or label (str "Snapshot at " (dt/format-instant (dt/now) :rfc1123))) + file-id (h/parse-uuid file-id) + id (uuid/next)] + (db/tx-run! system + (fn [{:keys [::db/conn]}] + (when-let [file (db/get* conn :file {:id file-id})] + (h/println! "=> persisting snapshot for" file-id) + (db/insert! conn :file-change + {:id id + :revn (:revn file) + :data (:data file) + :features (:features file) + :file-id (:id file) + :label label}) + id))))) + +(defn restore-file-snapshot! + [system & {:keys [file-id id label]}] + (letfn [(restore-snapshot! [{:keys [::db/conn ::sto/storage]} file-id snapshot] + (when (and (some? snapshot) + (some? (:data snapshot))) + + (h/println! "-> snapshot found:" (:id snapshot)) + (h/println! "-> restoring it on file:" file-id) + (db/update! conn :file + {:data (:data snapshot)} + {:id file-id}) + + ;; clean object thumbnails + (let [sql (str "delete from file_object_thumbnail " + " where file_id=? returning media_id") + res (db/exec! conn [sql file-id])] + (doseq [media-id (into #{} (keep :media-id) res)] + (sto/del-object! storage media-id))))) + + (execute [{:keys [::db/conn] :as cfg}] + (let [file-id (h/parse-uuid file-id) + id (h/parse-uuid id) + cfg (update cfg ::sto/storage media/configure-assets-storage conn)] + + (cond + (and (uuid? id) (uuid? file-id)) + (let [params {:id id :file-id file-id} + options {:columns [:id :data :revn]} + snapshot (db/get* conn :file-change params options)] + (restore-snapshot! cfg file-id snapshot)) + + (uuid? file-id) + (let [params (cond-> {:file-id file-id} + (string? label) + (assoc :label label)) + options {:columns [:id :data :revn]} + snapshot (db/get* conn :file-change params options)] + (restore-snapshot! cfg file-id snapshot)) + + :else + (println "=> invalid parameters"))))] + + (db/tx-run! system execute))) + +(defn list-file-snapshots! + [system & {:keys [file-id limit chunk-size start-at] + :or {chunk-size 10 limit Long/MAX_VALUE}}] + + (letfn [(get-chunk [ds cursor] + (let [query (str "select id, label, revn, created_at " + " from file_change " + " where file_id = ? " + " and created_at < ? " + " and label is not null " + " and data is not null " + " order by created_at desc " + " limit ?") + file-id (if (string? file-id) + (d/parse-uuid file-id) + file-id) + rows (db/exec! ds [query file-id cursor chunk-size])] + [(some->> rows peek :created-at) (seq rows)])) + + (get-candidates [ds] + (->> (d/iteration (partial get-chunk ds) + :vf second + :kf first + :initk (or start-at (dt/now))) + (take limit)))] + + (db/tx-run! system (fn [system] + (->> (fsnap/get-file-snapshots + (map (fn [row] + (update row :created-at dt/format-instant :rfc1123))) + (print-table [:id :revn :created-at :label]))))))