♻️ Fix some fundamental bugs on climit module

The climit previously of this commit is heavily used inside a
transactions, so in heavy contention operation such that file thumbnail
creation can cause a db pool exhaust.

This commit fixes this issue setting up a better resource limiting
mechanism that works outside the transactions so, contention will
no longer hold an open connection/transaction.

It also adds general improvement to the traceability to the climit
mechanism: it now properly logs the profile-id that is currently
cause some contention on specific resources.

It also add a general/root climit that is applied to all requests
so if someone start making abussive requests, we can clearly detect
it.
This commit is contained in:
Andrey Antukh 2024-01-27 22:33:52 +01:00
parent 658c26014b
commit a5c6d78ee5
12 changed files with 291 additions and 212 deletions

View file

@ -3,15 +3,26 @@
;; Optional: queue, ommited means Integer/MAX_VALUE ;; Optional: queue, ommited means Integer/MAX_VALUE
;; Optional: timeout, ommited means no timeout ;; Optional: timeout, ommited means no timeout
;; Note: queue and timeout are excluding ;; Note: queue and timeout are excluding
{:update-file/by-profile {:update-file/global {:permits 20}
:update-file/by-profile
{:permits 1 :queue 5} {:permits 1 :queue 5}
:update-file/global {:permits 20}
:derive-password/global {:permits 8}
:process-font/global {:permits 4} :process-font/global {:permits 4}
:process-image/global {:permits 8} :process-font/by-profile {:permits 1}
:process-image/global {:permits 8}
:process-image/by-profile {:permits 1}
:auth/global {:permits 8}
:root/global
{:permits 40}
:root/by-profile
{:permits 10}
:file-thumbnail-ops/global
{:permits 20}
:file-thumbnail-ops/by-profile :file-thumbnail-ops/by-profile
{:permits 2} {:permits 2}

View file

@ -322,9 +322,7 @@
::rpc/climit (ig/ref ::rpc/climit) ::rpc/climit (ig/ref ::rpc/climit)
::rpc/rlimit (ig/ref ::rpc/rlimit) ::rpc/rlimit (ig/ref ::rpc/rlimit)
::setup/templates (ig/ref ::setup/templates) ::setup/templates (ig/ref ::setup/templates)
::props (ig/ref ::setup/props) ::props (ig/ref ::setup/props)}
:pool (ig/ref ::db/pool)}
:app.rpc.doc/routes :app.rpc.doc/routes
{:methods (ig/ref :app.rpc/methods)} {:methods (ig/ref :app.rpc/methods)}

View file

@ -240,8 +240,7 @@
::mtx/metrics ::mtx/metrics
::main/props] ::main/props]
:opt [::climit :opt [::climit
::rlimit] ::rlimit]))
:req-un [::db/pool]))
(defmethod ig/init-key ::methods (defmethod ig/init-key ::methods
[_ cfg] [_ cfg]

View file

@ -21,26 +21,31 @@
[app.worker :as-alias wrk] [app.worker :as-alias wrk]
[clojure.edn :as edn] [clojure.edn :as edn]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str]
[datoteka.fs :as fs] [datoteka.fs :as fs]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px] [promesa.exec :as px]
[promesa.exec.bulkhead :as pbh]) [promesa.exec.bulkhead :as pbh])
(:import (:import
clojure.lang.ExceptionInfo)) clojure.lang.ExceptionInfo
java.util.concurrent.atomic.AtomicLong))
(set! *warn-on-reflection* true) (set! *warn-on-reflection* true)
(defn- id->str (defn- id->str
[id] ([id]
(-> (str id) (-> (str id)
(subs 1))) (subs 1)))
([id key]
(if key
(str (-> (str id) (subs 1)) "/" key)
(id->str id))))
(defn- create-cache (defn- create-cache
[{:keys [::wrk/executor]}] [{:keys [::wrk/executor]}]
(letfn [(on-remove [key _ cause] (letfn [(on-remove [key _ cause]
(let [[id skey] key] (let [[id skey] key]
(l/dbg :hint "destroy limiter" :id (id->str id) :key skey :reason (str cause))))] (l/dbg :hint "disposed" :id (id->str id skey) :reason (str cause))))]
(cache/create :executor executor (cache/create :executor executor
:on-remove on-remove :on-remove on-remove
:keepalive "5m"))) :keepalive "5m")))
@ -81,132 +86,179 @@
(defn- create-limiter (defn- create-limiter
[config [id skey]] [config [id skey]]
(l/dbg :hint "create limiter" :id (id->str id) :key skey) (l/dbg :hint "created" :id (id->str id skey))
(pbh/create :permits (or (:permits config) (:concurrency config)) (pbh/create :permits (or (:permits config) (:concurrency config))
:queue (or (:queue config) (:queue-size config)) :queue (or (:queue config) (:queue-size config))
:timeout (:timeout config) :timeout (:timeout config)
:type :semaphore)) :type :semaphore))
(defn- invoke! (defmacro ^:private measure-and-log!
[config cache metrics id key f] [metrics mlabels stats id action limit-id limit-label profile-id elapsed]
(if-let [limiter (cache/get cache [id key] (partial create-limiter config))] `(let [mpermits# (:max-permits ~stats)
mqueue# (:max-queue ~stats)
permits# (:permits ~stats)
queue# (:queue ~stats)
queue# (- queue# mpermits#)
queue# (if (neg? queue#) 0 queue#)
level# (if (pos? queue#) :warn :trace)]
(mtx/run! ~metrics
:id :rpc-climit-queue
:val queue#
:labels ~mlabels)
(mtx/run! ~metrics
:id :rpc-climit-permits
:val permits#
:labels ~mlabels)
(l/log level#
:hint ~action
:req ~id
:id ~limit-id
:label ~limit-label
:profile-id (str ~profile-id)
:permits permits#
:queue queue#
:max-permits mpermits#
:max-queue mqueue#
~@(if (some? elapsed)
[:elapsed `(dt/format-duration ~elapsed)]
[]))))
(def ^:private idseq (AtomicLong. 0))
(defn- invoke
[limiter metrics limit-id limit-key limit-label profile-id f params]
(let [tpoint (dt/tpoint) (let [tpoint (dt/tpoint)
labels (into-array String [(id->str id)]) limit-id (id->str limit-id limit-key)
wrapped (fn [] mlabels (into-array String [limit-id])
stats (pbh/get-stats limiter)
id (.incrementAndGet ^AtomicLong idseq)]
(try
(measure-and-log! metrics mlabels stats id "enqueued" limit-id limit-label profile-id nil)
(px/invoke! limiter (fn []
(let [elapsed (tpoint) (let [elapsed (tpoint)
stats (pbh/get-stats limiter)] stats (pbh/get-stats limiter)]
(l/trc :hint "acquired" (measure-and-log! metrics mlabels stats id "acquired" limit-id limit-label profile-id elapsed)
:id (id->str id)
:key key
:permits (:permits stats)
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats)
:elapsed (dt/format-duration elapsed))
(mtx/run! metrics (mtx/run! metrics
:id :rpc-climit-timing :id :rpc-climit-timing
:val (inst-ms elapsed) :val (inst-ms elapsed)
:labels labels) :labels mlabels)
(try (apply f params))))
(f)
(finally
(let [elapsed (tpoint)]
(l/trc :hint "finished"
:id (id->str id)
:key key
:permits (:permits stats)
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats)
:elapsed (dt/format-duration elapsed)))))))
measure!
(fn [stats]
(mtx/run! metrics
:id :rpc-climit-queue
:val (:queue stats)
:labels labels)
(mtx/run! metrics
:id :rpc-climit-permits
:val (:permits stats)
:labels labels))]
(try
(let [stats (pbh/get-stats limiter)]
(measure! stats)
(l/trc :hint "enqueued"
:id (id->str id)
:key key
:permits (:permits stats)
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats))
(px/invoke! limiter wrapped))
(catch ExceptionInfo cause (catch ExceptionInfo cause
(let [{:keys [type code]} (ex-data cause)] (let [{:keys [type code]} (ex-data cause)]
(if (= :bulkhead-error type) (if (= :bulkhead-error type)
(let [elapsed (tpoint)]
(measure-and-log! metrics mlabels stats id "reject" limit-id limit-label profile-id elapsed)
(ex/raise :type :concurrency-limit (ex/raise :type :concurrency-limit
:code code :code code
:hint "concurrency limit reached") :hint "concurrency limit reached"
:cause cause))
(throw cause)))) (throw cause))))
(finally (finally
(measure! (pbh/get-stats limiter))))) (let [elapsed (tpoint)
stats (pbh/get-stats limiter)]
(do (measure-and-log! metrics mlabels stats id "finished" limit-id limit-label profile-id elapsed))))))
(l/wrn :hint "no limiter found" :id (id->str id))
(f))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; MIDDLEWARE ;; MIDDLEWARE
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def noop-fn (constantly nil)) (def ^:private noop-fn (constantly nil))
(def ^:private global-limits
[[:root/global noop-fn]
[:root/by-profile ::rpc/profile-id]])
(defn- get-limits
[cfg]
(when-let [ref (get cfg ::id)]
(cond
(keyword? ref)
[[ref]]
(and (vector? ref)
(keyword (first ref)))
[ref]
(and (vector? ref)
(vector? (first ref)))
(rseq ref)
:else
(throw (IllegalArgumentException. "unable to normalize limit")))))
(defn wrap (defn wrap
[{:keys [::rpc/climit ::mtx/metrics]} f {:keys [::id ::key-fn] :or {key-fn noop-fn} :as mdata}] [{:keys [::rpc/climit ::mtx/metrics]} handler mdata]
(if (and (some? climit) (some? id))
(let [cache (::cache climit) (let [cache (::cache climit)
config (::config climit)] config (::config climit)
(if-let [config (get config id)] label (::sv/name mdata)]
(do
(reduce (fn [handler [limit-id key-fn]]
(if-let [config (get config limit-id)]
(let [key-fn (or key-fn noop-fn)]
(l/dbg :hint "instrumenting method" (l/dbg :hint "instrumenting method"
:limit (id->str id) :method label
:service-name (::sv/name mdata) :limit (id->str limit-id)
:timeout (:timeout config) :timeout (:timeout config)
:permits (:permits config) :permits (:permits config)
:queue (:queue config) :queue (:queue config)
:keyed? (not= key-fn noop-fn)) :keyed (not= key-fn noop-fn))
(if (and (= key-fn ::rpc/profile-id)
(false? (::rpc/auth mdata true)))
;; We don't enforce by-profile limit on methods that does
;; not require authentication
handler
(fn [cfg params] (fn [cfg params]
(invoke! config cache metrics id (key-fn params) (partial f cfg params)))) (let [limit-key (key-fn params)
cache-key [limit-id limit-key]
limiter (cache/get cache cache-key (partial create-limiter config))
profile-id (if (= key-fn ::rpc/profile-id)
limit-key
(get params ::rpc/profile-id))]
(invoke limiter metrics limit-id limit-key label profile-id handler [cfg params])))))
(do (do
(l/wrn :hint "no config found for specified queue" :id (id->str id)) (l/wrn :hint "no config found for specified queue" :id (id->str limit-id))
f))) handler)))
f)) handler
(concat global-limits (get-limits mdata)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API ;; PUBLIC API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn configure (defn- build-exec-chain
[{:keys [::rpc/climit]} id] [{:keys [::label ::profile-id ::rpc/climit ::mtx/metrics] :as cfg} f]
(us/assert! ::rpc/climit climit) (let [config (get climit ::config)
(assoc climit ::id id)) cache (get climit ::cache)]
(defn run! (reduce (fn [handler [limit-id limit-key :as ckey]]
(let [config (get config limit-id)]
(when-not config
(throw (IllegalArgumentException.
(str/ffmt "config not found for: %" limit-id))))
(fn [& params]
(let [limiter (cache/get cache ckey (partial create-limiter config))]
(invoke limiter metrics limit-id limit-key label profile-id handler params)))))
f
(get-limits cfg))))
(defn invoke!
"Run a function in context of climit. "Run a function in context of climit.
Intended to be used in virtual threads." Intended to be used in virtual threads."
([{:keys [::id ::cache ::config ::mtx/metrics]} f] [{:keys [::executor] :as cfg} f & params]
(if-let [config (get config id)] (let [f (if (some? executor)
(invoke! config cache metrics id nil f) (fn [& params] (px/await! (px/submit! executor (fn [] (apply f params)))))
(f))) f)
f (build-exec-chain cfg f)]
([{:keys [::id ::cache ::config ::mtx/metrics]} f executor] (apply f params)))
(let [f #(p/await! (px/submit! executor f))]
(if-let [config (get config id)]
(invoke! config cache metrics id nil f)
(f)))))

View file

@ -21,6 +21,7 @@
[app.loggers.audit :as audit] [app.loggers.audit :as audit]
[app.main :as-alias main] [app.main :as-alias main]
[app.rpc :as-alias rpc] [app.rpc :as-alias rpc]
[app.rpc.climit :as-alias climit]
[app.rpc.commands.profile :as profile] [app.rpc.commands.profile :as profile]
[app.rpc.commands.teams :as teams] [app.rpc.commands.teams :as teams]
[app.rpc.doc :as-alias doc] [app.rpc.doc :as-alias doc]
@ -39,7 +40,7 @@
;; ---- COMMAND: login with password ;; ---- COMMAND: login with password
(defn login-with-password (defn login-with-password
[{:keys [::db/pool] :as cfg} {:keys [email password] :as params}] [cfg {:keys [email password] :as params}]
(when-not (or (contains? cf/flags :login) (when-not (or (contains? cf/flags :login)
(contains? cf/flags :login-with-password)) (contains? cf/flags :login-with-password))
@ -47,7 +48,7 @@
:code :login-disabled :code :login-disabled
:hint "login is disabled in this instance")) :hint "login is disabled in this instance"))
(letfn [(check-password [conn profile password] (letfn [(check-password [cfg profile password]
(if (= (:password profile) "!") (if (= (:password profile) "!")
(ex/raise :type :validation (ex/raise :type :validation
:code :account-without-password :code :account-without-password
@ -57,10 +58,10 @@
(l/trc :hint "updating profile password" (l/trc :hint "updating profile password"
:id (str (:id profile)) :id (str (:id profile))
:email (:email profile)) :email (:email profile))
(profile/update-profile-password! conn (assoc profile :password password))) (profile/update-profile-password! cfg (assoc profile :password password)))
(:valid result)))) (:valid result))))
(validate-profile [conn profile] (validate-profile [cfg profile]
(when-not profile (when-not profile
(ex/raise :type :validation (ex/raise :type :validation
:code :wrong-credentials)) :code :wrong-credentials))
@ -70,7 +71,7 @@
(when (:is-blocked profile) (when (:is-blocked profile)
(ex/raise :type :restriction (ex/raise :type :restriction
:code :profile-blocked)) :code :profile-blocked))
(when-not (check-password conn profile password) (when-not (check-password cfg profile password)
(ex/raise :type :validation (ex/raise :type :validation
:code :wrong-credentials)) :code :wrong-credentials))
(when-let [deleted-at (:deleted-at profile)] (when-let [deleted-at (:deleted-at profile)]
@ -78,11 +79,11 @@
(ex/raise :type :validation (ex/raise :type :validation
:code :wrong-credentials))) :code :wrong-credentials)))
profile)] profile)
(db/with-atomic [conn pool] (login [{:keys [::db/conn] :as cfg}]
(let [profile (->> (profile/get-profile-by-email conn email) (let [profile (->> (profile/get-profile-by-email conn email)
(validate-profile conn) (validate-profile cfg)
(profile/strip-private-attrs)) (profile/strip-private-attrs))
invitation (when-let [token (:invitation-token params)] invitation (when-let [token (:invitation-token params)]
@ -98,7 +99,9 @@
(-> response (-> response
(rph/with-transform (session/create-fn cfg (:id profile))) (rph/with-transform (session/create-fn cfg (:id profile)))
(rph/with-meta {::audit/props (audit/profile->props profile) (rph/with-meta {::audit/props (audit/profile->props profile)
::audit/profile-id (:id profile)})))))) ::audit/profile-id (:id profile)}))))]
(db/tx-run! cfg login)))
(def schema:login-with-password (def schema:login-with-password
[:map {:title "login-with-password"} [:map {:title "login-with-password"}
@ -110,6 +113,7 @@
"Performs authentication using penpot password." "Performs authentication using penpot password."
{::rpc/auth false {::rpc/auth false
::doc/added "1.15" ::doc/added "1.15"
::climit/id :auth/global
::sm/params schema:login-with-password} ::sm/params schema:login-with-password}
[cfg params] [cfg params]
(login-with-password cfg params)) (login-with-password cfg params))
@ -149,7 +153,8 @@
(sv/defmethod ::recover-profile (sv/defmethod ::recover-profile
{::rpc/auth false {::rpc/auth false
::doc/added "1.15" ::doc/added "1.15"
::sm/params schema:recover-profile} ::sm/params schema:recover-profile
::climit/id :auth/global}
[cfg params] [cfg params]
(recover-profile cfg params)) (recover-profile cfg params))
@ -360,7 +365,6 @@
{::audit/type "fact" {::audit/type "fact"
::audit/name "register-profile-retry" ::audit/name "register-profile-retry"
::audit/profile-id id})) ::audit/profile-id id}))
(cond (cond
;; If invitation token comes in params, this is because the ;; If invitation token comes in params, this is because the
;; user comes from team-invitation process; in this case, ;; user comes from team-invitation process; in this case,
@ -402,7 +406,6 @@
{::audit/replace-props (audit/profile->props profile) {::audit/replace-props (audit/profile->props profile)
::audit/profile-id (:id profile)}))))) ::audit/profile-id (:id profile)})))))
(def schema:register-profile (def schema:register-profile
[:map {:title "register-profile"} [:map {:title "register-profile"}
[:token schema:token] [:token schema:token]
@ -411,7 +414,8 @@
(sv/defmethod ::register-profile (sv/defmethod ::register-profile
{::rpc/auth false {::rpc/auth false
::doc/added "1.15" ::doc/added "1.15"
::sm/params schema:register-profile} ::sm/params schema:register-profile
::climit/id :auth/global}
[{:keys [::db/pool] :as cfg} params] [{:keys [::db/pool] :as cfg} params]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(-> (assoc cfg ::db/conn conn) (-> (assoc cfg ::db/conn conn)

View file

@ -285,12 +285,10 @@
(sv/defmethod ::create-file-object-thumbnail (sv/defmethod ::create-file-object-thumbnail
{::doc/added "1.19" {::doc/added "1.19"
::doc/module :files ::doc/module :files
::climit/id :file-thumbnail-ops/by-profile ::climit/id [[:file-thumbnail-ops/by-profile ::rpc/profile-id]
::climit/key-fn ::rpc/profile-id [:file-thumbnail-ops/global]]
::rtry/enabled true ::rtry/enabled true
::rtry/when rtry/conflict-exception? ::rtry/when rtry/conflict-exception?
::audit/skip true ::audit/skip true
::sm/params schema:create-file-object-thumbnail} ::sm/params schema:create-file-object-thumbnail}
@ -332,8 +330,8 @@
{::doc/added "1.19" {::doc/added "1.19"
::doc/module :files ::doc/module :files
::doc/deprecated "1.20" ::doc/deprecated "1.20"
::climit/id :file-thumbnail-ops ::climit/id [[:file-thumbnail-ops/by-profile ::rpc/profile-id]
::climit/key-fn ::rpc/profile-id [:file-thumbnail-ops/global]]
::audit/skip true} ::audit/skip true}
[cfg {:keys [::rpc/profile-id file-id object-id]}] [cfg {:keys [::rpc/profile-id file-id object-id]}]
(db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}] (db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
@ -408,8 +406,8 @@
{::doc/added "1.19" {::doc/added "1.19"
::doc/module :files ::doc/module :files
::audit/skip true ::audit/skip true
::climit/id :file-thumbnail-ops ::climit/id [[:file-thumbnail-ops/by-profile ::rpc/profile-id]
::climit/key-fn ::rpc/profile-id [:file-thumbnail-ops/global]]
::rtry/enabled true ::rtry/enabled true
::rtry/when rtry/conflict-exception? ::rtry/when rtry/conflict-exception?
::sm/params schema:create-file-thumbnail} ::sm/params schema:create-file-thumbnail}

View file

@ -35,7 +35,8 @@
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk] [app.worker :as-alias wrk]
[clojure.set :as set])) [clojure.set :as set]
[promesa.exec :as px]))
;; --- SCHEMA ;; --- SCHEMA
@ -132,8 +133,8 @@
;; database. ;; database.
(sv/defmethod ::update-file (sv/defmethod ::update-file
{::climit/id :update-file/by-profile {::climit/id [[:update-file/by-profile ::rpc/profile-id]
::climit/key-fn ::rpc/profile-id [:update-file/global]]
::webhooks/event? true ::webhooks/event? true
::webhooks/batch-timeout (dt/duration "2m") ::webhooks/batch-timeout (dt/duration "2m")
::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id) ::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id)
@ -232,13 +233,9 @@
(defn- update-file* (defn- update-file*
[{:keys [::db/conn ::wrk/executor] :as cfg} [{:keys [::db/conn ::wrk/executor] :as cfg}
{:keys [profile-id file changes session-id ::created-at skip-validate] :as params}] {:keys [profile-id file changes session-id ::created-at skip-validate] :as params}]
(let [;; Process the file data in the CLIMIT context; scheduling it (let [;; Process the file data on separated thread for avoid to do
;; to be executed on a separated executor for avoid to do the ;; the CPU intensive operation on vthread.
;; CPU intensive operation on vthread. file (px/invoke! executor (partial update-file-data cfg file changes skip-validate))]
update-fdata-fn (partial update-file-data cfg file changes skip-validate)
file (-> (climit/configure cfg :update-file/global)
(climit/run! update-fdata-fn executor))]
(db/insert! conn :file-change (db/insert! conn :file-change
{:id (uuid/next) {:id (uuid/next)
@ -306,7 +303,6 @@
(fmg/migrate-file)) (fmg/migrate-file))
file) file)
;; WARNING: this ruins performance; maybe we need to find ;; WARNING: this ruins performance; maybe we need to find
;; some other way to do general validation ;; some other way to do general validation
libs (when (and (or (contains? cf/flags :file-validation) libs (when (and (or (contains? cf/flags :file-validation)

View file

@ -16,7 +16,7 @@
[app.loggers.webhooks :as-alias webhooks] [app.loggers.webhooks :as-alias webhooks]
[app.media :as media] [app.media :as media]
[app.rpc :as-alias rpc] [app.rpc :as-alias rpc]
[app.rpc.climit :as climit] [app.rpc.climit :as-alias climit]
[app.rpc.commands.files :as files] [app.rpc.commands.files :as files]
[app.rpc.commands.projects :as projects] [app.rpc.commands.projects :as projects]
[app.rpc.commands.teams :as teams] [app.rpc.commands.teams :as teams]
@ -26,7 +26,8 @@
[app.storage :as sto] [app.storage :as sto]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk])) [app.worker :as-alias wrk]
[promesa.exec :as px]))
(def valid-weight #{100 200 300 400 500 600 700 800 900 950}) (def valid-weight #{100 200 300 400 500 600 700 800 900 950})
(def valid-style #{"normal" "italic"}) (def valid-style #{"normal" "italic"})
@ -87,6 +88,8 @@
(sv/defmethod ::create-font-variant (sv/defmethod ::create-font-variant
{::doc/added "1.18" {::doc/added "1.18"
::climit/id [[:process-font/by-profile ::rpc/profile-id]
[:process-font/global]]
::webhooks/event? true ::webhooks/event? true
::sm/params schema:create-font-variant} ::sm/params schema:create-font-variant}
[cfg {:keys [::rpc/profile-id team-id] :as params}] [cfg {:keys [::rpc/profile-id team-id] :as params}]
@ -100,7 +103,7 @@
(create-font-variant cfg (assoc params :profile-id profile-id)))))) (create-font-variant cfg (assoc params :profile-id profile-id))))))
(defn create-font-variant (defn create-font-variant
[{:keys [::sto/storage ::db/conn] :as cfg} {:keys [data] :as params}] [{:keys [::sto/storage ::db/conn ::wrk/executor]} {:keys [data] :as params}]
(letfn [(generate-missing! [data] (letfn [(generate-missing! [data]
(let [data (media/run {:cmd :generate-fonts :input data})] (let [data (media/run {:cmd :generate-fonts :input data})]
(when (and (not (contains? data "font/otf")) (when (and (not (contains? data "font/otf"))
@ -152,9 +155,7 @@
:otf-file-id (:id otf) :otf-file-id (:id otf)
:ttf-file-id (:id ttf)}))] :ttf-file-id (:id ttf)}))]
(let [data (-> (climit/configure cfg :process-font/global) (let [data (px/invoke! executor (partial generate-missing! data))
(climit/run! (partial generate-missing! data)
(::wrk/executor cfg)))
assets (persist-fonts-files! data) assets (persist-fonts-files! data)
result (insert-font-variant! assets)] result (insert-font-variant! assets)]
(vary-meta result assoc ::audit/replace-props (update params :data (comp vec keys)))))) (vary-meta result assoc ::audit/replace-props (update params :data (comp vec keys))))))

View file

@ -27,7 +27,8 @@
[app.worker :as-alias wrk] [app.worker :as-alias wrk]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[datoteka.io :as io])) [datoteka.io :as io]
[promesa.exec :as px]))
(def default-max-file-size (def default-max-file-size
(* 1024 1024 10)) ; 10 MiB (* 1024 1024 10)) ; 10 MiB
@ -56,20 +57,25 @@
:opt-un [::id])) :opt-un [::id]))
(sv/defmethod ::upload-file-media-object (sv/defmethod ::upload-file-media-object
{::doc/added "1.17"} {::doc/added "1.17"
::climit/id [[:process-image/by-profile ::rpc/profile-id]
[:process-image/global]]}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id content] :as params}] [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id content] :as params}]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)] (let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(files/check-edition-permissions! pool profile-id file-id) (files/check-edition-permissions! pool profile-id file-id)
(media/validate-media-type! content) (media/validate-media-type! content)
(media/validate-media-size! content) (media/validate-media-size! content)
(let [object (db/run! cfg #(create-file-media-object % params))
(db/run! cfg (fn [cfg]
(let [object (create-file-media-object cfg params)
props {:name (:name params) props {:name (:name params)
:file-id file-id :file-id file-id
:is-local (:is-local params) :is-local (:is-local params)
:size (:size content) :size (:size content)
:mtype (:mtype content)}] :mtype (:mtype content)}]
(with-meta object (with-meta object
{::audit/replace-props props})))) {::audit/replace-props props}))))))
(defn- big-enough-for-thumbnail? (defn- big-enough-for-thumbnail?
"Checks if the provided image info is big enough for "Checks if the provided image info is big enough for
@ -144,12 +150,10 @@
(assoc ::image (process-main-image info))))) (assoc ::image (process-main-image info)))))
(defn create-file-media-object (defn create-file-media-object
[{:keys [::sto/storage ::db/conn ::wrk/executor] :as cfg} [{:keys [::sto/storage ::db/conn ::wrk/executor]}
{:keys [id file-id is-local name content]}] {:keys [id file-id is-local name content]}]
(let [result (-> (climit/configure cfg :process-image/global) (let [result (px/invoke! executor (partial process-image content))
(climit/run! (partial process-image content) executor))
image (sto/put-object! storage (::image result)) image (sto/put-object! storage (::image result))
thumb (when-let [params (::thumb result)] thumb (when-let [params (::thumb result)]
(sto/put-object! storage params))] (sto/put-object! storage params))]
@ -183,7 +187,7 @@
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}] [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)] (let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(files/check-edition-permissions! pool profile-id file-id) (files/check-edition-permissions! pool profile-id file-id)
(db/run! cfg #(create-file-media-object-from-url % params)))) (create-file-media-object-from-url cfg (assoc params :profile-id profile-id))))
(defn download-image (defn download-image
[{:keys [::http/client]} uri] [{:keys [::http/client]} uri]
@ -235,7 +239,16 @@
params (-> params params (-> params
(assoc :content content) (assoc :content content)
(assoc :name (or name (:filename content))))] (assoc :name (or name (:filename content))))]
(create-file-media-object cfg params)))
;; NOTE: we use the climit here in a dynamic invocation because we
;; don't want saturate the process-image limit with IO (download
;; of external image)
(-> cfg
(assoc ::climit/id [[:process-image/by-profile (:profile-id params)]
[:process-image/global]])
(assoc ::climit/profile-id (:profile-id params))
(assoc ::climit/label "create-file-media-object-from-url")
(climit/invoke! db/run! cfg create-file-media-object params))))
;; --- Clone File Media object (Upload and create from url) ;; --- Clone File Media object (Upload and create from url)

View file

@ -28,7 +28,8 @@
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk] [app.worker :as-alias wrk]
[cuerdas.core :as str])) [cuerdas.core :as str]
[promesa.exec :as px]))
(declare check-profile-existence!) (declare check-profile-existence!)
(declare decode-row) (declare decode-row)
@ -137,14 +138,13 @@
[:old-password {:optional true} [:maybe [::sm/word-string {:max 500}]]]])) [:old-password {:optional true} [:maybe [::sm/word-string {:max 500}]]]]))
(sv/defmethod ::update-profile-password (sv/defmethod ::update-profile-password
{:doc/added "1.0" {::doc/added "1.0"
::sm/params schema:update-profile-password ::sm/params schema:update-profile-password
::sm/result :nil} ::climit/id :auth/global}
[cfg {:keys [::rpc/profile-id password] :as params}]
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id password] :as params}] (db/tx-run! cfg (fn [cfg]
(db/with-atomic [conn pool] (let [profile (validate-password! cfg (assoc params :profile-id profile-id))
(let [cfg (assoc cfg ::db/conn conn)
profile (validate-password! cfg (assoc params :profile-id profile-id))
session-id (::session/id params)] session-id (::session/id params)]
(when (= (str/lower (:email profile)) (when (= (str/lower (:email profile))
@ -153,9 +153,9 @@
:code :email-as-password :code :email-as-password
:hint "you can't use your email as password")) :hint "you can't use your email as password"))
(update-profile-password! conn (assoc profile :password password)) (update-profile-password! cfg (assoc profile :password password))
(invalidate-profile-session! cfg profile-id session-id) (invalidate-profile-session! cfg profile-id session-id)
nil))) nil))))
(defn- invalidate-profile-session! (defn- invalidate-profile-session!
"Removes all sessions except the current one." "Removes all sessions except the current one."
@ -173,10 +173,10 @@
profile)) profile))
(defn update-profile-password! (defn update-profile-password!
[conn {:keys [id password] :as profile}] [{:keys [::db/conn] :as cfg} {:keys [id password] :as profile}]
(when-not (db/read-only? conn) (when-not (db/read-only? conn)
(db/update! conn :profile (db/update! conn :profile
{:password (auth/derive-password password)} {:password (derive-password cfg password)}
{:id id}) {:id id})
nil)) nil))
@ -203,6 +203,7 @@
(defn update-profile-photo (defn update-profile-photo
[{:keys [::db/pool ::sto/storage] :as cfg} {:keys [profile-id file] :as params}] [{:keys [::db/pool ::sto/storage] :as cfg} {:keys [profile-id file] :as params}]
(let [photo (upload-photo cfg params) (let [photo (upload-photo cfg params)
profile (db/get-by-id pool :profile profile-id ::sql/for-update true)] profile (db/get-by-id pool :profile profile-id ::sql/for-update true)]
@ -241,8 +242,11 @@
(defn upload-photo (defn upload-photo
[{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file]}] [{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file]}]
(let [params (-> (climit/configure cfg :process-image/global) (let [params (-> cfg
(climit/run! (partial generate-thumbnail! file) executor))] (assoc ::climit/id :process-image/global)
(assoc ::climit/label "upload-photo")
(assoc ::climit/executor executor)
(climit/invoke! generate-thumbnail! file))]
(sto/put-object! storage params))) (sto/put-object! storage params)))
@ -438,17 +442,13 @@
(into {} (filter (fn [[k _]] (simple-ident? k))) props)) (into {} (filter (fn [[k _]] (simple-ident? k))) props))
(defn derive-password (defn derive-password
[cfg password] [{:keys [::wrk/executor]} password]
(when password (when password
(-> (climit/configure cfg :derive-password/global) (px/invoke! executor (partial auth/derive-password password))))
(climit/run! (partial auth/derive-password password)
(::wrk/executor cfg)))))
(defn verify-password (defn verify-password
[cfg password password-data] [{:keys [::wrk/executor]} password password-data]
(-> (climit/configure cfg :derive-password/global) (px/invoke! executor (partial auth/verify-password password password-data)))
(climit/run! (partial auth/verify-password password password-data)
(::wrk/executor cfg))))
(defn decode-row (defn decode-row
[{:keys [props] :as row}] [{:keys [props] :as row}]

View file

@ -71,6 +71,7 @@
:enable-email-verification :enable-email-verification
:enable-smtp :enable-smtp
:enable-quotes :enable-quotes
:enable-rpc-climit
:enable-feature-fdata-pointer-map :enable-feature-fdata-pointer-map
:enable-feature-fdata-objets-map :enable-feature-fdata-objets-map
:enable-feature-components-v2 :enable-feature-components-v2

View file

@ -319,6 +319,12 @@
::message (delay ~message)}) ::message (delay ~message)})
nil))) nil)))
(defmacro log
[level & params]
`(do
(log! ::logger ~(str *ns*) ::level ~level ~@params)
nil))
(defmacro info (defmacro info
[& params] [& params]
`(do `(do