♻️ Remove deprecated with-atomic and refactor tx-run! (#5915)

* ♻️ Remove deprecated with-atomic and refactor tx-run!

*  Do not hold open connection for the whole clone-template operation
This commit is contained in:
Andrey Antukh 2025-02-24 11:15:44 +01:00 committed by GitHub
parent bcea19001e
commit 3074fc9ab5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 441 additions and 468 deletions

View file

@ -22,7 +22,8 @@
[clojure.set :as set] [clojure.set :as set]
[integrant.core :as ig] [integrant.core :as ig]
[next.jdbc :as jdbc] [next.jdbc :as jdbc]
[next.jdbc.date-time :as jdbc-dt]) [next.jdbc.date-time :as jdbc-dt]
[next.jdbc.transaction])
(:import (:import
com.zaxxer.hikari.HikariConfig com.zaxxer.hikari.HikariConfig
com.zaxxer.hikari.HikariDataSource com.zaxxer.hikari.HikariDataSource
@ -223,16 +224,6 @@
(let [^OutputStream os (.getOutputStream ^LargeObject lobj)] (let [^OutputStream os (.getOutputStream ^LargeObject lobj)]
(io/make-output-stream os opts)))) (io/make-output-stream os opts))))
(defmacro with-atomic
[& args]
(if (symbol? (first args))
(let [cfgs (first args)
body (rest args)]
`(jdbc/with-transaction [conn# (::pool ~cfgs)]
(let [~cfgs (assoc ~cfgs ::conn conn#)]
~@body)))
`(jdbc/with-transaction ~@args)))
(defn open (defn open
[system-or-pool] [system-or-pool]
(if (pool? system-or-pool) (if (pool? system-or-pool)
@ -535,43 +526,31 @@
(l/trc :hint "explicit rollback requested (savepoint)") (l/trc :hint "explicit rollback requested (savepoint)")
(.rollback conn sp)))) (.rollback conn sp))))
(defn transact!
"A lower-level function for executing function in a transaction"
([transactable f] (transact! transactable f {}))
([transactable f opts]
(binding [next.jdbc.transaction/*nested-tx* :ignore]
(jdbc/transact transactable f opts))))
(defn tx-run! (defn tx-run!
"Run a function in a transaction."
[system f & params] [system f & params]
(cond (if (connection? system)
(connection? system)
(tx-run! {::conn system} f) (tx-run! {::conn system} f)
(if (pool? system)
(pool? system)
(tx-run! {::pool system} f) (tx-run! {::pool system} f)
(if-let [conn (or (::conn system)
(::conn system) (::pool system))]
(let [conn (::conn system) (transact! conn
sp (savepoint conn)] (fn [conn]
(try
(let [system' (-> system (let [system' (-> system
(assoc ::savepoint sp) (dissoc ::rollback)
(dissoc ::rollback)) (assoc ::conn conn))]
result (apply f system' params)] (apply f system' params)))
(if (::rollback system) {:rollback-only (::rollback system)
(rollback! conn sp) :read-only (::read-only system)})
(release! conn sp)) (throw (IllegalArgumentException. "invalid system/cfg provided"))))))
result)
(catch Throwable cause
(.rollback ^Connection conn ^Savepoint sp)
(throw cause))))
(::pool system)
(with-atomic [conn (::pool system)]
(let [system' (-> system
(assoc ::conn conn)
(dissoc ::rollback))
result (apply f system' params)]
(when (::rollback system)
(rollback! conn))
result))
:else
(throw (IllegalArgumentException. "invalid system/cfg provided"))))
(defn run! (defn run!
[system f & params] [system f & params]

View file

@ -337,16 +337,17 @@
or (updated_at is null and or (updated_at is null and
created_at < now() - ?::interval)") created_at < now() - ?::interval)")
(defmethod ig/init-key ::tasks/gc (defn- collect-expired-tasks
[_ {:keys [::db/pool ::tasks/max-age] :as cfg}] [{:keys [::db/conn ::tasks/max-age]}]
(l/debug :hint "initializing session gc task" :max-age max-age)
(fn [_]
(db/with-atomic [conn pool]
(let [interval (db/interval max-age) (let [interval (db/interval max-age)
result (db/exec-one! conn [sql:delete-expired interval interval]) result (db/exec-one! conn [sql:delete-expired interval interval])
result (:next.jdbc/update-count result)] result (:next.jdbc/update-count result)]
(l/debug :task "gc" (l/debug :task "gc"
:hint "clean http sessions" :hint "clean http sessions"
:deleted result) :deleted result)
result)))) result))
(defmethod ig/init-key ::tasks/gc
[_ {:keys [::tasks/max-age] :as cfg}]
(l/debug :hint "initializing session gc task" :max-age max-age)
(fn [_] (db/tx-run! cfg collect-expired-tasks)))

View file

@ -43,13 +43,8 @@
(decode-row token))) (decode-row token)))
(defn repl:create-access-token (defn repl:create-access-token
[{:keys [::db/pool] :as system} profile-id name expiration] [cfg profile-id name expiration]
(db/with-atomic [conn pool] (db/tx-run! cfg create-access-token profile-id name expiration))
(let [props (:app.setup/props system)]
(create-access-token {::db/conn conn ::setup/props props}
profile-id
name
expiration))))
(def ^:private schema:create-access-token (def ^:private schema:create-access-token
[:map {:title "create-access-token"} [:map {:title "create-access-token"}

View file

@ -149,7 +149,7 @@
;; ---- COMMAND: Recover Profile ;; ---- COMMAND: Recover Profile
(defn recover-profile (defn recover-profile
[{:keys [::db/pool] :as cfg} {:keys [token password]}] [{:keys [::db/conn] :as cfg} {:keys [token password]}]
(letfn [(validate-token [token] (letfn [(validate-token [token]
(let [tdata (tokens/verify (::setup/props cfg) {:token token :iss :password-recovery})] (let [tdata (tokens/verify (::setup/props cfg) {:token token :iss :password-recovery})]
(:profile-id tdata))) (:profile-id tdata)))
@ -159,10 +159,10 @@
(db/update! conn :profile {:password pwd :is-active true} {:id profile-id}) (db/update! conn :profile {:password pwd :is-active true} {:id profile-id})
nil))] nil))]
(db/with-atomic [conn pool]
(->> (validate-token token) (->> (validate-token token)
(update-password conn)) (update-password conn))
nil)))
nil))
(def schema:recover-profile (def schema:recover-profile
[:map {:title "recover-profile"} [:map {:title "recover-profile"}
@ -173,7 +173,8 @@
{::rpc/auth false {::rpc/auth false
::doc/added "1.15" ::doc/added "1.15"
::sm/params schema:recover-profile ::sm/params schema:recover-profile
::climit/id :auth/global} ::climit/id :auth/global
::db/transaction true}
[cfg params] [cfg params]
(recover-profile cfg params)) (recover-profile cfg params))

View file

@ -27,7 +27,7 @@
{::rpc/auth false {::rpc/auth false
::doc/added "1.15" ::doc/added "1.15"
::doc/changes ["1.15" "This method is migrated from mutations to commands."]} ::doc/changes ["1.15" "This method is migrated from mutations to commands."]}
[{:keys [::db/pool] :as cfg} _] [cfg _]
(when-not (contains? cf/flags :demo-users) (when-not (contains? cf/flags :demo-users)
(ex/raise :type :validation (ex/raise :type :validation
@ -49,9 +49,11 @@
:password (profile/derive-password cfg password) :password (profile/derive-password cfg password)
:props {}}] :props {}}]
(db/with-atomic [conn pool]
(let [profile (->> (auth/create-profile! conn params) (let [profile (db/tx-run! cfg (fn [{:keys [::db/conn]}]
(auth/create-profile-rels! conn))] (->> (auth/create-profile! conn params)
(auth/create-profile-rels! conn))))]
(with-meta {:email email (with-meta {:email email
:password password} :password password}
{::audit/profile-id (:id profile)}))))) {::audit/profile-id (:id profile)}))))

View file

@ -803,17 +803,17 @@
[:id ::sm/uuid] [:id ::sm/uuid]
[:name [:string {:max 250}]] [:name [:string {:max 250}]]
[:created-at ::dt/instant] [:created-at ::dt/instant]
[:modified-at ::dt/instant]]} [:modified-at ::dt/instant]]
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id id] :as params}]
(check-edition-permissions! conn profile-id id) (check-edition-permissions! conn profile-id id)
(let [file (rename-file conn params)] (let [file (rename-file conn params)]
(rph/with-meta (rph/with-meta
(select-keys file [:id :name :created-at :modified-at]) (select-keys file [:id :name :created-at :modified-at])
{::audit/props {:project-id (:project-id file) {::audit/props {:project-id (:project-id file)
:created-at (:created-at file) :created-at (:created-at file)
:modified-at (:modified-at file)}})))) :modified-at (:modified-at file)}})))
;; --- MUTATION COMMAND: set-file-shared ;; --- MUTATION COMMAND: set-file-shared
@ -1005,15 +1005,17 @@
{::doc/added "1.17" {::doc/added "1.17"
::webhooks/event? true ::webhooks/event? true
::sm/params schema:link-file-to-library} ::sm/params schema:link-file-to-library}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id library-id] :as params}] [cfg {:keys [::rpc/profile-id file-id library-id] :as params}]
(when (= file-id library-id) (when (= file-id library-id)
(ex/raise :type :validation (ex/raise :type :validation
:code :invalid-library :code :invalid-library
:hint "A file cannot be linked to itself")) :hint "A file cannot be linked to itself"))
(db/with-atomic [conn pool]
(db/tx-run! cfg
(fn [{:keys [::db/conn]}]
(check-edition-permissions! conn profile-id file-id) (check-edition-permissions! conn profile-id file-id)
(check-edition-permissions! conn profile-id library-id) (check-edition-permissions! conn profile-id library-id)
(link-file-to-library conn params))) (link-file-to-library conn params))))
;; --- MUTATION COMMAND: unlink-file-from-library ;; --- MUTATION COMMAND: unlink-file-from-library
@ -1031,12 +1033,12 @@
(sv/defmethod ::unlink-file-from-library (sv/defmethod ::unlink-file-from-library
{::doc/added "1.17" {::doc/added "1.17"
::webhooks/event? true ::webhooks/event? true
::sm/params schema:unlink-file-to-library} ::sm/params schema:unlink-file-to-library
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(check-edition-permissions! conn profile-id file-id) (check-edition-permissions! conn profile-id file-id)
(unlink-file-from-library conn params) (unlink-file-from-library conn params)
nil)) nil)
;; --- MUTATION COMMAND: update-sync ;; --- MUTATION COMMAND: update-sync
@ -1056,12 +1058,11 @@
(sv/defmethod ::update-file-library-sync-status (sv/defmethod ::update-file-library-sync-status
"Update the synchronization status of a file->library link" "Update the synchronization status of a file->library link"
{::doc/added "1.17" {::doc/added "1.17"
::sm/params schema:update-file-library-sync-status} ::sm/params schema:update-file-library-sync-status
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn]} {:keys [::rpc/profile-id file-id] :as params}]
(check-edition-permissions! conn profile-id file-id) (check-edition-permissions! conn profile-id file-id)
(update-sync conn params))) (update-sync conn params))
;; --- MUTATION COMMAND: ignore-sync ;; --- MUTATION COMMAND: ignore-sync
@ -1082,9 +1083,9 @@
(sv/defmethod ::ignore-file-library-sync-status (sv/defmethod ::ignore-file-library-sync-status
"Ignore updates in linked files" "Ignore updates in linked files"
{::doc/added "1.17" {::doc/added "1.17"
::sm/params schema:ignore-file-library-sync-status} ::sm/params schema:ignore-file-library-sync-status
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn]} {:keys [::rpc/profile-id file-id] :as params}]
(check-edition-permissions! conn profile-id file-id) (check-edition-permissions! conn profile-id file-id)
(-> (ignore-sync conn params) (-> (ignore-sync conn params)
(update :features db/decode-pgarray #{})))) (update :features db/decode-pgarray #{})))

View file

@ -33,11 +33,11 @@
pages of a file with specific permissions (who-comment and who-inspect)." pages of a file with specific permissions (who-comment and who-inspect)."
{::doc/added "1.18" {::doc/added "1.18"
::doc/module :files ::doc/module :files
::sm/params schema:create-share-link} ::sm/params schema:create-share-link
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn]} {:keys [::rpc/profile-id file-id] :as params}]
(files/check-edition-permissions! conn profile-id file-id) (files/check-edition-permissions! conn profile-id file-id)
(create-share-link conn (assoc params :profile-id profile-id)))) (create-share-link conn (assoc params :profile-id profile-id)))
(defn create-share-link (defn create-share-link
[conn {:keys [profile-id file-id pages who-comment who-inspect]}] [conn {:keys [profile-id file-id pages who-comment who-inspect]}]
@ -61,10 +61,10 @@
(sv/defmethod ::delete-share-link (sv/defmethod ::delete-share-link
{::doc/added "1.18" {::doc/added "1.18"
::doc/module ::files ::doc/module ::files
::sm/params schema:delete-share-link} ::sm/params schema:delete-share-link
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn]} {:keys [::rpc/profile-id id] :as params}]
(let [slink (db/get-by-id conn :share-link id)] (let [slink (db/get-by-id conn :share-link id)]
(files/check-edition-permissions! conn profile-id (:file-id slink)) (files/check-edition-permissions! conn profile-id (:file-id slink))
(db/delete! conn :share-link {:id id}) (db/delete! conn :share-link {:id id})
nil))) nil))

View file

@ -396,8 +396,8 @@
;; --- COMMAND: Clone Template ;; --- COMMAND: Clone Template
(defn clone-template (defn clone-template
[cfg {:keys [project-id profile-id] :as params} template] [{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [project-id profile-id] :as params} template]
(db/tx-run! cfg (fn [{:keys [::db/conn ::wrk/executor] :as cfg}]
;; NOTE: the importation process performs some operations ;; NOTE: the importation process performs some operations
;; that are not very friendly with virtual threads, and for ;; that are not very friendly with virtual threads, and for
;; avoid unexpected blocking of other concurrent operations ;; avoid unexpected blocking of other concurrent operations
@ -408,9 +408,10 @@
:min-age "30m") :min-age "30m")
format (bfc/parse-file-format template) format (bfc/parse-file-format template)
team (teams/get-team conn team (teams/get-team pool
:profile-id profile-id :profile-id profile-id
:project-id project-id) :project-id project-id)
cfg (-> cfg cfg (-> cfg
(assoc ::bfc/project-id project-id) (assoc ::bfc/project-id project-id)
(assoc ::bfc/profile-id profile-id) (assoc ::bfc/profile-id profile-id)
@ -421,9 +422,12 @@
(px/invoke! executor (partial bf.v3/import-files! cfg)) (px/invoke! executor (partial bf.v3/import-files! cfg))
(px/invoke! executor (partial bf.v1/import-files! cfg)))] (px/invoke! executor (partial bf.v1/import-files! cfg)))]
(db/tx-run! cfg
(fn [{:keys [::db/conn] :as cfg}]
(db/update! conn :project (db/update! conn :project
{:modified-at (dt/now)} {:modified-at (dt/now)}
{:id project-id}) {:id project-id}
{::db/return-keys false})
(let [props (audit/clean-props params)] (let [props (audit/clean-props params)]
(doseq [file-id result] (doseq [file-id result]
@ -432,9 +436,9 @@
(assoc ::audit/profile-id profile-id) (assoc ::audit/profile-id profile-id)
(assoc ::audit/name "create-file") (assoc ::audit/name "create-file")
(assoc ::audit/props props))] (assoc ::audit/props props))]
(audit/submit! cfg event)))) (audit/submit! cfg event))))))
result)))) result))
(def ^:private (def ^:private
schema:clone-template schema:clone-template

View file

@ -273,15 +273,14 @@
(sv/defmethod ::clone-file-media-object (sv/defmethod ::clone-file-media-object
{::doc/added "1.17" {::doc/added "1.17"
::sm/params schema:clone-file-media-object} ::sm/params schema:clone-file-media-object
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(files/check-edition-permissions! conn profile-id file-id) (files/check-edition-permissions! conn profile-id file-id)
(-> (assoc cfg :conn conn) (clone-file-media-object cfg params))
(clone-file-media-object params))))
(defn clone-file-media-object (defn clone-file-media-object
[{:keys [conn]} {:keys [id file-id is-local]}] [{:keys [::db/conn]} {:keys [id file-id is-local]}]
(let [mobj (db/get-by-id conn :file-media-object id)] (let [mobj (db/get-by-id conn :file-media-object id)]
(db/insert! conn :file-media-object (db/insert! conn :file-media-object
{:id (uuid/next) {:id (uuid/next)

View file

@ -124,9 +124,9 @@
(sv/defmethod ::update-profile (sv/defmethod ::update-profile
{::doc/added "1.0" {::doc/added "1.0"
::sm/params schema:update-profile ::sm/params schema:update-profile
::sm/result schema:profile} ::sm/result schema:profile
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id fullname lang theme] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn]} {:keys [::rpc/profile-id fullname lang theme] :as params}]
;; NOTE: we need to retrieve the profile independently if we use ;; NOTE: we need to retrieve the profile independently if we use
;; it or not for explicit locking and avoid concurrent updates of ;; it or not for explicit locking and avoid concurrent updates of
;; the same row/object. ;; the same row/object.
@ -149,7 +149,7 @@
(-> profile (-> profile
(strip-private-attrs) (strip-private-attrs)
(d/without-nils) (d/without-nils)
(rph/with-meta {::audit/props (audit/profile->props profile)}))))) (rph/with-meta {::audit/props (audit/profile->props profile)}))))
;; --- MUTATION: Update Password ;; --- MUTATION: Update Password
@ -168,10 +168,9 @@
(sv/defmethod ::update-profile-password (sv/defmethod ::update-profile-password
{::doc/added "1.0" {::doc/added "1.0"
::sm/params schema:update-profile-password ::sm/params schema:update-profile-password
::climit/id :auth/global} ::climit/id :auth/global
::db/transaction true}
[cfg {:keys [::rpc/profile-id password] :as params}] [cfg {:keys [::rpc/profile-id password] :as params}]
(db/tx-run! cfg (fn [cfg]
(let [profile (validate-password! cfg (assoc params :profile-id profile-id)) (let [profile (validate-password! cfg (assoc params :profile-id profile-id))
session-id (::session/id params)] session-id (::session/id params)]
@ -182,7 +181,7 @@
(update-profile-password! cfg (assoc profile :password password)) (update-profile-password! cfg (assoc profile :password password))
(invalidate-profile-session! cfg profile-id session-id) (invalidate-profile-session! cfg profile-id session-id)
nil)))) nil))
(defn- invalidate-profile-session! (defn- invalidate-profile-session!
"Removes all sessions except the current one." "Removes all sessions except the current one."
@ -440,9 +439,9 @@
(declare ^:private get-owned-teams) (declare ^:private get-owned-teams)
(sv/defmethod ::delete-profile (sv/defmethod ::delete-profile
{::doc/added "1.0"} {::doc/added "1.0"
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id] :as params}]
(let [teams (get-owned-teams conn profile-id) (let [teams (get-owned-teams conn profile-id)
deleted-at (dt/now)] deleted-at (dt/now)]
@ -469,8 +468,7 @@
(-> (rph/wrap nil) (-> (rph/wrap nil)
(rph/with-transform (session/delete-fn cfg)))))) (rph/with-transform (session/delete-fn cfg)))))
;; --- HELPERS ;; --- HELPERS

View file

@ -219,12 +219,12 @@
::sm/params schema:update-project-pin ::sm/params schema:update-project-pin
::webhooks/batch-timeout (dt/duration "5s") ::webhooks/batch-timeout (dt/duration "5s")
::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id) ::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id)
::webhooks/event? true} ::webhooks/event? true
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id team-id is-pinned] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn]} {:keys [::rpc/profile-id id team-id is-pinned] :as params}]
(check-read-permissions! conn profile-id id) (check-read-permissions! conn profile-id id)
(db/exec-one! conn [sql:update-project-pin team-id id profile-id is-pinned is-pinned]) (db/exec-one! conn [sql:update-project-pin team-id id profile-id is-pinned is-pinned])
nil)) nil)
;; --- MUTATION: Rename Project ;; --- MUTATION: Rename Project
@ -238,9 +238,9 @@
(sv/defmethod ::rename-project (sv/defmethod ::rename-project
{::doc/added "1.18" {::doc/added "1.18"
::sm/params schema:rename-project ::sm/params schema:rename-project
::webhooks/event? true} ::webhooks/event? true
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id name] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn]} {:keys [::rpc/profile-id id name] :as params}]
(check-edition-permissions! conn profile-id id) (check-edition-permissions! conn profile-id id)
(let [project (db/get-by-id conn :project id ::sql/for-update true)] (let [project (db/get-by-id conn :project id ::sql/for-update true)]
(db/update! conn :project (db/update! conn :project
@ -248,7 +248,7 @@
{:id id}) {:id id})
(rph/with-meta (rph/wrap) (rph/with-meta (rph/wrap)
{::audit/props {:team-id (:team-id project) {::audit/props {:team-id (:team-id project)
:prev-name (:name project)}})))) :prev-name (:name project)}})))
;; --- MUTATION: Delete Project ;; --- MUTATION: Delete Project
@ -280,13 +280,13 @@
(sv/defmethod ::delete-project (sv/defmethod ::delete-project
{::doc/added "1.18" {::doc/added "1.18"
::sm/params schema:delete-project ::sm/params schema:delete-project
::webhooks/event? true} ::webhooks/event? true
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn]} {:keys [::rpc/profile-id id] :as params}]
(check-edition-permissions! conn profile-id id) (check-edition-permissions! conn profile-id id)
(let [project (delete-project conn id)] (let [project (delete-project conn id)]
(rph/with-meta (rph/wrap) (rph/with-meta (rph/wrap)
{::audit/props {:team-id (:team-id project) {::audit/props {:team-id (:team-id project)
:name (:name project) :name (:name project)
:created-at (:created-at project) :created-at (:created-at project)
:modified-at (:modified-at project)}})))) :modified-at (:modified-at project)}})))

View file

@ -527,14 +527,14 @@
(sv/defmethod ::update-team (sv/defmethod ::update-team
{::doc/added "1.17" {::doc/added "1.17"
::sm/params schema:update-team} ::sm/params schema:update-team
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id name] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id id name]}]
(check-edition-permissions! conn profile-id id) (check-edition-permissions! conn profile-id id)
(db/update! conn :team (db/update! conn :team
{:name name} {:name name}
{:id id}) {:id id})
nil)) nil)
;; --- Mutation: Leave Team ;; --- Mutation: Leave Team
@ -592,10 +592,10 @@
(sv/defmethod ::leave-team (sv/defmethod ::leave-team
{::doc/added "1.17" {::doc/added "1.17"
::sm/params schema:leave-team} ::sm/params schema:leave-team
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id] :as params}]
(leave-team conn (assoc params :profile-id profile-id)))) (leave-team conn (assoc params :profile-id profile-id)))
;; --- Mutation: Delete Team ;; --- Mutation: Delete Team
@ -627,16 +627,16 @@
(sv/defmethod ::delete-team (sv/defmethod ::delete-team
{::doc/added "1.17" {::doc/added "1.17"
::sm/params schema:delete-team} ::sm/params schema:delete-team
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id id] :as params}]
(let [perms (get-permissions conn profile-id id)] (let [perms (get-permissions conn profile-id id)]
(when-not (:is-owner perms) (when-not (:is-owner perms)
(ex/raise :type :validation (ex/raise :type :validation
:code :only-owner-can-delete-team)) :code :only-owner-can-delete-team))
(delete-team conn id) (delete-team conn id)
nil))) nil))
;; --- Mutation: Team Update Role ;; --- Mutation: Team Update Role
@ -714,10 +714,10 @@
(sv/defmethod ::delete-team-member (sv/defmethod ::delete-team-member
{::doc/added "1.17" {::doc/added "1.17"
::sm/params schema:delete-team-member} ::sm/params schema:delete-team-member
[{:keys [::db/pool ::mbus/msgbus] :as cfg} {:keys [::rpc/profile-id team-id member-id] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn ::mbus/msgbus] :as cfg} {:keys [::rpc/profile-id team-id member-id] :as params}]
(let [team (get-team pool :profile-id profile-id :team-id team-id) (let [team (get-team conn :profile-id profile-id :team-id team-id)
perms (get-permissions conn profile-id team-id)] perms (get-permissions conn profile-id team-id)]
(when-not (or (:is-owner perms) (when-not (or (:is-owner perms)
(:is-admin perms)) (:is-admin perms))
@ -730,7 +730,6 @@
(db/delete! conn :team-profile-rel {:profile-id member-id (db/delete! conn :team-profile-rel {:profile-id member-id
:team-id team-id}) :team-id team-id})
(mbus/pub! msgbus (mbus/pub! msgbus
:topic member-id :topic member-id
:message {:type :team-membership-change :message {:type :team-membership-change
@ -738,7 +737,7 @@
:team-id team-id :team-id team-id
:team-name (:name team)}) :team-name (:name team)})
nil))) nil))
;; --- Mutation: Update Team Photo ;; --- Mutation: Update Team Photo
@ -764,8 +763,8 @@
(let [team (get-team pool :profile-id profile-id :team-id team-id) (let [team (get-team pool :profile-id profile-id :team-id team-id)
photo (profile/upload-photo cfg params)] photo (profile/upload-photo cfg params)]
(db/with-atomic [conn pool] (check-admin-permissions! pool profile-id team-id)
(check-admin-permissions! conn profile-id team-id)
;; Mark object as touched for make it ellegible for tentative ;; Mark object as touched for make it ellegible for tentative
;; garbage collection. ;; garbage collection.
(when-let [id (:photo-id team)] (when-let [id (:photo-id team)]
@ -776,4 +775,4 @@
{:photo-id (:id photo)} {:photo-id (:id photo)}
{:id team-id}) {:id team-id})
(assoc team :photo-id (:id photo))))) (assoc team :photo-id (:id photo))))

View file

@ -407,9 +407,9 @@
(sv/defmethod ::update-team-invitation-role (sv/defmethod ::update-team-invitation-role
{::doc/added "1.17" {::doc/added "1.17"
::doc/module :teams ::doc/module :teams
::sm/params schema:update-team-invitation-role} ::sm/params schema:update-team-invitation-role
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id email role] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn]} {:keys [::rpc/profile-id team-id email role] :as params}]
(let [perms (teams/get-permissions conn profile-id team-id)] (let [perms (teams/get-permissions conn profile-id team-id)]
(when-not (:is-admin perms) (when-not (:is-admin perms)
@ -420,7 +420,7 @@
{:role (name role) :updated-at (dt/now)} {:role (name role) :updated-at (dt/now)}
{:team-id team-id :email-to (profile/clean-email email)}) {:team-id team-id :email-to (profile/clean-email email)})
nil))) nil))
;; --- Mutation: Delete invitation ;; --- Mutation: Delete invitation
@ -431,9 +431,9 @@
(sv/defmethod ::delete-team-invitation (sv/defmethod ::delete-team-invitation
{::doc/added "1.17" {::doc/added "1.17"
::sm/params schema:delete-team-invition} ::sm/params schema:delete-team-invition
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id email] :as params}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn]} {:keys [::rpc/profile-id team-id email] :as params}]
(let [perms (teams/get-permissions conn profile-id team-id)] (let [perms (teams/get-permissions conn profile-id team-id)]
(when-not (:is-admin perms) (when-not (:is-admin perms)
@ -444,7 +444,7 @@
{:team-id team-id {:team-id team-id
:email-to (profile/clean-email email)} :email-to (profile/clean-email email)}
{::db/return-keys true})] {::db/return-keys true})]
(rph/wrap nil {::audit/props {:invitation-id (:id invitation)}}))))) (rph/wrap nil {::audit/props {:invitation-id (:id invitation)}}))))
;; --- Mutation: Request Team Invitation ;; --- Mutation: Request Team Invitation

View file

@ -144,13 +144,13 @@
(sv/defmethod ::delete-webhook (sv/defmethod ::delete-webhook
{::doc/added "1.17" {::doc/added "1.17"
::sm/params schema:delete-webhook} ::sm/params schema:delete-webhook
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id]}] ::db/transaction true}
(db/with-atomic [conn pool] [{:keys [::db/conn]} {:keys [::rpc/profile-id id]}]
(let [whook (-> (db/get conn :webhook {:id id}) decode-row)] (let [whook (-> (db/get conn :webhook {:id id}) decode-row)]
(check-webhook-edition-permissions! conn profile-id (:team-id whook) (:profile-id whook)) (check-webhook-edition-permissions! conn profile-id (:team-id whook) (:profile-id whook))
(db/delete! conn :webhook {:id id}) (db/delete! conn :webhook {:id id})
nil))) nil))
;; --- Query: Webhooks ;; --- Query: Webhooks

View file

@ -78,7 +78,8 @@
(defmethod ig/init-key ::props (defmethod ig/init-key ::props
[_ {:keys [::db/pool ::key] :as cfg}] [_ {:keys [::db/pool ::key] :as cfg}]
(db/with-atomic [conn pool]
(db/tx-run! cfg (fn [{:keys [::db/conn]}]
(db/xact-lock! conn 0) (db/xact-lock! conn 0)
(when-not key (when-not key
(l/warn :hint (str "using autogenerated secret-key, it will change on each restart and will invalidate " (l/warn :hint (str "using autogenerated secret-key, it will change on each restart and will invalidate "
@ -89,8 +90,7 @@
(-> (get-all-props conn) (-> (get-all-props conn)
(assoc :secret-key secret) (assoc :secret-key secret)
(assoc :tokens-key (keys/derive secret :salt "tokens")) (assoc :tokens-key (keys/derive secret :salt "tokens"))
(update :instance-id handle-instance-id conn (db/read-only? pool)))))) (update :instance-id handle-instance-id conn (db/read-only? pool)))))))
;; FIXME ;; FIXME
(sm/register! ::props :any) (sm/register! ::props :any)

View file

@ -36,8 +36,9 @@
(defmethod exec-command :create-profile (defmethod exec-command :create-profile
[{:keys [fullname email password is-active] [{:keys [fullname email password is-active]
:or {is-active true}}] :or {is-active true}}]
(when-let [system (get-current-system)] (some-> (get-current-system)
(db/with-atomic [conn (:app.db/pool system)] (db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(let [password (cmd.profile/derive-password system password) (let [password (cmd.profile/derive-password system password)
params {:id (uuid/next) params {:id (uuid/next)
:email email :email email
@ -46,12 +47,13 @@
:password password :password password
:props {}}] :props {}}]
(->> (cmd.auth/create-profile! conn params) (->> (cmd.auth/create-profile! conn params)
(cmd.auth/create-profile-rels! conn)))))) (cmd.auth/create-profile-rels! conn)))))))
(defmethod exec-command :update-profile (defmethod exec-command :update-profile
[{:keys [fullname email password is-active]}] [{:keys [fullname email password is-active]}]
(when-let [system (get-current-system)] (some-> (get-current-system)
(db/with-atomic [conn (:app.db/pool system)] (db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(let [params (cond-> {} (let [params (cond-> {}
(some? fullname) (some? fullname)
(assoc :fullname fullname) (assoc :fullname fullname)
@ -66,7 +68,7 @@
params params
{:email email {:email email
:deleted-at nil})] :deleted-at nil})]
(pos? (db/get-update-count res)))))))) (pos? (db/get-update-count res)))))))))
(defmethod exec-command :delete-profile (defmethod exec-command :delete-profile
[{:keys [email soft]}] [{:keys [email soft]}]
@ -75,16 +77,16 @@
:code :invalid-arguments :code :invalid-arguments
:hint "email should be provided")) :hint "email should be provided"))
(when-let [system (get-current-system)] (some-> (get-current-system)
(db/with-atomic [conn (:app.db/pool system)] (db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(let [res (if soft (let [res (if soft
(db/update! conn :profile (db/update! conn :profile
{:deleted-at (dt/now)} {:deleted-at (dt/now)}
{:email email :deleted-at nil}) {:email email :deleted-at nil})
(db/delete! conn :profile (db/delete! conn :profile
{:email email}))] {:email email}))]
(pos? (db/get-update-count res)))))) (pos? (db/get-update-count res)))))))
(defmethod exec-command :search-profile (defmethod exec-command :search-profile
[{:keys [email]}] [{:keys [email]}]
@ -93,12 +95,12 @@
:code :invalid-arguments :code :invalid-arguments
:hint "email should be provided")) :hint "email should be provided"))
(when-let [system (get-current-system)] (some-> (get-current-system)
(db/with-atomic [conn (:app.db/pool system)] (db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(let [sql (str "select email, fullname, created_at, deleted_at from profile " (let [sql (str "select email, fullname, created_at, deleted_at from profile "
" where email similar to ? order by created_at desc limit 100")] " where email similar to ? order by created_at desc limit 100")]
(db/exec! conn [sql email]))))) (db/exec! conn [sql email]))))))
(defmethod exec-command :derive-password (defmethod exec-command :derive-password
[{:keys [password]}] [{:keys [password]}]

View file

@ -101,38 +101,46 @@
"Mark the profile blocked and removes all the http sessiones "Mark the profile blocked and removes all the http sessiones
associated with the profile-id." associated with the profile-id."
[email] [email]
(db/with-atomic [conn (:app.db/pool main/system)] (some-> main/system
(db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(when-let [profile (db/get* conn :profile (when-let [profile (db/get* conn :profile
{:email (str/lower email)} {:email (str/lower email)}
{:columns [:id :email]})] {:columns [:id :email]})]
(when-not (:is-blocked profile) (when-not (:is-blocked profile)
(db/update! conn :profile {:is-active true} {:id (:id profile)}) (db/update! conn :profile {:is-active true} {:id (:id profile)})
:activated)))) :activated))))))
(defn mark-profile-as-blocked! (defn mark-profile-as-blocked!
"Mark the profile blocked and removes all the http sessiones "Mark the profile blocked and removes all the http sessiones
associated with the profile-id." associated with the profile-id."
[email] [email]
(db/with-atomic [conn (:app.db/pool main/system)] (some-> main/system
(db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(when-let [profile (db/get* conn :profile (when-let [profile (db/get* conn :profile
{:email (str/lower email)} {:email (str/lower email)}
{:columns [:id :email]})] {:columns [:id :email]})]
(when-not (:is-blocked profile) (when-not (:is-blocked profile)
(db/update! conn :profile {:is-blocked true} {:id (:id profile)}) (db/update! conn :profile {:is-blocked true} {:id (:id profile)})
(db/delete! conn :http-session {:profile-id (:id profile)}) (db/delete! conn :http-session {:profile-id (:id profile)})
:blocked)))) :blocked))))))
(defn reset-password! (defn reset-password!
"Reset a password to a specific one for a concrete user or all users "Reset a password to a specific one for a concrete user or all users
if email is `:all` keyword." if email is `:all` keyword."
[& {:keys [email password] :or {password "123123"} :as params}] [& {:keys [email password] :or {password "123123"} :as params}]
(us/verify! (contains? params :email) "`email` parameter is mandatory") (when-not email
(db/with-atomic [conn (:app.db/pool main/system)] (throw (IllegalArgumentException. "email is mandatory")))
(some-> main/system
(db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(let [password (derive-password password)] (let [password (derive-password password)]
(if (= email :all) (if (= email :all)
(db/exec! conn ["update profile set password=?" password]) (db/exec! conn ["update profile set password=?" password])
(let [email (str/lower email)] (let [email (str/lower email)]
(db/exec! conn ["update profile set password=? where email=?" password email])))))) (db/exec! conn ["update profile set password=? where email=?" password email]))))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; FEATURES ;; FEATURES

View file

@ -26,18 +26,14 @@
{k (assoc v ::min-age (cf/get-deletion-delay))}) {k (assoc v ::min-age (cf/get-deletion-delay))})
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ {:keys [::db/pool ::min-age] :as cfg}] [_ {:keys [::min-age] :as cfg}]
(fn [{:keys [props] :as task}] (fn [{:keys [props] :as task}]
(let [min-age (or (:min-age props) min-age)] (let [min-age (or (:min-age props) min-age)]
(db/with-atomic [conn pool] (-> cfg
(assoc ::db/rollback (:rollback? props))
(db/tx-run! (fn [{:keys [::db/conn]}]
(let [interval (db/interval min-age) (let [interval (db/interval min-age)
result (db/exec-one! conn [sql:delete-completed-tasks interval]) result (db/exec-one! conn [sql:delete-completed-tasks interval])
result (db/get-update-count result)] result (db/get-update-count result)]
(l/debug :hint "task finished" :total result) (l/debug :hint "task finished" :total result)
result)))))))
(when (:rollback? props)
(db/rollback! conn))
result)))))

View file

@ -71,11 +71,12 @@
(run-batch! [rconn] (run-batch! [rconn]
(try (try
(db/with-atomic [conn pool] (db/tx-run! cfg (fn [{:keys [::db/conn]}]
(if-let [tasks (get-tasks conn)] (if-let [tasks (get-tasks conn)]
(->> (group-by :queue tasks) (->> (group-by :queue tasks)
(run! (partial push-tasks! conn rconn))) (run! (partial push-tasks! conn rconn)))
(px/sleep (::wait-duration cfg)))) ;; FIXME: this sleep should be outside the transaction
(px/sleep (::wait-duration cfg)))))
(catch InterruptedException cause (catch InterruptedException cause
(throw cause)) (throw cause))
(catch Exception cause (catch Exception cause

View file

@ -138,14 +138,13 @@
" FROM information_schema.tables " " FROM information_schema.tables "
" WHERE table_schema = 'public' " " WHERE table_schema = 'public' "
" AND table_name != 'migrations';")] " AND table_name != 'migrations';")]
(db/with-atomic [conn *pool*] (db/transact! *pool* (fn [conn]
(db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"]) (db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"])
(db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"]) (db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"])
(let [result (->> (db/exec! conn [sql]) (let [result (->> (db/exec! conn [sql])
(map :table-name))] (map :table-name))]
(doseq [table result] (doseq [table result]
(db/exec! conn [(str "delete from " table ";")])))) (db/exec! conn [(str "delete from " table ";")])))))
(next))) (next)))
(defn clean-storage (defn clean-storage

View file

@ -20,19 +20,13 @@
(t/use-fixtures :each th/database-reset) (t/use-fixtures :each th/database-reset)
(t/deftest soft-auth-middleware (t/deftest soft-auth-middleware
(db/with-atomic [conn (::db/pool th/*system*)]
(let [profile (th/create-profile* 1) (let [profile (th/create-profile* 1)
system (-> th/*system* token (db/tx-run! th/*system* app.rpc.commands.access-token/create-access-token (:id profile) "test" nil)
(assoc ::db/conn conn)
(assoc ::main/props (:app.setup/props th/*system*)))
token (app.rpc.commands.access-token/create-access-token
system (:id profile) "test" nil)
request (volatile! nil) request (volatile! nil)
handler (#'app.http.access-token/wrap-soft-auth handler (#'app.http.access-token/wrap-soft-auth
(fn [req] (vreset! request req)) (fn [req] (vreset! request req))
system)] th/*system*)]
(with-mocks [m1 {:target 'app.http.access-token/get-token (with-mocks [m1 {:target 'app.http.access-token/get-token
:return nil}] :return nil}]
@ -44,21 +38,15 @@
(handler {}) (handler {})
(let [token-id (get @request :app.http.access-token/id)] (let [token-id (get @request :app.http.access-token/id)]
(t/is (= token-id (:id token)))))))) (t/is (= token-id (:id token)))))))
(t/deftest authz-middleware (t/deftest authz-middleware
(let [profile (th/create-profile* 1) (let [profile (th/create-profile* 1)
system (assoc th/*system* ::main/props (:app.setup/props th/*system*)) token (db/tx-run! th/*system* app.rpc.commands.access-token/create-access-token (:id profile) "test" nil)
token (db/with-atomic [conn (::db/pool th/*system*)]
(let [system (assoc system ::db/conn conn)]
(app.rpc.commands.access-token/create-access-token
system (:id profile) "test" nil)))
request (volatile! {}) request (volatile! {})
handler (#'app.http.access-token/wrap-authz handler (#'app.http.access-token/wrap-authz
(fn [req] (vreset! request req)) (fn [req] (vreset! request req))
system)] th/*system*)]
(handler nil) (handler nil)
(t/is (nil? @request)) (t/is (nil? @request))