mirror of
https://github.com/penpot/penpot.git
synced 2025-05-18 14:46:10 +02:00
🎉 Add robust concurrency limiter for RPC
This commit is contained in:
parent
6ad9a5aadb
commit
37ad04d2a6
17 changed files with 296 additions and 211 deletions
|
@ -37,6 +37,8 @@
|
||||||
buddy/buddy-hashers {:mvn/version "1.8.158"}
|
buddy/buddy-hashers {:mvn/version "1.8.158"}
|
||||||
buddy/buddy-sign {:mvn/version "3.4.333"}
|
buddy/buddy-sign {:mvn/version "3.4.333"}
|
||||||
|
|
||||||
|
com.github.ben-manes.caffeine/caffeine {:mvn/version "3.1.1"}
|
||||||
|
|
||||||
org.jsoup/jsoup {:mvn/version "1.15.1"}
|
org.jsoup/jsoup {:mvn/version "1.15.1"}
|
||||||
org.im4java/im4java
|
org.im4java/im4java
|
||||||
{:git/tag "1.4.0-penpot-2"
|
{:git/tag "1.4.0-penpot-2"
|
||||||
|
|
7
backend/resources/climit.edn
Normal file
7
backend/resources/climit.edn
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
;; Example climit.edn file
|
||||||
|
;; Required: concurrency
|
||||||
|
;; Optional: queue-size, ommited means Integer/MAX_VALUE
|
||||||
|
{:update-file {:concurrency 1 :queue-size 3}
|
||||||
|
:auth {:concurrency 128}
|
||||||
|
:process-font {:concurrency 4 :queue-size 32}
|
||||||
|
:process-image {:concurrency 8 :queue-size 32}}
|
|
@ -32,6 +32,7 @@
|
||||||
<Logger name="app.util.websocket" level="info" />
|
<Logger name="app.util.websocket" level="info" />
|
||||||
<Logger name="app.redis" level="info" />
|
<Logger name="app.redis" level="info" />
|
||||||
<Logger name="app.rpc.rlimit" level="info" />
|
<Logger name="app.rpc.rlimit" level="info" />
|
||||||
|
<Logger name="app.rpc.climit" level="info" />
|
||||||
<Logger name="app.rpc.mutations.files" level="info" />
|
<Logger name="app.rpc.mutations.files" level="info" />
|
||||||
|
|
||||||
<Logger name="app.cli" level="debug" additivity="false">
|
<Logger name="app.cli" level="debug" additivity="false">
|
||||||
|
|
|
@ -52,7 +52,9 @@
|
||||||
|
|
||||||
:default-blob-version 5
|
:default-blob-version 5
|
||||||
:loggers-zmq-uri "tcp://localhost:45556"
|
:loggers-zmq-uri "tcp://localhost:45556"
|
||||||
|
|
||||||
:rpc-rlimit-config (fs/path "resources/rlimit.edn")
|
:rpc-rlimit-config (fs/path "resources/rlimit.edn")
|
||||||
|
:rpc-climit-config (fs/path "resources/climit.edn")
|
||||||
|
|
||||||
:file-change-snapshot-every 5
|
:file-change-snapshot-every 5
|
||||||
:file-change-snapshot-timeout "3h"
|
:file-change-snapshot-timeout "3h"
|
||||||
|
@ -90,6 +92,7 @@
|
||||||
|
|
||||||
(s/def ::default-rpc-rlimit ::us/vector-of-strings)
|
(s/def ::default-rpc-rlimit ::us/vector-of-strings)
|
||||||
(s/def ::rpc-rlimit-config ::fs/path)
|
(s/def ::rpc-rlimit-config ::fs/path)
|
||||||
|
(s/def ::rpc-climit-config ::fs/path)
|
||||||
|
|
||||||
(s/def ::media-max-file-size ::us/integer)
|
(s/def ::media-max-file-size ::us/integer)
|
||||||
|
|
||||||
|
@ -172,11 +175,6 @@
|
||||||
(s/def ::redis-uri ::us/string)
|
(s/def ::redis-uri ::us/string)
|
||||||
(s/def ::registration-domain-whitelist ::us/set-of-strings)
|
(s/def ::registration-domain-whitelist ::us/set-of-strings)
|
||||||
|
|
||||||
(s/def ::semaphore-process-font ::us/integer)
|
|
||||||
(s/def ::semaphore-process-image ::us/integer)
|
|
||||||
(s/def ::semaphore-update-file ::us/integer)
|
|
||||||
(s/def ::semaphore-auth ::us/integer)
|
|
||||||
|
|
||||||
(s/def ::smtp-default-from ::us/string)
|
(s/def ::smtp-default-from ::us/string)
|
||||||
(s/def ::smtp-default-reply-to ::us/string)
|
(s/def ::smtp-default-reply-to ::us/string)
|
||||||
(s/def ::smtp-host ::us/string)
|
(s/def ::smtp-host ::us/string)
|
||||||
|
|
|
@ -94,6 +94,23 @@
|
||||||
[err _]
|
[err _]
|
||||||
(yrs/response 404 (ex-data err)))
|
(yrs/response 404 (ex-data err)))
|
||||||
|
|
||||||
|
(defmethod handle-exception :internal
|
||||||
|
[error request]
|
||||||
|
(let [{:keys [code] :as edata} (ex-data error)]
|
||||||
|
(cond
|
||||||
|
(= :concurrency-limit-reached code)
|
||||||
|
(yrs/response 429)
|
||||||
|
|
||||||
|
:else
|
||||||
|
(do
|
||||||
|
(l/error ::l/raw (ex-message error)
|
||||||
|
::l/context (get-context request)
|
||||||
|
:cause error)
|
||||||
|
(yrs/response 500 {:type :server-error
|
||||||
|
:code :unhandled
|
||||||
|
:hint (ex-message error)
|
||||||
|
:data edata})))))
|
||||||
|
|
||||||
(defmethod handle-exception org.postgresql.util.PSQLException
|
(defmethod handle-exception org.postgresql.util.PSQLException
|
||||||
[error request]
|
[error request]
|
||||||
(let [state (.getSQLState ^java.sql.SQLException error)]
|
(let [state (.getSQLState ^java.sql.SQLException error)]
|
||||||
|
|
|
@ -204,7 +204,7 @@
|
||||||
{:pool (ig/ref :app.db/pool)
|
{:pool (ig/ref :app.db/pool)
|
||||||
:executor (ig/ref [::default :app.worker/executor])}
|
:executor (ig/ref [::default :app.worker/executor])}
|
||||||
|
|
||||||
:app.rpc/semaphores
|
:app.rpc/climit
|
||||||
{:metrics (ig/ref :app.metrics/metrics)
|
{:metrics (ig/ref :app.metrics/metrics)
|
||||||
:executor (ig/ref [::default :app.worker/executor])}
|
:executor (ig/ref [::default :app.worker/executor])}
|
||||||
|
|
||||||
|
@ -224,11 +224,11 @@
|
||||||
:audit (ig/ref :app.loggers.audit/collector)
|
:audit (ig/ref :app.loggers.audit/collector)
|
||||||
:ldap (ig/ref :app.auth.ldap/provider)
|
:ldap (ig/ref :app.auth.ldap/provider)
|
||||||
:http-client (ig/ref :app.http/client)
|
:http-client (ig/ref :app.http/client)
|
||||||
|
:climit (ig/ref :app.rpc/climit)
|
||||||
:rlimit (ig/ref :app.rpc/rlimit)
|
:rlimit (ig/ref :app.rpc/rlimit)
|
||||||
:executors (ig/ref :app.worker/executors)
|
:executors (ig/ref :app.worker/executors)
|
||||||
:executor (ig/ref [::default :app.worker/executor])
|
:executor (ig/ref [::default :app.worker/executor])
|
||||||
:templates (ig/ref :app.setup/builtin-templates)
|
:templates (ig/ref :app.setup/builtin-templates)
|
||||||
:semaphores (ig/ref :app.rpc/semaphores)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
:app.rpc.doc/routes
|
:app.rpc.doc/routes
|
||||||
|
|
|
@ -100,21 +100,21 @@
|
||||||
::mdef/labels ["name"]
|
::mdef/labels ["name"]
|
||||||
::mdef/type :summary}
|
::mdef/type :summary}
|
||||||
|
|
||||||
:semaphore-queued-submissions
|
:rpc-climit-queue-size
|
||||||
{::mdef/name "penpot_semaphore_queued_submissions"
|
{::mdef/name "penpot_rpc_climit_queue_size"
|
||||||
::mdef/help "Current number of queued submissions on SEMAPHORE."
|
::mdef/help "Current number of queued submissions on the CLIMIT."
|
||||||
::mdef/labels ["name"]
|
::mdef/labels ["name"]
|
||||||
::mdef/type :gauge}
|
::mdef/type :gauge}
|
||||||
|
|
||||||
:semaphore-used-permits
|
:rpc-climit-concurrency
|
||||||
{::mdef/name "penpot_semaphore_used_permits"
|
{::mdef/name "penpot_rpc_climit_concurrency"
|
||||||
::mdef/help "Current number of used permits on SEMAPHORE."
|
::mdef/help "Current number of used concurrency capacity on the CLIMIT"
|
||||||
::mdef/labels ["name"]
|
::mdef/labels ["name"]
|
||||||
::mdef/type :gauge}
|
::mdef/type :gauge}
|
||||||
|
|
||||||
:semaphore-timing
|
:rpc-climit-timing
|
||||||
{::mdef/name "penpot_semaphore_timing"
|
{::mdef/name "penpot_rpc_climit_timing"
|
||||||
::mdef/help "Total timing of SEMAPHORE."
|
::mdef/help "Summary of the time between queuing and executing on the CLIMIT"
|
||||||
::mdef/labels ["name"]
|
::mdef/labels ["name"]
|
||||||
::mdef/type :summary}
|
::mdef/type :summary}
|
||||||
|
|
||||||
|
|
|
@ -15,9 +15,9 @@
|
||||||
[app.loggers.audit :as audit]
|
[app.loggers.audit :as audit]
|
||||||
[app.metrics :as mtx]
|
[app.metrics :as mtx]
|
||||||
[app.msgbus :as-alias mbus]
|
[app.msgbus :as-alias mbus]
|
||||||
|
[app.rpc.climit :as climit]
|
||||||
[app.rpc.retry :as retry]
|
[app.rpc.retry :as retry]
|
||||||
[app.rpc.rlimit :as rlimit]
|
[app.rpc.rlimit :as rlimit]
|
||||||
[app.rpc.semaphore :as-alias rsem]
|
|
||||||
[app.storage :as-alias sto]
|
[app.storage :as-alias sto]
|
||||||
[app.util.services :as sv]
|
[app.util.services :as sv]
|
||||||
[app.util.time :as ts]
|
[app.util.time :as ts]
|
||||||
|
@ -163,7 +163,7 @@
|
||||||
(wrap-dispatch cfg $ mdata)
|
(wrap-dispatch cfg $ mdata)
|
||||||
(wrap-metrics cfg $ mdata)
|
(wrap-metrics cfg $ mdata)
|
||||||
(retry/wrap-retry cfg $ mdata)
|
(retry/wrap-retry cfg $ mdata)
|
||||||
(rsem/wrap cfg $ mdata)
|
(climit/wrap cfg $ mdata)
|
||||||
(rlimit/wrap cfg $ mdata)
|
(rlimit/wrap cfg $ mdata)
|
||||||
(wrap-audit cfg $ mdata))
|
(wrap-audit cfg $ mdata))
|
||||||
|
|
||||||
|
@ -175,6 +175,7 @@
|
||||||
(fn [{:keys [::request] :as params}]
|
(fn [{:keys [::request] :as params}]
|
||||||
;; Raise authentication error when rpc method requires auth but
|
;; Raise authentication error when rpc method requires auth but
|
||||||
;; no profile-id is found in the request.
|
;; no profile-id is found in the request.
|
||||||
|
|
||||||
(p/do!
|
(p/do!
|
||||||
(if (and auth? (not (uuid? (:profile-id params))))
|
(if (and auth? (not (uuid? (:profile-id params))))
|
||||||
(ex/raise :type :authentication
|
(ex/raise :type :authentication
|
||||||
|
@ -182,7 +183,6 @@
|
||||||
:hint "authentication required for this endpoint")
|
:hint "authentication required for this endpoint")
|
||||||
(let [params (us/conform spec (dissoc params ::request))]
|
(let [params (us/conform spec (dissoc params ::request))]
|
||||||
(f cfg (assoc params ::request request))))))
|
(f cfg (assoc params ::request request))))))
|
||||||
|
|
||||||
mdata)))
|
mdata)))
|
||||||
|
|
||||||
(defn- process-method
|
(defn- process-method
|
||||||
|
@ -238,6 +238,7 @@
|
||||||
(s/def ::http-client fn?)
|
(s/def ::http-client fn?)
|
||||||
(s/def ::ldap (s/nilable map?))
|
(s/def ::ldap (s/nilable map?))
|
||||||
(s/def ::msgbus ::mbus/msgbus)
|
(s/def ::msgbus ::mbus/msgbus)
|
||||||
|
(s/def ::climit (s/nilable ::climit/climit))
|
||||||
(s/def ::rlimit (s/nilable ::rlimit/rlimit))
|
(s/def ::rlimit (s/nilable ::rlimit/rlimit))
|
||||||
|
|
||||||
(s/def ::public-uri ::us/not-empty-string)
|
(s/def ::public-uri ::us/not-empty-string)
|
||||||
|
@ -251,7 +252,7 @@
|
||||||
::public-uri
|
::public-uri
|
||||||
::msgbus
|
::msgbus
|
||||||
::http-client
|
::http-client
|
||||||
::rsem/semaphores
|
::climit
|
||||||
::rlimit
|
::rlimit
|
||||||
::mtx/metrics
|
::mtx/metrics
|
||||||
::db/pool
|
::db/pool
|
||||||
|
|
205
backend/src/app/rpc/climit.clj
Normal file
205
backend/src/app/rpc/climit.clj
Normal file
|
@ -0,0 +1,205 @@
|
||||||
|
;; This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||||
|
;;
|
||||||
|
;; Copyright (c) KALEIDOS INC
|
||||||
|
|
||||||
|
(ns app.rpc.climit
|
||||||
|
"Concurrencly limiter for RPC."
|
||||||
|
(:require
|
||||||
|
[app.common.data :as d]
|
||||||
|
[app.common.exceptions :as ex]
|
||||||
|
[app.common.logging :as l]
|
||||||
|
[app.common.spec :as us]
|
||||||
|
[app.config :as cf]
|
||||||
|
[app.metrics :as mtx]
|
||||||
|
[app.rpc :as-alias rpc]
|
||||||
|
[app.util.services :as-alias sv]
|
||||||
|
[app.util.time :as dt]
|
||||||
|
[app.worker :as-alias wrk]
|
||||||
|
[clojure.edn :as edn]
|
||||||
|
[clojure.spec.alpha :as s]
|
||||||
|
[datoteka.fs :as fs]
|
||||||
|
[integrant.core :as ig]
|
||||||
|
[promesa.core :as p]
|
||||||
|
[promesa.exec :as px]
|
||||||
|
[promesa.exec.bulkhead :as pxb])
|
||||||
|
(:import
|
||||||
|
com.github.benmanes.caffeine.cache.Cache
|
||||||
|
com.github.benmanes.caffeine.cache.CacheLoader
|
||||||
|
com.github.benmanes.caffeine.cache.Caffeine
|
||||||
|
com.github.benmanes.caffeine.cache.RemovalListener))
|
||||||
|
|
||||||
|
(defn- capacity-exception?
|
||||||
|
[o]
|
||||||
|
(and (ex/ex-info? o)
|
||||||
|
(let [data (ex-data o)]
|
||||||
|
(and (= :bulkhead-error (:type data))
|
||||||
|
(= :capacity-limit-reached (:code data))))))
|
||||||
|
|
||||||
|
(defn invoke!
|
||||||
|
[limiter f]
|
||||||
|
(p/handle
|
||||||
|
(px/submit! limiter f)
|
||||||
|
(fn [result cause]
|
||||||
|
(cond
|
||||||
|
(capacity-exception? cause)
|
||||||
|
(p/rejected
|
||||||
|
(ex/error :type :internal
|
||||||
|
:code :concurrency-limit-reached
|
||||||
|
:queue (-> limiter meta :bkey name)
|
||||||
|
:cause cause))
|
||||||
|
|
||||||
|
(some? cause)
|
||||||
|
(p/rejected cause)
|
||||||
|
|
||||||
|
:else
|
||||||
|
(p/resolved result)))))
|
||||||
|
|
||||||
|
(defn- create-limiter
|
||||||
|
[{:keys [executor metrics concurrency queue-size bkey skey]}]
|
||||||
|
(let [labels (into-array String [(name bkey)])
|
||||||
|
on-queue (fn [instance]
|
||||||
|
(l/trace :hint "enqueued"
|
||||||
|
:key (name bkey)
|
||||||
|
:skey (str skey)
|
||||||
|
:queue-size (get instance :current-queue-size)
|
||||||
|
:concurrency (get instance :current-concurrency)
|
||||||
|
(mtx/run! metrics
|
||||||
|
:id :rpc-climit-queue-size
|
||||||
|
:val (get instance :current-queue-size)
|
||||||
|
:labels labels)
|
||||||
|
(mtx/run! metrics
|
||||||
|
:id :rpc-climit-concurrency
|
||||||
|
:val (get instance :current-concurrency)
|
||||||
|
:labels labels)))
|
||||||
|
|
||||||
|
on-run (fn [instance task]
|
||||||
|
(let [elapsed (- (inst-ms (dt/now))
|
||||||
|
(inst-ms task))]
|
||||||
|
(l/trace :hint "execute"
|
||||||
|
:key (name bkey)
|
||||||
|
:skey (str skey)
|
||||||
|
:elapsed (str elapsed "ms"))
|
||||||
|
(mtx/run! metrics
|
||||||
|
:id :rpc-climit-timing
|
||||||
|
:val elapsed
|
||||||
|
:labels labels)
|
||||||
|
(mtx/run! metrics
|
||||||
|
:id :rpc-climit-queue-size
|
||||||
|
:val (get instance :current-queue-size)
|
||||||
|
:labels labels)
|
||||||
|
(mtx/run! metrics
|
||||||
|
:id :rpc-climit-concurrency
|
||||||
|
:val (get instance :current-concurrency)
|
||||||
|
:labels labels)))
|
||||||
|
|
||||||
|
options {:executor executor
|
||||||
|
:concurrency concurrency
|
||||||
|
:queue-size (or queue-size Integer/MAX_VALUE)
|
||||||
|
:on-queue on-queue
|
||||||
|
:on-run on-run}]
|
||||||
|
|
||||||
|
(-> (pxb/create options)
|
||||||
|
(vary-meta assoc :bkey bkey :skey skey))))
|
||||||
|
|
||||||
|
(defn- create-cache
|
||||||
|
[{:keys [executor] :as params} config]
|
||||||
|
(let [listener (reify RemovalListener
|
||||||
|
(onRemoval [_ key _val cause]
|
||||||
|
(l/trace :hint "cache: remove" :key key :reason (str cause))))
|
||||||
|
|
||||||
|
loader (reify CacheLoader
|
||||||
|
(load [_ key]
|
||||||
|
(let [[bkey skey] key]
|
||||||
|
(when-let [config (get config bkey)]
|
||||||
|
(-> (merge params config)
|
||||||
|
(assoc :bkey bkey)
|
||||||
|
(assoc :skey skey)
|
||||||
|
(create-limiter))))))]
|
||||||
|
|
||||||
|
(.. (Caffeine/newBuilder)
|
||||||
|
(weakValues)
|
||||||
|
(executor executor)
|
||||||
|
(removalListener listener)
|
||||||
|
(build loader))))
|
||||||
|
|
||||||
|
(defprotocol IConcurrencyManager)
|
||||||
|
|
||||||
|
(s/def ::concurrency ::us/integer)
|
||||||
|
(s/def ::queue-size ::us/integer)
|
||||||
|
(s/def ::config
|
||||||
|
(s/map-of keyword?
|
||||||
|
(s/keys :req-un [::concurrency]
|
||||||
|
:opt-un [::queue-size])))
|
||||||
|
|
||||||
|
(defmethod ig/prep-key ::rpc/climit
|
||||||
|
[_ cfg]
|
||||||
|
(merge {:path (cf/get :rpc-climit-config)}
|
||||||
|
(d/without-nils cfg)))
|
||||||
|
|
||||||
|
(defmethod ig/pre-init-spec ::rpc/climit [_]
|
||||||
|
(s/keys :req-un [::wrk/executor ::mtx/metrics ::fs/path]))
|
||||||
|
|
||||||
|
(defmethod ig/init-key ::rpc/climit
|
||||||
|
[_ {:keys [path] :as params}]
|
||||||
|
(when (contains? cf/flags :rpc-climit)
|
||||||
|
(if-let [config (some->> path slurp edn/read-string)]
|
||||||
|
(do
|
||||||
|
(l/info :hint "initializing concurrency limit" :config (str path))
|
||||||
|
(us/verify! ::config config)
|
||||||
|
|
||||||
|
(let [cache (create-cache params config)]
|
||||||
|
^{::cache cache}
|
||||||
|
(reify
|
||||||
|
IConcurrencyManager
|
||||||
|
clojure.lang.IDeref
|
||||||
|
(deref [_] config)
|
||||||
|
|
||||||
|
clojure.lang.ILookup
|
||||||
|
(valAt [_ key]
|
||||||
|
(let [key (if (vector? key) key [key])]
|
||||||
|
(.get ^Cache cache key))))))
|
||||||
|
|
||||||
|
(l/warn :hint "unable to load configuration" :config (str path)))))
|
||||||
|
|
||||||
|
|
||||||
|
(s/def ::climit #(satisfies? IConcurrencyManager %))
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; PUBLIC API
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
|
(defmacro with-dispatch
|
||||||
|
[lim & body]
|
||||||
|
`(if ~lim
|
||||||
|
(invoke! ~lim (^:once fn [] (p/wrap (do ~@body))))
|
||||||
|
(p/wrap (do ~@body))))
|
||||||
|
|
||||||
|
(defn wrap
|
||||||
|
[{:keys [climit]} f {:keys [::queue ::key-fn] :as mdata}]
|
||||||
|
(if (and (some? climit)
|
||||||
|
(some? queue))
|
||||||
|
(if-let [config (get @climit queue)]
|
||||||
|
(do
|
||||||
|
(l/debug :hint "wrap: instrumenting method"
|
||||||
|
:limit-name (name queue)
|
||||||
|
:service-name (::sv/name mdata)
|
||||||
|
:queue-size (or (:queue-size config) Integer/MAX_VALUE)
|
||||||
|
:concurrency (:concurrency config)
|
||||||
|
:keyed? (some? key-fn))
|
||||||
|
(if (some? key-fn)
|
||||||
|
(fn [cfg params]
|
||||||
|
(let [key [queue (key-fn params)]
|
||||||
|
lim (get climit key)]
|
||||||
|
(invoke! lim (partial f cfg params))))
|
||||||
|
|
||||||
|
(let [lim (get climit queue)]
|
||||||
|
(fn [cfg params]
|
||||||
|
(invoke! lim (partial f cfg params))))))
|
||||||
|
(do
|
||||||
|
(l/warn :hint "wrap: no config found"
|
||||||
|
:queue (name queue)
|
||||||
|
:service (::sv/name mdata))
|
||||||
|
f))
|
||||||
|
f))
|
|
@ -16,10 +16,10 @@
|
||||||
[app.http.session :as session]
|
[app.http.session :as session]
|
||||||
[app.loggers.audit :as audit]
|
[app.loggers.audit :as audit]
|
||||||
[app.rpc :as-alias rpc]
|
[app.rpc :as-alias rpc]
|
||||||
|
[app.rpc.climit :as climit]
|
||||||
[app.rpc.doc :as-alias doc]
|
[app.rpc.doc :as-alias doc]
|
||||||
[app.rpc.mutations.teams :as teams]
|
[app.rpc.mutations.teams :as teams]
|
||||||
[app.rpc.queries.profile :as profile]
|
[app.rpc.queries.profile :as profile]
|
||||||
[app.rpc.semaphore :as rsem]
|
|
||||||
[app.tokens :as tokens]
|
[app.tokens :as tokens]
|
||||||
[app.util.services :as sv]
|
[app.util.services :as sv]
|
||||||
[app.util.time :as dt]
|
[app.util.time :as dt]
|
||||||
|
@ -147,7 +147,7 @@
|
||||||
(sv/defmethod ::login-with-password
|
(sv/defmethod ::login-with-password
|
||||||
"Performs authentication using penpot password."
|
"Performs authentication using penpot password."
|
||||||
{:auth false
|
{:auth false
|
||||||
::rsem/queue :auth
|
::climit/queue :auth
|
||||||
::doc/added "1.15"}
|
::doc/added "1.15"}
|
||||||
[cfg params]
|
[cfg params]
|
||||||
(login-with-password cfg params))
|
(login-with-password cfg params))
|
||||||
|
@ -188,7 +188,7 @@
|
||||||
|
|
||||||
(sv/defmethod ::recover-profile
|
(sv/defmethod ::recover-profile
|
||||||
{:auth false
|
{:auth false
|
||||||
::rsem/queue :auth
|
::climit/queue :auth
|
||||||
::doc/added "1.15"}
|
::doc/added "1.15"}
|
||||||
[cfg params]
|
[cfg params]
|
||||||
(recover-profile cfg params))
|
(recover-profile cfg params))
|
||||||
|
@ -438,7 +438,7 @@
|
||||||
|
|
||||||
(sv/defmethod ::register-profile
|
(sv/defmethod ::register-profile
|
||||||
{:auth false
|
{:auth false
|
||||||
::rsem/queue :auth
|
::climit/queue :auth
|
||||||
::doc/added "1.15"}
|
::doc/added "1.15"}
|
||||||
[{:keys [pool] :as cfg} params]
|
[{:keys [pool] :as cfg} params]
|
||||||
(db/with-atomic [conn pool]
|
(db/with-atomic [conn pool]
|
||||||
|
|
|
@ -22,10 +22,10 @@
|
||||||
[app.msgbus :as mbus]
|
[app.msgbus :as mbus]
|
||||||
[app.rpc :as-alias rpc]
|
[app.rpc :as-alias rpc]
|
||||||
[app.rpc.doc :as-alias doc]
|
[app.rpc.doc :as-alias doc]
|
||||||
|
[app.rpc.climit :as climit]
|
||||||
[app.rpc.permissions :as perms]
|
[app.rpc.permissions :as perms]
|
||||||
[app.rpc.queries.files :as files]
|
[app.rpc.queries.files :as files]
|
||||||
[app.rpc.queries.projects :as proj]
|
[app.rpc.queries.projects :as proj]
|
||||||
[app.rpc.semaphore :as rsem]
|
|
||||||
[app.storage.impl :as simpl]
|
[app.storage.impl :as simpl]
|
||||||
[app.util.blob :as blob]
|
[app.util.blob :as blob]
|
||||||
[app.util.objects-map :as omap]
|
[app.util.objects-map :as omap]
|
||||||
|
@ -346,8 +346,8 @@
|
||||||
FOR KEY SHARE")
|
FOR KEY SHARE")
|
||||||
|
|
||||||
(sv/defmethod ::update-file
|
(sv/defmethod ::update-file
|
||||||
{::rsem/queue :update-file
|
{::climit/queue :update-file
|
||||||
::doc/added "1.0"}
|
::climit/key-fn :id}
|
||||||
[{:keys [pool] :as cfg} {:keys [id profile-id components-v2] :as params}]
|
[{:keys [pool] :as cfg} {:keys [id profile-id components-v2] :as params}]
|
||||||
(db/with-atomic [conn pool]
|
(db/with-atomic [conn pool]
|
||||||
(db/xact-lock! conn id)
|
(db/xact-lock! conn id)
|
||||||
|
|
|
@ -12,14 +12,15 @@
|
||||||
[app.common.uuid :as uuid]
|
[app.common.uuid :as uuid]
|
||||||
[app.db :as db]
|
[app.db :as db]
|
||||||
[app.media :as media]
|
[app.media :as media]
|
||||||
|
[app.rpc.climit :as-alias climit]
|
||||||
[app.rpc.doc :as-alias doc]
|
[app.rpc.doc :as-alias doc]
|
||||||
[app.rpc.queries.teams :as teams]
|
[app.rpc.queries.teams :as teams]
|
||||||
[app.rpc.semaphore :as rsem]
|
|
||||||
[app.storage :as sto]
|
[app.storage :as sto]
|
||||||
[app.util.services :as sv]
|
[app.util.services :as sv]
|
||||||
[app.util.time :as dt]
|
[app.util.time :as dt]
|
||||||
[clojure.spec.alpha :as s]
|
[clojure.spec.alpha :as s]
|
||||||
[promesa.core :as p]))
|
[promesa.core :as p]
|
||||||
|
[promesa.exec :as px]))
|
||||||
|
|
||||||
(declare create-font-variant)
|
(declare create-font-variant)
|
||||||
|
|
||||||
|
@ -46,15 +47,15 @@
|
||||||
(create-font-variant cfg params)))
|
(create-font-variant cfg params)))
|
||||||
|
|
||||||
(defn create-font-variant
|
(defn create-font-variant
|
||||||
[{:keys [storage pool executor semaphores] :as cfg} {:keys [data] :as params}]
|
[{:keys [storage pool executor climit] :as cfg} {:keys [data] :as params}]
|
||||||
(letfn [(generate-fonts [data]
|
(letfn [(generate-fonts [data]
|
||||||
(rsem/with-dispatch (:process-font semaphores)
|
(climit/with-dispatch (:process-font climit)
|
||||||
(media/run {:cmd :generate-fonts :input data})))
|
(media/run {:cmd :generate-fonts :input data})))
|
||||||
|
|
||||||
;; Function responsible of calculating cryptographyc hash of
|
;; Function responsible of calculating cryptographyc hash of
|
||||||
;; the provided data.
|
;; the provided data.
|
||||||
(calculate-hash [data]
|
(calculate-hash [data]
|
||||||
(rsem/with-dispatch (:process-font semaphores)
|
(px/with-dispatch executor
|
||||||
(sto/calculate-hash data)))
|
(sto/calculate-hash data)))
|
||||||
|
|
||||||
(validate-data [data]
|
(validate-data [data]
|
||||||
|
@ -120,6 +121,7 @@
|
||||||
and font_id = ?")
|
and font_id = ?")
|
||||||
|
|
||||||
(sv/defmethod ::update-font
|
(sv/defmethod ::update-font
|
||||||
|
{::climit/queue :process-font}
|
||||||
[{:keys [pool] :as cfg} {:keys [team-id profile-id id name] :as params}]
|
[{:keys [pool] :as cfg} {:keys [team-id profile-id id name] :as params}]
|
||||||
(db/with-atomic [conn pool]
|
(db/with-atomic [conn pool]
|
||||||
(teams/check-edition-permissions! conn profile-id team-id)
|
(teams/check-edition-permissions! conn profile-id team-id)
|
||||||
|
|
|
@ -14,8 +14,8 @@
|
||||||
[app.config :as cf]
|
[app.config :as cf]
|
||||||
[app.db :as db]
|
[app.db :as db]
|
||||||
[app.media :as media]
|
[app.media :as media]
|
||||||
|
[app.rpc.climit :as climit]
|
||||||
[app.rpc.queries.teams :as teams]
|
[app.rpc.queries.teams :as teams]
|
||||||
[app.rpc.semaphore :as rsem]
|
|
||||||
[app.storage :as sto]
|
[app.storage :as sto]
|
||||||
[app.storage.tmp :as tmp]
|
[app.storage.tmp :as tmp]
|
||||||
[app.util.services :as sv]
|
[app.util.services :as sv]
|
||||||
|
@ -23,7 +23,8 @@
|
||||||
[clojure.spec.alpha :as s]
|
[clojure.spec.alpha :as s]
|
||||||
[cuerdas.core :as str]
|
[cuerdas.core :as str]
|
||||||
[datoteka.io :as io]
|
[datoteka.io :as io]
|
||||||
[promesa.core :as p]))
|
[promesa.core :as p]
|
||||||
|
[promesa.exec :as px]))
|
||||||
|
|
||||||
(def default-max-file-size (* 1024 1024 10)) ; 10 MiB
|
(def default-max-file-size (* 1024 1024 10)) ; 10 MiB
|
||||||
|
|
||||||
|
@ -104,25 +105,25 @@
|
||||||
;; inverse, soft referential integrity).
|
;; inverse, soft referential integrity).
|
||||||
|
|
||||||
(defn create-file-media-object
|
(defn create-file-media-object
|
||||||
[{:keys [storage pool semaphores] :as cfg}
|
[{:keys [storage pool climit executor] :as cfg}
|
||||||
{:keys [id file-id is-local name content] :as params}]
|
{:keys [id file-id is-local name content] :as params}]
|
||||||
(letfn [;; Function responsible to retrieve the file information, as
|
(letfn [;; Function responsible to retrieve the file information, as
|
||||||
;; it is synchronous operation it should be wrapped into
|
;; it is synchronous operation it should be wrapped into
|
||||||
;; with-dispatch macro.
|
;; with-dispatch macro.
|
||||||
(get-info [content]
|
(get-info [content]
|
||||||
(rsem/with-dispatch (:process-image semaphores)
|
(climit/with-dispatch (:process-image climit)
|
||||||
(media/run {:cmd :info :input content})))
|
(media/run {:cmd :info :input content})))
|
||||||
|
|
||||||
;; Function responsible of calculating cryptographyc hash of
|
;; Function responsible of calculating cryptographyc hash of
|
||||||
;; the provided data.
|
;; the provided data.
|
||||||
(calculate-hash [data]
|
(calculate-hash [data]
|
||||||
(rsem/with-dispatch (:process-image semaphores)
|
(px/with-dispatch executor
|
||||||
(sto/calculate-hash data)))
|
(sto/calculate-hash data)))
|
||||||
|
|
||||||
;; Function responsible of generating thumnail. As it is synchronous
|
;; Function responsible of generating thumnail. As it is synchronous
|
||||||
;; opetation, it should be wrapped into with-dispatch macro
|
;; opetation, it should be wrapped into with-dispatch macro
|
||||||
(generate-thumbnail [info]
|
(generate-thumbnail [info]
|
||||||
(rsem/with-dispatch (:process-image semaphores)
|
(climit/with-dispatch (:process-image climit)
|
||||||
(media/run (assoc thumbnail-options
|
(media/run (assoc thumbnail-options
|
||||||
:cmd :generic-thumbnail
|
:cmd :generic-thumbnail
|
||||||
:input info))))
|
:input info))))
|
||||||
|
@ -154,6 +155,7 @@
|
||||||
:bucket "file-media-object"})))
|
:bucket "file-media-object"})))
|
||||||
|
|
||||||
(insert-into-database [info image thumb]
|
(insert-into-database [info image thumb]
|
||||||
|
(px/with-dispatch executor
|
||||||
(db/exec-one! pool [sql:create-file-media-object
|
(db/exec-one! pool [sql:create-file-media-object
|
||||||
(or id (uuid/next))
|
(or id (uuid/next))
|
||||||
file-id is-local name
|
file-id is-local name
|
||||||
|
@ -161,7 +163,7 @@
|
||||||
(:id thumb)
|
(:id thumb)
|
||||||
(:width info)
|
(:width info)
|
||||||
(:height info)
|
(:height info)
|
||||||
(:mtype info)]))]
|
(:mtype info)])))]
|
||||||
|
|
||||||
(p/let [info (get-info content)
|
(p/let [info (get-info content)
|
||||||
thumb (create-thumbnail info)
|
thumb (create-thumbnail info)
|
||||||
|
|
|
@ -16,11 +16,11 @@
|
||||||
[app.loggers.audit :as audit]
|
[app.loggers.audit :as audit]
|
||||||
[app.media :as media]
|
[app.media :as media]
|
||||||
[app.rpc :as-alias rpc]
|
[app.rpc :as-alias rpc]
|
||||||
|
[app.rpc.climit :as-alias climit]
|
||||||
[app.rpc.commands.auth :as cmd.auth]
|
[app.rpc.commands.auth :as cmd.auth]
|
||||||
[app.rpc.doc :as-alias doc]
|
[app.rpc.doc :as-alias doc]
|
||||||
[app.rpc.mutations.teams :as teams]
|
[app.rpc.mutations.teams :as teams]
|
||||||
[app.rpc.queries.profile :as profile]
|
[app.rpc.queries.profile :as profile]
|
||||||
[app.rpc.semaphore :as rsem]
|
|
||||||
[app.storage :as sto]
|
[app.storage :as sto]
|
||||||
[app.tokens :as tokens]
|
[app.tokens :as tokens]
|
||||||
[app.util.services :as sv]
|
[app.util.services :as sv]
|
||||||
|
@ -83,11 +83,11 @@
|
||||||
(s/keys :req-un [::profile-id ::password ::old-password]))
|
(s/keys :req-un [::profile-id ::password ::old-password]))
|
||||||
|
|
||||||
(sv/defmethod ::update-profile-password
|
(sv/defmethod ::update-profile-password
|
||||||
{::rsem/queue :auth}
|
{::climit/queue :auth}
|
||||||
[{:keys [pool] :as cfg} {:keys [password] :as params}]
|
[{:keys [pool] :as cfg} {:keys [password] :as params}]
|
||||||
(db/with-atomic [conn pool]
|
(db/with-atomic [conn pool]
|
||||||
(let [profile (validate-password! conn params)
|
(let [profile (validate-password! conn params)
|
||||||
session-id (:app.rpc/session-id params)]
|
session-id (::rpc/session-id params)]
|
||||||
(when (= (str/lower (:email profile))
|
(when (= (str/lower (:email profile))
|
||||||
(str/lower (:password params)))
|
(str/lower (:password params)))
|
||||||
(ex/raise :type :validation
|
(ex/raise :type :validation
|
||||||
|
@ -309,7 +309,7 @@
|
||||||
|
|
||||||
(sv/defmethod ::login
|
(sv/defmethod ::login
|
||||||
{:auth false
|
{:auth false
|
||||||
::rsem/queue :auth
|
::climit/queue :auth
|
||||||
::doc/added "1.0"
|
::doc/added "1.0"
|
||||||
::doc/deprecated "1.15"}
|
::doc/deprecated "1.15"}
|
||||||
[cfg params]
|
[cfg params]
|
||||||
|
@ -354,7 +354,7 @@
|
||||||
|
|
||||||
(sv/defmethod ::register-profile
|
(sv/defmethod ::register-profile
|
||||||
{:auth false
|
{:auth false
|
||||||
::rsem/queue :auth
|
::climit/queue :auth
|
||||||
::doc/added "1.0"
|
::doc/added "1.0"
|
||||||
::doc/deprecated "1.15"}
|
::doc/deprecated "1.15"}
|
||||||
[{:keys [pool] :as cfg} params]
|
[{:keys [pool] :as cfg} params]
|
||||||
|
|
|
@ -17,11 +17,11 @@
|
||||||
[app.loggers.audit :as audit]
|
[app.loggers.audit :as audit]
|
||||||
[app.media :as media]
|
[app.media :as media]
|
||||||
[app.rpc :as-alias rpc]
|
[app.rpc :as-alias rpc]
|
||||||
|
[app.rpc.climit :as climit]
|
||||||
[app.rpc.mutations.projects :as projects]
|
[app.rpc.mutations.projects :as projects]
|
||||||
[app.rpc.permissions :as perms]
|
[app.rpc.permissions :as perms]
|
||||||
[app.rpc.queries.profile :as profile]
|
[app.rpc.queries.profile :as profile]
|
||||||
[app.rpc.queries.teams :as teams]
|
[app.rpc.queries.teams :as teams]
|
||||||
[app.rpc.semaphore :as rsem]
|
|
||||||
[app.storage :as sto]
|
[app.storage :as sto]
|
||||||
[app.tokens :as tokens]
|
[app.tokens :as tokens]
|
||||||
[app.util.services :as sv]
|
[app.util.services :as sv]
|
||||||
|
@ -316,13 +316,13 @@
|
||||||
(assoc team :photo-id (:id photo))))
|
(assoc team :photo-id (:id photo))))
|
||||||
|
|
||||||
(defn upload-photo
|
(defn upload-photo
|
||||||
[{:keys [storage semaphores] :as cfg} {:keys [file]}]
|
[{:keys [storage executor climit] :as cfg} {:keys [file]}]
|
||||||
(letfn [(get-info [content]
|
(letfn [(get-info [content]
|
||||||
(rsem/with-dispatch (:process-image semaphores)
|
(climit/with-dispatch (:process-image climit)
|
||||||
(media/run {:cmd :info :input content})))
|
(media/run {:cmd :info :input content})))
|
||||||
|
|
||||||
(generate-thumbnail [info]
|
(generate-thumbnail [info]
|
||||||
(rsem/with-dispatch (:process-image semaphores)
|
(climit/with-dispatch (:process-image climit)
|
||||||
(media/run {:cmd :profile-thumbnail
|
(media/run {:cmd :profile-thumbnail
|
||||||
:format :jpeg
|
:format :jpeg
|
||||||
:quality 85
|
:quality 85
|
||||||
|
@ -333,7 +333,7 @@
|
||||||
;; Function responsible of calculating cryptographyc hash of
|
;; Function responsible of calculating cryptographyc hash of
|
||||||
;; the provided data.
|
;; the provided data.
|
||||||
(calculate-hash [data]
|
(calculate-hash [data]
|
||||||
(rsem/with-dispatch (:process-image semaphores)
|
(px/with-dispatch executor
|
||||||
(sto/calculate-hash data)))]
|
(sto/calculate-hash data)))]
|
||||||
|
|
||||||
(p/let [info (get-info file)
|
(p/let [info (get-info file)
|
||||||
|
@ -341,11 +341,10 @@
|
||||||
hash (calculate-hash (:data thumb))
|
hash (calculate-hash (:data thumb))
|
||||||
content (-> (sto/content (:data thumb) (:size thumb))
|
content (-> (sto/content (:data thumb) (:size thumb))
|
||||||
(sto/wrap-with-hash hash))]
|
(sto/wrap-with-hash hash))]
|
||||||
(rsem/with-dispatch (:process-image semaphores)
|
|
||||||
(sto/put-object! storage {::sto/content content
|
(sto/put-object! storage {::sto/content content
|
||||||
::sto/deduplicate? true
|
::sto/deduplicate? true
|
||||||
:bucket "profile"
|
:bucket "profile"
|
||||||
:content-type (:mtype thumb)})))))
|
:content-type (:mtype thumb)}))))
|
||||||
|
|
||||||
;; --- Mutation: Invite Member
|
;; --- Mutation: Invite Member
|
||||||
|
|
||||||
|
|
|
@ -1,149 +0,0 @@
|
||||||
;; This Source Code Form is subject to the terms of the Mozilla Public
|
|
||||||
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
||||||
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
||||||
;;
|
|
||||||
;; Copyright (c) KALEIDOS INC
|
|
||||||
|
|
||||||
(ns app.rpc.semaphore
|
|
||||||
"Resource usage limits (in other words: semaphores)."
|
|
||||||
(:require
|
|
||||||
[app.common.data :as d]
|
|
||||||
[app.common.logging :as l]
|
|
||||||
[app.common.spec :as us]
|
|
||||||
[app.config :as cf]
|
|
||||||
[app.metrics :as mtx]
|
|
||||||
[app.rpc :as-alias rpc]
|
|
||||||
[app.util.locks :as locks]
|
|
||||||
[app.util.time :as ts]
|
|
||||||
[app.worker :as-alias wrk]
|
|
||||||
[clojure.spec.alpha :as s]
|
|
||||||
[integrant.core :as ig]
|
|
||||||
[promesa.core :as p]))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
;; ASYNC SEMAPHORE IMPL
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
|
|
||||||
(defprotocol IAsyncSemaphore
|
|
||||||
(acquire! [_])
|
|
||||||
(release! [_ tp]))
|
|
||||||
|
|
||||||
(defn create
|
|
||||||
[& {:keys [permits metrics name executor]}]
|
|
||||||
(let [used (volatile! 0)
|
|
||||||
queue (volatile! (d/queue))
|
|
||||||
labels (into-array String [(d/name name)])
|
|
||||||
lock (locks/create)
|
|
||||||
permits (or permits Long/MAX_VALUE)]
|
|
||||||
|
|
||||||
(when (>= permits Long/MAX_VALUE)
|
|
||||||
(l/warn :hint "permits value too high" :permits permits :semaphore name))
|
|
||||||
|
|
||||||
^{::wrk/executor executor
|
|
||||||
::name name}
|
|
||||||
(reify IAsyncSemaphore
|
|
||||||
(acquire! [_]
|
|
||||||
(let [d (p/deferred)]
|
|
||||||
(locks/locking lock
|
|
||||||
(if (< @used permits)
|
|
||||||
(do
|
|
||||||
(vswap! used inc)
|
|
||||||
(p/resolve! d))
|
|
||||||
(vswap! queue conj d)))
|
|
||||||
|
|
||||||
(mtx/run! metrics
|
|
||||||
:id :semaphore-used-permits
|
|
||||||
:val @used
|
|
||||||
:labels labels)
|
|
||||||
(mtx/run! metrics
|
|
||||||
:id :semaphore-queued-submissions
|
|
||||||
:val (count @queue)
|
|
||||||
:labels labels)
|
|
||||||
d))
|
|
||||||
|
|
||||||
(release! [_ tp]
|
|
||||||
(locks/locking lock
|
|
||||||
(if-let [item (peek @queue)]
|
|
||||||
(do
|
|
||||||
(vswap! queue pop)
|
|
||||||
(p/resolve! item))
|
|
||||||
(when (pos? @used)
|
|
||||||
(vswap! used dec))))
|
|
||||||
|
|
||||||
(mtx/run! metrics
|
|
||||||
:id :semaphore-timing
|
|
||||||
:val (inst-ms (tp))
|
|
||||||
:labels labels)
|
|
||||||
(mtx/run! metrics
|
|
||||||
:id :semaphore-used-permits
|
|
||||||
:val @used
|
|
||||||
:labels labels)
|
|
||||||
(mtx/run! metrics
|
|
||||||
:id :semaphore-queued-submissions
|
|
||||||
:val (count @queue)
|
|
||||||
:labels labels)))))
|
|
||||||
|
|
||||||
(defn semaphore?
|
|
||||||
[v]
|
|
||||||
(satisfies? IAsyncSemaphore v))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
;; PREDEFINED SEMAPHORES
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
|
|
||||||
(s/def ::semaphore semaphore?)
|
|
||||||
(s/def ::semaphores
|
|
||||||
(s/map-of ::us/keyword ::semaphore))
|
|
||||||
|
|
||||||
(defmethod ig/pre-init-spec ::rpc/semaphores [_]
|
|
||||||
(s/keys :req-un [::mtx/metrics]))
|
|
||||||
|
|
||||||
(defn- create-default-semaphores
|
|
||||||
[metrics executor]
|
|
||||||
[(create :permits (cf/get :semaphore-process-font)
|
|
||||||
:metrics metrics
|
|
||||||
:name :process-font
|
|
||||||
:executor executor)
|
|
||||||
(create :permits (cf/get :semaphore-update-file)
|
|
||||||
:metrics metrics
|
|
||||||
:name :update-file
|
|
||||||
:executor executor)
|
|
||||||
(create :permits (cf/get :semaphore-process-image)
|
|
||||||
:metrics metrics
|
|
||||||
:name :process-image
|
|
||||||
:executor executor)
|
|
||||||
(create :permits (cf/get :semaphore-auth)
|
|
||||||
:metrics metrics
|
|
||||||
:name :auth
|
|
||||||
:executor executor)])
|
|
||||||
|
|
||||||
(defmethod ig/init-key ::rpc/semaphores
|
|
||||||
[_ {:keys [metrics executor]}]
|
|
||||||
(->> (create-default-semaphores metrics executor)
|
|
||||||
(d/index-by (comp ::name meta))))
|
|
||||||
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
;; PUBLIC API
|
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
||||||
|
|
||||||
(defmacro with-dispatch
|
|
||||||
[queue & body]
|
|
||||||
`(let [tpoint# (ts/tpoint)
|
|
||||||
queue# ~queue
|
|
||||||
executor# (-> queue# meta ::wrk/executor)]
|
|
||||||
(-> (acquire! queue#)
|
|
||||||
(p/then (fn [_#] ~@body) executor#)
|
|
||||||
(p/finally (fn [_# _#]
|
|
||||||
(release! queue# tpoint#))))))
|
|
||||||
|
|
||||||
(defn wrap
|
|
||||||
[{:keys [semaphores]} f {:keys [::queue]}]
|
|
||||||
(let [queue' (get semaphores queue)]
|
|
||||||
(if (semaphore? queue')
|
|
||||||
(fn [cfg params]
|
|
||||||
(with-dispatch queue'
|
|
||||||
(f cfg params)))
|
|
||||||
(do
|
|
||||||
(when (some? queue)
|
|
||||||
(l/warn :hint "undefined semaphore" :name queue))
|
|
||||||
f))))
|
|
|
@ -21,7 +21,7 @@
|
||||||
com.cognitect/transit-cljs {:mvn/version "0.8.280"}
|
com.cognitect/transit-cljs {:mvn/version "0.8.280"}
|
||||||
java-http-clj/java-http-clj {:mvn/version "0.4.3"}
|
java-http-clj/java-http-clj {:mvn/version "0.4.3"}
|
||||||
|
|
||||||
funcool/promesa {:mvn/version "8.0.450"}
|
funcool/promesa {:mvn/version "9.0.507"}
|
||||||
funcool/cuerdas {:mvn/version "2022.06.16-403"}
|
funcool/cuerdas {:mvn/version "2022.06.16-403"}
|
||||||
|
|
||||||
lambdaisland/uri {:mvn/version "1.13.95"
|
lambdaisland/uri {:mvn/version "1.13.95"
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue