diff --git a/backend/deps.edn b/backend/deps.edn
index 92fa269d5..b449a6ecc 100644
--- a/backend/deps.edn
+++ b/backend/deps.edn
@@ -37,6 +37,8 @@
buddy/buddy-hashers {:mvn/version "1.8.158"}
buddy/buddy-sign {:mvn/version "3.4.333"}
+ com.github.ben-manes.caffeine/caffeine {:mvn/version "3.1.1"}
+
org.jsoup/jsoup {:mvn/version "1.15.1"}
org.im4java/im4java
{:git/tag "1.4.0-penpot-2"
diff --git a/backend/resources/climit.edn b/backend/resources/climit.edn
new file mode 100644
index 000000000..697d16539
--- /dev/null
+++ b/backend/resources/climit.edn
@@ -0,0 +1,7 @@
+;; Example climit.edn file
+;; Required: concurrency
+;; Optional: queue-size, ommited means Integer/MAX_VALUE
+{:update-file {:concurrency 1 :queue-size 3}
+ :auth {:concurrency 128}
+ :process-font {:concurrency 4 :queue-size 32}
+ :process-image {:concurrency 8 :queue-size 32}}
diff --git a/backend/resources/log4j2-devenv.xml b/backend/resources/log4j2-devenv.xml
index b653c60fa..6e4c30572 100644
--- a/backend/resources/log4j2-devenv.xml
+++ b/backend/resources/log4j2-devenv.xml
@@ -32,6 +32,7 @@
+
diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj
index 9e65cd3d3..5ce6f52b1 100644
--- a/backend/src/app/config.clj
+++ b/backend/src/app/config.clj
@@ -52,7 +52,9 @@
:default-blob-version 5
:loggers-zmq-uri "tcp://localhost:45556"
+
:rpc-rlimit-config (fs/path "resources/rlimit.edn")
+ :rpc-climit-config (fs/path "resources/climit.edn")
:file-change-snapshot-every 5
:file-change-snapshot-timeout "3h"
@@ -90,6 +92,7 @@
(s/def ::default-rpc-rlimit ::us/vector-of-strings)
(s/def ::rpc-rlimit-config ::fs/path)
+(s/def ::rpc-climit-config ::fs/path)
(s/def ::media-max-file-size ::us/integer)
@@ -172,11 +175,6 @@
(s/def ::redis-uri ::us/string)
(s/def ::registration-domain-whitelist ::us/set-of-strings)
-(s/def ::semaphore-process-font ::us/integer)
-(s/def ::semaphore-process-image ::us/integer)
-(s/def ::semaphore-update-file ::us/integer)
-(s/def ::semaphore-auth ::us/integer)
-
(s/def ::smtp-default-from ::us/string)
(s/def ::smtp-default-reply-to ::us/string)
(s/def ::smtp-host ::us/string)
diff --git a/backend/src/app/http/errors.clj b/backend/src/app/http/errors.clj
index b66b416ec..f6764dbab 100644
--- a/backend/src/app/http/errors.clj
+++ b/backend/src/app/http/errors.clj
@@ -94,6 +94,23 @@
[err _]
(yrs/response 404 (ex-data err)))
+(defmethod handle-exception :internal
+ [error request]
+ (let [{:keys [code] :as edata} (ex-data error)]
+ (cond
+ (= :concurrency-limit-reached code)
+ (yrs/response 429)
+
+ :else
+ (do
+ (l/error ::l/raw (ex-message error)
+ ::l/context (get-context request)
+ :cause error)
+ (yrs/response 500 {:type :server-error
+ :code :unhandled
+ :hint (ex-message error)
+ :data edata})))))
+
(defmethod handle-exception org.postgresql.util.PSQLException
[error request]
(let [state (.getSQLState ^java.sql.SQLException error)]
diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj
index 97bcdc7a2..9f3d5a150 100644
--- a/backend/src/app/main.clj
+++ b/backend/src/app/main.clj
@@ -204,7 +204,7 @@
{:pool (ig/ref :app.db/pool)
:executor (ig/ref [::default :app.worker/executor])}
- :app.rpc/semaphores
+ :app.rpc/climit
{:metrics (ig/ref :app.metrics/metrics)
:executor (ig/ref [::default :app.worker/executor])}
@@ -224,11 +224,11 @@
:audit (ig/ref :app.loggers.audit/collector)
:ldap (ig/ref :app.auth.ldap/provider)
:http-client (ig/ref :app.http/client)
+ :climit (ig/ref :app.rpc/climit)
:rlimit (ig/ref :app.rpc/rlimit)
:executors (ig/ref :app.worker/executors)
:executor (ig/ref [::default :app.worker/executor])
:templates (ig/ref :app.setup/builtin-templates)
- :semaphores (ig/ref :app.rpc/semaphores)
}
:app.rpc.doc/routes
diff --git a/backend/src/app/metrics.clj b/backend/src/app/metrics.clj
index 1429b2f57..9f4d4a472 100644
--- a/backend/src/app/metrics.clj
+++ b/backend/src/app/metrics.clj
@@ -100,21 +100,21 @@
::mdef/labels ["name"]
::mdef/type :summary}
- :semaphore-queued-submissions
- {::mdef/name "penpot_semaphore_queued_submissions"
- ::mdef/help "Current number of queued submissions on SEMAPHORE."
+ :rpc-climit-queue-size
+ {::mdef/name "penpot_rpc_climit_queue_size"
+ ::mdef/help "Current number of queued submissions on the CLIMIT."
::mdef/labels ["name"]
::mdef/type :gauge}
- :semaphore-used-permits
- {::mdef/name "penpot_semaphore_used_permits"
- ::mdef/help "Current number of used permits on SEMAPHORE."
+ :rpc-climit-concurrency
+ {::mdef/name "penpot_rpc_climit_concurrency"
+ ::mdef/help "Current number of used concurrency capacity on the CLIMIT"
::mdef/labels ["name"]
::mdef/type :gauge}
- :semaphore-timing
- {::mdef/name "penpot_semaphore_timing"
- ::mdef/help "Total timing of SEMAPHORE."
+ :rpc-climit-timing
+ {::mdef/name "penpot_rpc_climit_timing"
+ ::mdef/help "Summary of the time between queuing and executing on the CLIMIT"
::mdef/labels ["name"]
::mdef/type :summary}
diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj
index 01adf6b07..6a5e28035 100644
--- a/backend/src/app/rpc.clj
+++ b/backend/src/app/rpc.clj
@@ -15,9 +15,9 @@
[app.loggers.audit :as audit]
[app.metrics :as mtx]
[app.msgbus :as-alias mbus]
+ [app.rpc.climit :as climit]
[app.rpc.retry :as retry]
[app.rpc.rlimit :as rlimit]
- [app.rpc.semaphore :as-alias rsem]
[app.storage :as-alias sto]
[app.util.services :as sv]
[app.util.time :as ts]
@@ -163,7 +163,7 @@
(wrap-dispatch cfg $ mdata)
(wrap-metrics cfg $ mdata)
(retry/wrap-retry cfg $ mdata)
- (rsem/wrap cfg $ mdata)
+ (climit/wrap cfg $ mdata)
(rlimit/wrap cfg $ mdata)
(wrap-audit cfg $ mdata))
@@ -175,6 +175,7 @@
(fn [{:keys [::request] :as params}]
;; Raise authentication error when rpc method requires auth but
;; no profile-id is found in the request.
+
(p/do!
(if (and auth? (not (uuid? (:profile-id params))))
(ex/raise :type :authentication
@@ -182,7 +183,6 @@
:hint "authentication required for this endpoint")
(let [params (us/conform spec (dissoc params ::request))]
(f cfg (assoc params ::request request))))))
-
mdata)))
(defn- process-method
@@ -238,6 +238,7 @@
(s/def ::http-client fn?)
(s/def ::ldap (s/nilable map?))
(s/def ::msgbus ::mbus/msgbus)
+(s/def ::climit (s/nilable ::climit/climit))
(s/def ::rlimit (s/nilable ::rlimit/rlimit))
(s/def ::public-uri ::us/not-empty-string)
@@ -251,7 +252,7 @@
::public-uri
::msgbus
::http-client
- ::rsem/semaphores
+ ::climit
::rlimit
::mtx/metrics
::db/pool
diff --git a/backend/src/app/rpc/climit.clj b/backend/src/app/rpc/climit.clj
new file mode 100644
index 000000000..76c6f44e7
--- /dev/null
+++ b/backend/src/app/rpc/climit.clj
@@ -0,0 +1,205 @@
+;; This Source Code Form is subject to the terms of the Mozilla Public
+;; License, v. 2.0. If a copy of the MPL was not distributed with this
+;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
+;;
+;; Copyright (c) KALEIDOS INC
+
+(ns app.rpc.climit
+ "Concurrencly limiter for RPC."
+ (:require
+ [app.common.data :as d]
+ [app.common.exceptions :as ex]
+ [app.common.logging :as l]
+ [app.common.spec :as us]
+ [app.config :as cf]
+ [app.metrics :as mtx]
+ [app.rpc :as-alias rpc]
+ [app.util.services :as-alias sv]
+ [app.util.time :as dt]
+ [app.worker :as-alias wrk]
+ [clojure.edn :as edn]
+ [clojure.spec.alpha :as s]
+ [datoteka.fs :as fs]
+ [integrant.core :as ig]
+ [promesa.core :as p]
+ [promesa.exec :as px]
+ [promesa.exec.bulkhead :as pxb])
+ (:import
+ com.github.benmanes.caffeine.cache.Cache
+ com.github.benmanes.caffeine.cache.CacheLoader
+ com.github.benmanes.caffeine.cache.Caffeine
+ com.github.benmanes.caffeine.cache.RemovalListener))
+
+(defn- capacity-exception?
+ [o]
+ (and (ex/ex-info? o)
+ (let [data (ex-data o)]
+ (and (= :bulkhead-error (:type data))
+ (= :capacity-limit-reached (:code data))))))
+
+(defn invoke!
+ [limiter f]
+ (p/handle
+ (px/submit! limiter f)
+ (fn [result cause]
+ (cond
+ (capacity-exception? cause)
+ (p/rejected
+ (ex/error :type :internal
+ :code :concurrency-limit-reached
+ :queue (-> limiter meta :bkey name)
+ :cause cause))
+
+ (some? cause)
+ (p/rejected cause)
+
+ :else
+ (p/resolved result)))))
+
+(defn- create-limiter
+ [{:keys [executor metrics concurrency queue-size bkey skey]}]
+ (let [labels (into-array String [(name bkey)])
+ on-queue (fn [instance]
+ (l/trace :hint "enqueued"
+ :key (name bkey)
+ :skey (str skey)
+ :queue-size (get instance :current-queue-size)
+ :concurrency (get instance :current-concurrency)
+ (mtx/run! metrics
+ :id :rpc-climit-queue-size
+ :val (get instance :current-queue-size)
+ :labels labels)
+ (mtx/run! metrics
+ :id :rpc-climit-concurrency
+ :val (get instance :current-concurrency)
+ :labels labels)))
+
+ on-run (fn [instance task]
+ (let [elapsed (- (inst-ms (dt/now))
+ (inst-ms task))]
+ (l/trace :hint "execute"
+ :key (name bkey)
+ :skey (str skey)
+ :elapsed (str elapsed "ms"))
+ (mtx/run! metrics
+ :id :rpc-climit-timing
+ :val elapsed
+ :labels labels)
+ (mtx/run! metrics
+ :id :rpc-climit-queue-size
+ :val (get instance :current-queue-size)
+ :labels labels)
+ (mtx/run! metrics
+ :id :rpc-climit-concurrency
+ :val (get instance :current-concurrency)
+ :labels labels)))
+
+ options {:executor executor
+ :concurrency concurrency
+ :queue-size (or queue-size Integer/MAX_VALUE)
+ :on-queue on-queue
+ :on-run on-run}]
+
+ (-> (pxb/create options)
+ (vary-meta assoc :bkey bkey :skey skey))))
+
+(defn- create-cache
+ [{:keys [executor] :as params} config]
+ (let [listener (reify RemovalListener
+ (onRemoval [_ key _val cause]
+ (l/trace :hint "cache: remove" :key key :reason (str cause))))
+
+ loader (reify CacheLoader
+ (load [_ key]
+ (let [[bkey skey] key]
+ (when-let [config (get config bkey)]
+ (-> (merge params config)
+ (assoc :bkey bkey)
+ (assoc :skey skey)
+ (create-limiter))))))]
+
+ (.. (Caffeine/newBuilder)
+ (weakValues)
+ (executor executor)
+ (removalListener listener)
+ (build loader))))
+
+(defprotocol IConcurrencyManager)
+
+(s/def ::concurrency ::us/integer)
+(s/def ::queue-size ::us/integer)
+(s/def ::config
+ (s/map-of keyword?
+ (s/keys :req-un [::concurrency]
+ :opt-un [::queue-size])))
+
+(defmethod ig/prep-key ::rpc/climit
+ [_ cfg]
+ (merge {:path (cf/get :rpc-climit-config)}
+ (d/without-nils cfg)))
+
+(defmethod ig/pre-init-spec ::rpc/climit [_]
+ (s/keys :req-un [::wrk/executor ::mtx/metrics ::fs/path]))
+
+(defmethod ig/init-key ::rpc/climit
+ [_ {:keys [path] :as params}]
+ (when (contains? cf/flags :rpc-climit)
+ (if-let [config (some->> path slurp edn/read-string)]
+ (do
+ (l/info :hint "initializing concurrency limit" :config (str path))
+ (us/verify! ::config config)
+
+ (let [cache (create-cache params config)]
+ ^{::cache cache}
+ (reify
+ IConcurrencyManager
+ clojure.lang.IDeref
+ (deref [_] config)
+
+ clojure.lang.ILookup
+ (valAt [_ key]
+ (let [key (if (vector? key) key [key])]
+ (.get ^Cache cache key))))))
+
+ (l/warn :hint "unable to load configuration" :config (str path)))))
+
+
+(s/def ::climit #(satisfies? IConcurrencyManager %))
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;; PUBLIC API
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+
+(defmacro with-dispatch
+ [lim & body]
+ `(if ~lim
+ (invoke! ~lim (^:once fn [] (p/wrap (do ~@body))))
+ (p/wrap (do ~@body))))
+
+(defn wrap
+ [{:keys [climit]} f {:keys [::queue ::key-fn] :as mdata}]
+ (if (and (some? climit)
+ (some? queue))
+ (if-let [config (get @climit queue)]
+ (do
+ (l/debug :hint "wrap: instrumenting method"
+ :limit-name (name queue)
+ :service-name (::sv/name mdata)
+ :queue-size (or (:queue-size config) Integer/MAX_VALUE)
+ :concurrency (:concurrency config)
+ :keyed? (some? key-fn))
+ (if (some? key-fn)
+ (fn [cfg params]
+ (let [key [queue (key-fn params)]
+ lim (get climit key)]
+ (invoke! lim (partial f cfg params))))
+
+ (let [lim (get climit queue)]
+ (fn [cfg params]
+ (invoke! lim (partial f cfg params))))))
+ (do
+ (l/warn :hint "wrap: no config found"
+ :queue (name queue)
+ :service (::sv/name mdata))
+ f))
+ f))
diff --git a/backend/src/app/rpc/commands/auth.clj b/backend/src/app/rpc/commands/auth.clj
index f66ed1358..f41f6bf92 100644
--- a/backend/src/app/rpc/commands/auth.clj
+++ b/backend/src/app/rpc/commands/auth.clj
@@ -16,10 +16,10 @@
[app.http.session :as session]
[app.loggers.audit :as audit]
[app.rpc :as-alias rpc]
+ [app.rpc.climit :as climit]
[app.rpc.doc :as-alias doc]
[app.rpc.mutations.teams :as teams]
[app.rpc.queries.profile :as profile]
- [app.rpc.semaphore :as rsem]
[app.tokens :as tokens]
[app.util.services :as sv]
[app.util.time :as dt]
@@ -147,7 +147,7 @@
(sv/defmethod ::login-with-password
"Performs authentication using penpot password."
{:auth false
- ::rsem/queue :auth
+ ::climit/queue :auth
::doc/added "1.15"}
[cfg params]
(login-with-password cfg params))
@@ -188,7 +188,7 @@
(sv/defmethod ::recover-profile
{:auth false
- ::rsem/queue :auth
+ ::climit/queue :auth
::doc/added "1.15"}
[cfg params]
(recover-profile cfg params))
@@ -438,7 +438,7 @@
(sv/defmethod ::register-profile
{:auth false
- ::rsem/queue :auth
+ ::climit/queue :auth
::doc/added "1.15"}
[{:keys [pool] :as cfg} params]
(db/with-atomic [conn pool]
diff --git a/backend/src/app/rpc/mutations/files.clj b/backend/src/app/rpc/mutations/files.clj
index 5884a1a43..d29382a37 100644
--- a/backend/src/app/rpc/mutations/files.clj
+++ b/backend/src/app/rpc/mutations/files.clj
@@ -22,10 +22,10 @@
[app.msgbus :as mbus]
[app.rpc :as-alias rpc]
[app.rpc.doc :as-alias doc]
+ [app.rpc.climit :as climit]
[app.rpc.permissions :as perms]
[app.rpc.queries.files :as files]
[app.rpc.queries.projects :as proj]
- [app.rpc.semaphore :as rsem]
[app.storage.impl :as simpl]
[app.util.blob :as blob]
[app.util.objects-map :as omap]
@@ -346,8 +346,8 @@
FOR KEY SHARE")
(sv/defmethod ::update-file
- {::rsem/queue :update-file
- ::doc/added "1.0"}
+ {::climit/queue :update-file
+ ::climit/key-fn :id}
[{:keys [pool] :as cfg} {:keys [id profile-id components-v2] :as params}]
(db/with-atomic [conn pool]
(db/xact-lock! conn id)
diff --git a/backend/src/app/rpc/mutations/fonts.clj b/backend/src/app/rpc/mutations/fonts.clj
index 1868feae2..61f4b3ee5 100644
--- a/backend/src/app/rpc/mutations/fonts.clj
+++ b/backend/src/app/rpc/mutations/fonts.clj
@@ -12,14 +12,15 @@
[app.common.uuid :as uuid]
[app.db :as db]
[app.media :as media]
+ [app.rpc.climit :as-alias climit]
[app.rpc.doc :as-alias doc]
[app.rpc.queries.teams :as teams]
- [app.rpc.semaphore :as rsem]
[app.storage :as sto]
[app.util.services :as sv]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
- [promesa.core :as p]))
+ [promesa.core :as p]
+ [promesa.exec :as px]))
(declare create-font-variant)
@@ -46,15 +47,15 @@
(create-font-variant cfg params)))
(defn create-font-variant
- [{:keys [storage pool executor semaphores] :as cfg} {:keys [data] :as params}]
+ [{:keys [storage pool executor climit] :as cfg} {:keys [data] :as params}]
(letfn [(generate-fonts [data]
- (rsem/with-dispatch (:process-font semaphores)
+ (climit/with-dispatch (:process-font climit)
(media/run {:cmd :generate-fonts :input data})))
;; Function responsible of calculating cryptographyc hash of
;; the provided data.
(calculate-hash [data]
- (rsem/with-dispatch (:process-font semaphores)
+ (px/with-dispatch executor
(sto/calculate-hash data)))
(validate-data [data]
@@ -120,6 +121,7 @@
and font_id = ?")
(sv/defmethod ::update-font
+ {::climit/queue :process-font}
[{:keys [pool] :as cfg} {:keys [team-id profile-id id name] :as params}]
(db/with-atomic [conn pool]
(teams/check-edition-permissions! conn profile-id team-id)
diff --git a/backend/src/app/rpc/mutations/media.clj b/backend/src/app/rpc/mutations/media.clj
index 2e0483647..2971d64bc 100644
--- a/backend/src/app/rpc/mutations/media.clj
+++ b/backend/src/app/rpc/mutations/media.clj
@@ -14,8 +14,8 @@
[app.config :as cf]
[app.db :as db]
[app.media :as media]
+ [app.rpc.climit :as climit]
[app.rpc.queries.teams :as teams]
- [app.rpc.semaphore :as rsem]
[app.storage :as sto]
[app.storage.tmp :as tmp]
[app.util.services :as sv]
@@ -23,7 +23,8 @@
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[datoteka.io :as io]
- [promesa.core :as p]))
+ [promesa.core :as p]
+ [promesa.exec :as px]))
(def default-max-file-size (* 1024 1024 10)) ; 10 MiB
@@ -104,25 +105,25 @@
;; inverse, soft referential integrity).
(defn create-file-media-object
- [{:keys [storage pool semaphores] :as cfg}
+ [{:keys [storage pool climit executor] :as cfg}
{:keys [id file-id is-local name content] :as params}]
(letfn [;; Function responsible to retrieve the file information, as
;; it is synchronous operation it should be wrapped into
;; with-dispatch macro.
(get-info [content]
- (rsem/with-dispatch (:process-image semaphores)
+ (climit/with-dispatch (:process-image climit)
(media/run {:cmd :info :input content})))
;; Function responsible of calculating cryptographyc hash of
;; the provided data.
(calculate-hash [data]
- (rsem/with-dispatch (:process-image semaphores)
+ (px/with-dispatch executor
(sto/calculate-hash data)))
;; Function responsible of generating thumnail. As it is synchronous
;; opetation, it should be wrapped into with-dispatch macro
(generate-thumbnail [info]
- (rsem/with-dispatch (:process-image semaphores)
+ (climit/with-dispatch (:process-image climit)
(media/run (assoc thumbnail-options
:cmd :generic-thumbnail
:input info))))
@@ -154,14 +155,15 @@
:bucket "file-media-object"})))
(insert-into-database [info image thumb]
- (db/exec-one! pool [sql:create-file-media-object
- (or id (uuid/next))
- file-id is-local name
- (:id image)
- (:id thumb)
- (:width info)
- (:height info)
- (:mtype info)]))]
+ (px/with-dispatch executor
+ (db/exec-one! pool [sql:create-file-media-object
+ (or id (uuid/next))
+ file-id is-local name
+ (:id image)
+ (:id thumb)
+ (:width info)
+ (:height info)
+ (:mtype info)])))]
(p/let [info (get-info content)
thumb (create-thumbnail info)
diff --git a/backend/src/app/rpc/mutations/profile.clj b/backend/src/app/rpc/mutations/profile.clj
index 56846c30d..da087c8c1 100644
--- a/backend/src/app/rpc/mutations/profile.clj
+++ b/backend/src/app/rpc/mutations/profile.clj
@@ -16,11 +16,11 @@
[app.loggers.audit :as audit]
[app.media :as media]
[app.rpc :as-alias rpc]
+ [app.rpc.climit :as-alias climit]
[app.rpc.commands.auth :as cmd.auth]
[app.rpc.doc :as-alias doc]
[app.rpc.mutations.teams :as teams]
[app.rpc.queries.profile :as profile]
- [app.rpc.semaphore :as rsem]
[app.storage :as sto]
[app.tokens :as tokens]
[app.util.services :as sv]
@@ -83,11 +83,11 @@
(s/keys :req-un [::profile-id ::password ::old-password]))
(sv/defmethod ::update-profile-password
- {::rsem/queue :auth}
+ {::climit/queue :auth}
[{:keys [pool] :as cfg} {:keys [password] :as params}]
(db/with-atomic [conn pool]
(let [profile (validate-password! conn params)
- session-id (:app.rpc/session-id params)]
+ session-id (::rpc/session-id params)]
(when (= (str/lower (:email profile))
(str/lower (:password params)))
(ex/raise :type :validation
@@ -309,7 +309,7 @@
(sv/defmethod ::login
{:auth false
- ::rsem/queue :auth
+ ::climit/queue :auth
::doc/added "1.0"
::doc/deprecated "1.15"}
[cfg params]
@@ -354,7 +354,7 @@
(sv/defmethod ::register-profile
{:auth false
- ::rsem/queue :auth
+ ::climit/queue :auth
::doc/added "1.0"
::doc/deprecated "1.15"}
[{:keys [pool] :as cfg} params]
diff --git a/backend/src/app/rpc/mutations/teams.clj b/backend/src/app/rpc/mutations/teams.clj
index da04e4c65..288cfdf76 100644
--- a/backend/src/app/rpc/mutations/teams.clj
+++ b/backend/src/app/rpc/mutations/teams.clj
@@ -17,11 +17,11 @@
[app.loggers.audit :as audit]
[app.media :as media]
[app.rpc :as-alias rpc]
+ [app.rpc.climit :as climit]
[app.rpc.mutations.projects :as projects]
[app.rpc.permissions :as perms]
[app.rpc.queries.profile :as profile]
[app.rpc.queries.teams :as teams]
- [app.rpc.semaphore :as rsem]
[app.storage :as sto]
[app.tokens :as tokens]
[app.util.services :as sv]
@@ -316,13 +316,13 @@
(assoc team :photo-id (:id photo))))
(defn upload-photo
- [{:keys [storage semaphores] :as cfg} {:keys [file]}]
+ [{:keys [storage executor climit] :as cfg} {:keys [file]}]
(letfn [(get-info [content]
- (rsem/with-dispatch (:process-image semaphores)
+ (climit/with-dispatch (:process-image climit)
(media/run {:cmd :info :input content})))
(generate-thumbnail [info]
- (rsem/with-dispatch (:process-image semaphores)
+ (climit/with-dispatch (:process-image climit)
(media/run {:cmd :profile-thumbnail
:format :jpeg
:quality 85
@@ -333,7 +333,7 @@
;; Function responsible of calculating cryptographyc hash of
;; the provided data.
(calculate-hash [data]
- (rsem/with-dispatch (:process-image semaphores)
+ (px/with-dispatch executor
(sto/calculate-hash data)))]
(p/let [info (get-info file)
@@ -341,11 +341,10 @@
hash (calculate-hash (:data thumb))
content (-> (sto/content (:data thumb) (:size thumb))
(sto/wrap-with-hash hash))]
- (rsem/with-dispatch (:process-image semaphores)
- (sto/put-object! storage {::sto/content content
- ::sto/deduplicate? true
- :bucket "profile"
- :content-type (:mtype thumb)})))))
+ (sto/put-object! storage {::sto/content content
+ ::sto/deduplicate? true
+ :bucket "profile"
+ :content-type (:mtype thumb)}))))
;; --- Mutation: Invite Member
diff --git a/backend/src/app/rpc/semaphore.clj b/backend/src/app/rpc/semaphore.clj
deleted file mode 100644
index 5e8a5a5ed..000000000
--- a/backend/src/app/rpc/semaphore.clj
+++ /dev/null
@@ -1,149 +0,0 @@
-;; This Source Code Form is subject to the terms of the Mozilla Public
-;; License, v. 2.0. If a copy of the MPL was not distributed with this
-;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
-;;
-;; Copyright (c) KALEIDOS INC
-
-(ns app.rpc.semaphore
- "Resource usage limits (in other words: semaphores)."
- (:require
- [app.common.data :as d]
- [app.common.logging :as l]
- [app.common.spec :as us]
- [app.config :as cf]
- [app.metrics :as mtx]
- [app.rpc :as-alias rpc]
- [app.util.locks :as locks]
- [app.util.time :as ts]
- [app.worker :as-alias wrk]
- [clojure.spec.alpha :as s]
- [integrant.core :as ig]
- [promesa.core :as p]))
-
-;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-;; ASYNC SEMAPHORE IMPL
-;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-
-(defprotocol IAsyncSemaphore
- (acquire! [_])
- (release! [_ tp]))
-
-(defn create
- [& {:keys [permits metrics name executor]}]
- (let [used (volatile! 0)
- queue (volatile! (d/queue))
- labels (into-array String [(d/name name)])
- lock (locks/create)
- permits (or permits Long/MAX_VALUE)]
-
- (when (>= permits Long/MAX_VALUE)
- (l/warn :hint "permits value too high" :permits permits :semaphore name))
-
- ^{::wrk/executor executor
- ::name name}
- (reify IAsyncSemaphore
- (acquire! [_]
- (let [d (p/deferred)]
- (locks/locking lock
- (if (< @used permits)
- (do
- (vswap! used inc)
- (p/resolve! d))
- (vswap! queue conj d)))
-
- (mtx/run! metrics
- :id :semaphore-used-permits
- :val @used
- :labels labels)
- (mtx/run! metrics
- :id :semaphore-queued-submissions
- :val (count @queue)
- :labels labels)
- d))
-
- (release! [_ tp]
- (locks/locking lock
- (if-let [item (peek @queue)]
- (do
- (vswap! queue pop)
- (p/resolve! item))
- (when (pos? @used)
- (vswap! used dec))))
-
- (mtx/run! metrics
- :id :semaphore-timing
- :val (inst-ms (tp))
- :labels labels)
- (mtx/run! metrics
- :id :semaphore-used-permits
- :val @used
- :labels labels)
- (mtx/run! metrics
- :id :semaphore-queued-submissions
- :val (count @queue)
- :labels labels)))))
-
-(defn semaphore?
- [v]
- (satisfies? IAsyncSemaphore v))
-
-;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-;; PREDEFINED SEMAPHORES
-;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-
-(s/def ::semaphore semaphore?)
-(s/def ::semaphores
- (s/map-of ::us/keyword ::semaphore))
-
-(defmethod ig/pre-init-spec ::rpc/semaphores [_]
- (s/keys :req-un [::mtx/metrics]))
-
-(defn- create-default-semaphores
- [metrics executor]
- [(create :permits (cf/get :semaphore-process-font)
- :metrics metrics
- :name :process-font
- :executor executor)
- (create :permits (cf/get :semaphore-update-file)
- :metrics metrics
- :name :update-file
- :executor executor)
- (create :permits (cf/get :semaphore-process-image)
- :metrics metrics
- :name :process-image
- :executor executor)
- (create :permits (cf/get :semaphore-auth)
- :metrics metrics
- :name :auth
- :executor executor)])
-
-(defmethod ig/init-key ::rpc/semaphores
- [_ {:keys [metrics executor]}]
- (->> (create-default-semaphores metrics executor)
- (d/index-by (comp ::name meta))))
-
-;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-;; PUBLIC API
-;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-
-(defmacro with-dispatch
- [queue & body]
- `(let [tpoint# (ts/tpoint)
- queue# ~queue
- executor# (-> queue# meta ::wrk/executor)]
- (-> (acquire! queue#)
- (p/then (fn [_#] ~@body) executor#)
- (p/finally (fn [_# _#]
- (release! queue# tpoint#))))))
-
-(defn wrap
- [{:keys [semaphores]} f {:keys [::queue]}]
- (let [queue' (get semaphores queue)]
- (if (semaphore? queue')
- (fn [cfg params]
- (with-dispatch queue'
- (f cfg params)))
- (do
- (when (some? queue)
- (l/warn :hint "undefined semaphore" :name queue))
- f))))
diff --git a/common/deps.edn b/common/deps.edn
index 25ff87865..c8a61a63e 100644
--- a/common/deps.edn
+++ b/common/deps.edn
@@ -21,7 +21,7 @@
com.cognitect/transit-cljs {:mvn/version "0.8.280"}
java-http-clj/java-http-clj {:mvn/version "0.4.3"}
- funcool/promesa {:mvn/version "8.0.450"}
+ funcool/promesa {:mvn/version "9.0.507"}
funcool/cuerdas {:mvn/version "2022.06.16-403"}
lambdaisland/uri {:mvn/version "1.13.95"