Merge pull request #2211 from penpot/niwinz-rate-limit

Rate Limit for RPC methods
This commit is contained in:
Andrey Antukh 2022-08-31 13:19:34 +02:00 committed by GitHub
commit 89e5607d7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
53 changed files with 1349 additions and 645 deletions

View file

@ -29,8 +29,6 @@
org.postgresql/postgresql {:mvn/version "42.4.0"} org.postgresql/postgresql {:mvn/version "42.4.0"}
com.zaxxer/HikariCP {:mvn/version "5.0.1"} com.zaxxer/HikariCP {:mvn/version "5.0.1"}
funcool/datoteka {:mvn/version "3.0.64"}
buddy/buddy-hashers {:mvn/version "1.8.158"} buddy/buddy-hashers {:mvn/version "1.8.158"}
buddy/buddy-sign {:mvn/version "3.4.333"} buddy/buddy-sign {:mvn/version "3.4.333"}

View file

@ -30,6 +30,8 @@
<Logger name="app.msgbus" level="info" /> <Logger name="app.msgbus" level="info" />
<Logger name="app.http.websocket" level="info" /> <Logger name="app.http.websocket" level="info" />
<Logger name="app.util.websocket" level="info" /> <Logger name="app.util.websocket" level="info" />
<Logger name="app.redis" level="info" />
<Logger name="app.rpc.rlimit" level="info" />
<Logger name="app.cli" level="debug" additivity="false"> <Logger name="app.cli" level="debug" additivity="false">
<AppenderRef ref="console"/> <AppenderRef ref="console"/>

View file

@ -2,7 +2,7 @@
export PENPOT_HOST=devenv export PENPOT_HOST=devenv
export PENPOT_TENANT=dev export PENPOT_TENANT=dev
export PENPOT_FLAGS="$PENPOT_FLAGS enable-backend-asserts enable-audit-log enable-transit-readable-response enable-demo-users disable-secure-session-cookies" export PENPOT_FLAGS="$PENPOT_FLAGS enable-backend-asserts enable-audit-log enable-transit-readable-response enable-demo-users disable-secure-session-cookies enable-rpc-rate-limit enable-warn-rpc-rate-limits"
# export PENPOT_DATABASE_URI="postgresql://172.17.0.1:5432/penpot" # export PENPOT_DATABASE_URI="postgresql://172.17.0.1:5432/penpot"
# export PENPOT_DATABASE_USERNAME="penpot" # export PENPOT_DATABASE_USERNAME="penpot"
@ -16,6 +16,8 @@ export PENPOT_FLAGS="$PENPOT_FLAGS enable-backend-asserts enable-audit-log enabl
# export PENPOT_LOGGERS_LOKI_URI="http://172.17.0.1:3100/loki/api/v1/push" # export PENPOT_LOGGERS_LOKI_URI="http://172.17.0.1:3100/loki/api/v1/push"
# export PENPOT_AUDIT_LOG_ARCHIVE_URI="http://localhost:6070/api/audit" # export PENPOT_AUDIT_LOG_ARCHIVE_URI="http://localhost:6070/api/audit"
export PENPOT_DEFAULT_RATE_LIMIT="default,window,10000/h"
# Initialize MINIO config # Initialize MINIO config
mc alias set penpot-s3/ http://minio:9000 minioadmin minioadmin mc alias set penpot-s3/ http://minio:9000 minioadmin minioadmin
mc admin user add penpot-s3 penpot-devenv penpot-devenv mc admin user add penpot-s3 penpot-devenv penpot-devenv

View file

@ -15,9 +15,11 @@
[app.common.uri :as u] [app.common.uri :as u]
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.http.client :as http]
[app.http.middleware :as hmw] [app.http.middleware :as hmw]
[app.loggers.audit :as audit] [app.loggers.audit :as audit]
[app.rpc.queries.profile :as profile] [app.rpc.queries.profile :as profile]
[app.tokens :as tokens]
[app.util.json :as json] [app.util.json :as json]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as wrk] [app.worker :as wrk]
@ -47,7 +49,7 @@
(defn- discover-oidc-config (defn- discover-oidc-config
[{:keys [http-client]} {:keys [base-uri] :as opts}] [{:keys [http-client]} {:keys [base-uri] :as opts}]
(let [discovery-uri (u/join base-uri ".well-known/openid-configuration") (let [discovery-uri (u/join base-uri ".well-known/openid-configuration")
response (ex/try (http-client {:method :get :uri (str discovery-uri)} {:sync? true}))] response (ex/try (http/req! http-client {:method :get :uri (str discovery-uri)} {:sync? true}))]
(cond (cond
(ex/exception? response) (ex/exception? response)
(do (do
@ -158,7 +160,7 @@
(defn- retrieve-github-email (defn- retrieve-github-email
[{:keys [http-client]} tdata info] [{:keys [http-client]} tdata info]
(or (some-> info :email p/resolved) (or (some-> info :email p/resolved)
(-> (http-client {:uri "https://api.github.com/user/emails" (-> (http/req! http-client {:uri "https://api.github.com/user/emails"
:headers {"Authorization" (dm/str (:type tdata) " " (:token tdata))} :headers {"Authorization" (dm/str (:type tdata) " " (:token tdata))}
:timeout 6000 :timeout 6000
:method :get}) :method :get})
@ -278,7 +280,7 @@
:uri (:token-uri provider) :uri (:token-uri provider)
:body (u/map->query-string params)}] :body (u/map->query-string params)}]
(p/then (p/then
(http-client req) (http/req! http-client req)
(fn [{:keys [status body] :as res}] (fn [{:keys [status body] :as res}]
(if (= status 200) (if (= status 200)
(let [data (json/read body)] (let [data (json/read body)]
@ -292,11 +294,10 @@
(defn- retrieve-user-info (defn- retrieve-user-info
[{:keys [provider http-client] :as cfg} tdata] [{:keys [provider http-client] :as cfg} tdata]
(letfn [(retrieve [] (letfn [(retrieve []
(http-client {:uri (:user-uri provider) (http/req! http-client {:uri (:user-uri provider)
:headers {"Authorization" (str (:type tdata) " " (:token tdata))} :headers {"Authorization" (str (:type tdata) " " (:token tdata))}
:timeout 6000 :timeout 6000
:method :get})) :method :get}))
(validate-response [response] (validate-response [response]
(when-not (s/int-in-range? 200 300 (:status response)) (when-not (s/int-in-range? 200 300 (:status response))
(ex/raise :type :internal (ex/raise :type :internal
@ -353,7 +354,7 @@
::props])) ::props]))
(defn retrieve-info (defn retrieve-info
[{:keys [tokens provider] :as cfg} {:keys [params] :as request}] [{:keys [sprops provider] :as cfg} {:keys [params] :as request}]
(letfn [(validate-oidc [info] (letfn [(validate-oidc [info]
;; If the provider is OIDC, we can proceed to check ;; If the provider is OIDC, we can proceed to check
;; roles if they are defined. ;; roles if they are defined.
@ -392,7 +393,7 @@
(let [state (get params :state) (let [state (get params :state)
code (get params :code) code (get params :code)
state (tokens :verify {:token state :iss :oauth})] state (tokens/verify sprops {:token state :iss :oauth})]
(-> (p/resolved code) (-> (p/resolved code)
(p/then #(retrieve-access-token cfg %)) (p/then #(retrieve-access-token cfg %))
(p/then #(retrieve-user-info cfg %)) (p/then #(retrieve-user-info cfg %))
@ -420,11 +421,11 @@
(redirect-response uri))) (redirect-response uri)))
(defn- generate-redirect (defn- generate-redirect
[{:keys [tokens session audit] :as cfg} request info profile] [{:keys [sprops session audit] :as cfg} request info profile]
(if profile (if profile
(let [sxf ((:create session) (:id profile)) (let [sxf ((:create session) (:id profile))
token (or (:invitation-token info) token (or (:invitation-token info)
(tokens :generate {:iss :auth (tokens/generate sprops {:iss :auth
:exp (dt/in-future "15m") :exp (dt/in-future "15m")
:profile-id (:id profile)})) :profile-id (:id profile)}))
params {:token token} params {:token token}
@ -448,7 +449,7 @@
:iss :prepared-register :iss :prepared-register
:is-active true :is-active true
:exp (dt/in-future {:hours 48})) :exp (dt/in-future {:hours 48}))
token (tokens :generate info) token (tokens/generate sprops info)
params (d/without-nils params (d/without-nils
{:token token {:token token
:fullname (:fullname info)}) :fullname (:fullname info)})
@ -458,9 +459,9 @@
(redirect-response uri)))) (redirect-response uri))))
(defn- auth-handler (defn- auth-handler
[{:keys [tokens] :as cfg} {:keys [params] :as request}] [{:keys [sprops] :as cfg} {:keys [params] :as request}]
(let [props (audit/extract-utm-params params) (let [props (audit/extract-utm-params params)
state (tokens :generate state (tokens/generate sprops
{:iss :oauth {:iss :oauth
:invitation-token (:invitation-token params) :invitation-token (:invitation-token params)
:props props :props props
@ -496,16 +497,16 @@
:hint "provider not configured"))))))}) :hint "provider not configured"))))))})
(s/def ::public-uri ::us/not-empty-string) (s/def ::public-uri ::us/not-empty-string)
(s/def ::http-client fn?) (s/def ::http-client ::http/client)
(s/def ::session map?) (s/def ::session map?)
(s/def ::tokens fn?) (s/def ::sprops map?)
(s/def ::providers map?) (s/def ::providers map?)
(defmethod ig/pre-init-spec ::routes (defmethod ig/pre-init-spec ::routes
[_] [_]
(s/keys :req-un [::public-uri (s/keys :req-un [::public-uri
::session ::session
::tokens ::sprops
::http-client ::http-client
::providers ::providers
::db/pool ::db/pool

View file

@ -20,6 +20,7 @@
[clojure.pprint :as pprint] [clojure.pprint :as pprint]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[datoteka.fs :as fs]
[environ.core :refer [env]] [environ.core :refer [env]]
[integrant.core :as ig])) [integrant.core :as ig]))
@ -83,16 +84,18 @@
;; a server prop key where initial project is stored. ;; a server prop key where initial project is stored.
:initial-project-skey "initial-project"}) :initial-project-skey "initial-project"})
(s/def ::default-rpc-rlimit ::us/vector-of-strings)
(s/def ::rpc-rlimit-config ::fs/path)
(s/def ::media-max-file-size ::us/integer) (s/def ::media-max-file-size ::us/integer)
(s/def ::flags ::us/vec-of-valid-keywords) (s/def ::flags ::us/vector-of-keywords)
(s/def ::telemetry-enabled ::us/boolean) (s/def ::telemetry-enabled ::us/boolean)
(s/def ::audit-log-archive-uri ::us/string) (s/def ::audit-log-archive-uri ::us/string)
(s/def ::audit-log-gc-max-age ::dt/duration) (s/def ::audit-log-gc-max-age ::dt/duration)
(s/def ::admins ::us/set-of-non-empty-strings) (s/def ::admins ::us/set-of-strings)
(s/def ::file-change-snapshot-every ::us/integer) (s/def ::file-change-snapshot-every ::us/integer)
(s/def ::file-change-snapshot-timeout ::dt/duration) (s/def ::file-change-snapshot-timeout ::dt/duration)
@ -131,8 +134,8 @@
(s/def ::oidc-token-uri ::us/string) (s/def ::oidc-token-uri ::us/string)
(s/def ::oidc-auth-uri ::us/string) (s/def ::oidc-auth-uri ::us/string)
(s/def ::oidc-user-uri ::us/string) (s/def ::oidc-user-uri ::us/string)
(s/def ::oidc-scopes ::us/set-of-non-empty-strings) (s/def ::oidc-scopes ::us/set-of-strings)
(s/def ::oidc-roles ::us/set-of-non-empty-strings) (s/def ::oidc-roles ::us/set-of-strings)
(s/def ::oidc-roles-attr ::us/keyword) (s/def ::oidc-roles-attr ::us/keyword)
(s/def ::oidc-email-attr ::us/keyword) (s/def ::oidc-email-attr ::us/keyword)
(s/def ::oidc-name-attr ::us/keyword) (s/def ::oidc-name-attr ::us/keyword)
@ -165,11 +168,14 @@
(s/def ::profile-complaint-threshold ::us/integer) (s/def ::profile-complaint-threshold ::us/integer)
(s/def ::public-uri ::us/string) (s/def ::public-uri ::us/string)
(s/def ::redis-uri ::us/string) (s/def ::redis-uri ::us/string)
(s/def ::registration-domain-whitelist ::us/set-of-non-empty-strings) (s/def ::registration-domain-whitelist ::us/set-of-strings)
(s/def ::rlimit-font ::us/integer)
(s/def ::rlimit-file-update ::us/integer)
(s/def ::rlimit-image ::us/integer)
(s/def ::rlimit-password ::us/integer) (s/def ::rpc-semaphore-permits-font ::us/integer)
(s/def ::rpc-semaphore-permits-file-update ::us/integer)
(s/def ::rpc-semaphore-permits-image ::us/integer)
(s/def ::rpc-semaphore-permits-password ::us/integer)
(s/def ::smtp-default-from ::us/string) (s/def ::smtp-default-from ::us/string)
(s/def ::smtp-default-reply-to ::us/string) (s/def ::smtp-default-reply-to ::us/string)
(s/def ::smtp-host ::us/string) (s/def ::smtp-host ::us/string)
@ -217,6 +223,7 @@
::database-min-pool-size ::database-min-pool-size
::database-max-pool-size ::database-max-pool-size
::default-blob-version ::default-blob-version
::default-rpc-rlimit
::error-report-webhook ::error-report-webhook
::default-executor-parallelism ::default-executor-parallelism
::blocking-executor-parallelism ::blocking-executor-parallelism
@ -272,10 +279,11 @@
::public-uri ::public-uri
::redis-uri ::redis-uri
::registration-domain-whitelist ::registration-domain-whitelist
::rlimit-font ::rpc-semaphore-permits-font
::rlimit-file-update ::rpc-semaphore-permits-file-update
::rlimit-image ::rpc-semaphore-permits-image
::rlimit-password ::rpc-semaphore-permits-password
::rpc-rlimit-config
::sentry-dsn ::sentry-dsn
::sentry-debug ::sentry-debug
::sentry-attach-stack-trace ::sentry-attach-stack-trace

View file

@ -150,7 +150,7 @@
;; When metrics namespace is provided ;; When metrics namespace is provided
(when metrics (when metrics
(->> (:registry metrics) (->> (::mtx/registry metrics)
(PrometheusMetricsTrackerFactory.) (PrometheusMetricsTrackerFactory.)
(.setMetricsTrackerFactory config))) (.setMetricsTrackerFactory config)))

View file

@ -114,18 +114,18 @@
;; HTTP ROUTER ;; HTTP ROUTER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::oauth map?)
(s/def ::storage map?)
(s/def ::assets map?) (s/def ::assets map?)
(s/def ::feedback fn?)
(s/def ::ws fn?)
(s/def ::audit-handler fn?) (s/def ::audit-handler fn?)
(s/def ::awsns-handler fn?) (s/def ::awsns-handler fn?)
(s/def ::session map?)
(s/def ::rpc-routes (s/nilable vector?))
(s/def ::debug-routes (s/nilable vector?)) (s/def ::debug-routes (s/nilable vector?))
(s/def ::oidc-routes (s/nilable vector?))
(s/def ::doc-routes (s/nilable vector?)) (s/def ::doc-routes (s/nilable vector?))
(s/def ::feedback fn?)
(s/def ::oauth map?)
(s/def ::oidc-routes (s/nilable vector?))
(s/def ::rpc-routes (s/nilable vector?))
(s/def ::session map?)
(s/def ::storage map?)
(s/def ::ws fn?)
(defmethod ig/pre-init-spec ::router [_] (defmethod ig/pre-init-spec ::router [_]
(s/keys :req-un [::mtx/metrics (s/keys :req-un [::mtx/metrics
@ -151,7 +151,7 @@
[middleware/errors errors/handle] [middleware/errors errors/handle]
[middleware/restrict-methods]]} [middleware/restrict-methods]]}
["/metrics" {:handler (:handler metrics)}] ["/metrics" {:handler (::mtx/handler metrics)}]
["/assets" {:middleware [(:middleware session)]} ["/assets" {:middleware [(:middleware session)]}
["/by-id/:id" {:handler (:objects-handler assets)}] ["/by-id/:id" {:handler (:objects-handler assets)}]
["/by-file-media-id/:id" {:handler (:file-objects-handler assets)}] ["/by-file-media-id/:id" {:handler (:file-objects-handler assets)}]

View file

@ -11,6 +11,8 @@
[app.common.logging :as l] [app.common.logging :as l]
[app.db :as db] [app.db :as db]
[app.db.sql :as sql] [app.db.sql :as sql]
[app.http.client :as http]
[app.tokens :as tokens]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[integrant.core :as ig] [integrant.core :as ig]
@ -24,10 +26,11 @@
(declare parse-notification) (declare parse-notification)
(declare process-report) (declare process-report)
(s/def ::http-client fn?) (s/def ::http-client ::http/client)
(s/def ::sprops map?)
(defmethod ig/pre-init-spec ::handler [_] (defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::http-client])) (s/keys :req-un [::db/pool ::http-client ::sprops]))
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ {:keys [executor] :as cfg}] [_ {:keys [executor] :as cfg}]
@ -46,7 +49,7 @@
(let [surl (get body "SubscribeURL") (let [surl (get body "SubscribeURL")
stopic (get body "TopicArn")] stopic (get body "TopicArn")]
(l/info :action "subscription received" :topic stopic :url surl) (l/info :action "subscription received" :topic stopic :url surl)
(http-client {:uri surl :method :post :timeout 10000} {:sync? true})) (http/req! http-client {:uri surl :method :post :timeout 10000} {:sync? true}))
(= mtype "Notification") (= mtype "Notification")
(when-let [message (parse-json (get body "Message"))] (when-let [message (parse-json (get body "Message"))]
@ -97,10 +100,10 @@
(get mail "headers"))) (get mail "headers")))
(defn- extract-identity (defn- extract-identity
[{:keys [tokens] :as cfg} headers] [{:keys [sprops]} headers]
(let [tdata (get headers "x-penpot-data")] (let [tdata (get headers "x-penpot-data")]
(when-not (str/empty? tdata) (when-not (str/empty? tdata)
(let [result (tokens :verify {:token tdata :iss :profile-identity})] (let [result (tokens/verify sprops {:token tdata :iss :profile-identity})]
(:profile-id result))))) (:profile-id result)))))
(defn- parse-notification (defn- parse-notification

View file

@ -31,7 +31,6 @@
(http/send-async req {:client client :as response-type})))) (http/send-async req {:client client :as response-type}))))
{::client client}))) {::client client})))
(defn req! (defn req!
"A convencience toplevel function for gradual migration to a new API "A convencience toplevel function for gradual migration to a new API
convention." convention."

View file

@ -10,6 +10,7 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.spec :as us] [app.common.spec :as us]
[app.http :as-alias http]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[yetti.request :as yrq] [yetti.request :as yrq]
@ -50,6 +51,11 @@
[err _] [err _]
(yrs/response 400 (ex-data err))) (yrs/response 400 (ex-data err)))
(defmethod handle-exception :rate-limit
[err _]
(let [headers (-> err ex-data ::http/headers)]
(yrs/response :status 429 :body "" :headers headers)))
(defmethod handle-exception :validation (defmethod handle-exception :validation
[err _] [err _]
(let [{:keys [code] :as data} (ex-data err)] (let [{:keys [code] :as data} (ex-data err)]

View file

@ -11,6 +11,7 @@
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.db.sql :as sql] [app.db.sql :as sql]
[app.tokens :as tokens]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as wrk] [app.worker :as wrk]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
@ -39,7 +40,7 @@
(delete-session [store key])) (delete-session [store key]))
(defn- make-database-store (defn- make-database-store
[{:keys [pool tokens executor]}] [{:keys [pool sprops executor]}]
(reify ISessionStore (reify ISessionStore
(read-session [_ token] (read-session [_ token]
(px/with-dispatch executor (px/with-dispatch executor
@ -50,7 +51,7 @@
(let [profile-id (:profile-id data) (let [profile-id (:profile-id data)
user-agent (:user-agent data) user-agent (:user-agent data)
created-at (or (:created-at data) (dt/now)) created-at (or (:created-at data) (dt/now))
token (tokens :generate {:iss "authentication" token (tokens/generate sprops {:iss "authentication"
:iat created-at :iat created-at
:uid profile-id}) :uid profile-id})
params {:user-agent user-agent params {:user-agent user-agent
@ -68,14 +69,13 @@
{:id (:id data)}) {:id (:id data)})
(assoc data :updated-at updated-at)))) (assoc data :updated-at updated-at))))
(delete-session [_ token] (delete-session [_ token]
(px/with-dispatch executor (px/with-dispatch executor
(db/delete! pool :http-session {:id token}) (db/delete! pool :http-session {:id token})
nil)))) nil))))
(defn make-inmemory-store (defn make-inmemory-store
[{:keys [tokens]}] [{:keys [sprops]}]
(let [cache (atom {})] (let [cache (atom {})]
(reify ISessionStore (reify ISessionStore
(read-session [_ token] (read-session [_ token]
@ -86,7 +86,7 @@
(let [profile-id (:profile-id data) (let [profile-id (:profile-id data)
user-agent (:user-agent data) user-agent (:user-agent data)
created-at (or (:created-at data) (dt/now)) created-at (or (:created-at data) (dt/now))
token (tokens :generate {:iss "authentication" token (tokens/generate sprops {:iss "authentication"
:iat created-at :iat created-at
:uid profile-id}) :uid profile-id})
params {:user-agent user-agent params {:user-agent user-agent
@ -108,9 +108,9 @@
(swap! cache dissoc token) (swap! cache dissoc token)
nil))))) nil)))))
(s/def ::tokens fn?) (s/def ::sprops map?)
(defmethod ig/pre-init-spec ::store [_] (defmethod ig/pre-init-spec ::store [_]
(s/keys :req-un [::db/pool ::wrk/executor ::tokens])) (s/keys :req-un [::db/pool ::wrk/executor ::sprops]))
(defmethod ig/init-key ::store (defmethod ig/init-key ::store
[_ {:keys [pool] :as cfg}] [_ {:keys [pool] :as cfg}]

View file

@ -15,6 +15,7 @@
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.tokens :as tokens]
[app.util.async :as aa] [app.util.async :as aa]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as wrk] [app.worker :as wrk]
@ -237,10 +238,10 @@
(s/def ::http-client fn?) (s/def ::http-client fn?)
(s/def ::uri ::us/string) (s/def ::uri ::us/string)
(s/def ::tokens fn?) (s/def ::sprops map?)
(defmethod ig/pre-init-spec ::archive-task [_] (defmethod ig/pre-init-spec ::archive-task [_]
(s/keys :req-un [::db/pool ::tokens ::http-client] (s/keys :req-un [::db/pool ::sprops ::http-client]
:opt-un [::uri])) :opt-un [::uri]))
(defmethod ig/init-key ::archive-task (defmethod ig/init-key ::archive-task
@ -276,7 +277,7 @@
for update skip locked;") for update skip locked;")
(defn archive-events (defn archive-events
[{:keys [pool uri tokens http-client] :as cfg}] [{:keys [pool uri sprops http-client] :as cfg}]
(letfn [(decode-row [{:keys [props ip-addr context] :as row}] (letfn [(decode-row [{:keys [props ip-addr context] :as row}]
(cond-> row (cond-> row
(db/pgobject? props) (db/pgobject? props)
@ -300,7 +301,7 @@
:context])) :context]))
(send [events] (send [events]
(let [token (tokens :generate {:iss "authentication" (let [token (tokens/generate sprops {:iss "authentication"
:iat (dt/now) :iat (dt/now)
:uid uuid/zero}) :uid uuid/zero})
body (t/encode {:events events}) body (t/encode {:events events})

View file

@ -64,13 +64,14 @@
:app.migrations/all :app.migrations/all
{:main (ig/ref :app.migrations/migrations)} {:main (ig/ref :app.migrations/migrations)}
:app.redis/redis
{:uri (cf/get :redis-uri)
:metrics (ig/ref :app.metrics/metrics)}
:app.msgbus/msgbus :app.msgbus/msgbus
{:backend (cf/get :msgbus-backend :redis) {:backend (cf/get :msgbus-backend :redis)
:executor (ig/ref [::default :app.worker/executor]) :executor (ig/ref [::default :app.worker/executor])
:redis-uri (cf/get :redis-uri)} :redis (ig/ref :app.redis/redis)}
:app.tokens/tokens
{:keys (ig/ref :app.setup/keys)}
:app.storage.tmp/cleaner :app.storage.tmp/cleaner
{:executor (ig/ref [::worker :app.worker/executor]) {:executor (ig/ref [::worker :app.worker/executor])
@ -92,7 +93,7 @@
:app.http.session/store :app.http.session/store
{:pool (ig/ref :app.db/pool) {:pool (ig/ref :app.db/pool)
:tokens (ig/ref :app.tokens/tokens) :sprops (ig/ref :app.setup/props)
:executor (ig/ref [::default :app.worker/executor])} :executor (ig/ref [::default :app.worker/executor])}
:app.http.session/gc-task :app.http.session/gc-task
@ -100,7 +101,7 @@
:max-age (cf/get :auth-token-cookie-max-age)} :max-age (cf/get :auth-token-cookie-max-age)}
:app.http.awsns/handler :app.http.awsns/handler
{:tokens (ig/ref :app.tokens/tokens) {:sprops (ig/ref :app.setup/props)
:pool (ig/ref :app.db/pool) :pool (ig/ref :app.db/pool)
:http-client (ig/ref :app.http/client) :http-client (ig/ref :app.http/client)
:executor (ig/ref [::worker :app.worker/executor])} :executor (ig/ref [::worker :app.worker/executor])}
@ -168,13 +169,14 @@
:github (ig/ref :app.auth.oidc/github-provider) :github (ig/ref :app.auth.oidc/github-provider)
:gitlab (ig/ref :app.auth.oidc/gitlab-provider) :gitlab (ig/ref :app.auth.oidc/gitlab-provider)
:oidc (ig/ref :app.auth.oidc/generic-provider)} :oidc (ig/ref :app.auth.oidc/generic-provider)}
:tokens (ig/ref :app.tokens/tokens) :sprops (ig/ref :app.setup/props)
:http-client (ig/ref :app.http/client) :http-client (ig/ref :app.http/client)
:pool (ig/ref :app.db/pool) :pool (ig/ref :app.db/pool)
:session (ig/ref :app.http/session) :session (ig/ref :app.http/session)
:public-uri (cf/get :public-uri) :public-uri (cf/get :public-uri)
:executor (ig/ref [::default :app.worker/executor])} :executor (ig/ref [::default :app.worker/executor])}
;; TODO: revisit the dependencies of this service, looks they are too much unused of them
:app.http/router :app.http/router
{:assets (ig/ref :app.http.assets/handlers) {:assets (ig/ref :app.http.assets/handlers)
:feedback (ig/ref :app.http.feedback/handler) :feedback (ig/ref :app.http.feedback/handler)
@ -186,7 +188,6 @@
:metrics (ig/ref :app.metrics/metrics) :metrics (ig/ref :app.metrics/metrics)
:public-uri (cf/get :public-uri) :public-uri (cf/get :public-uri)
:storage (ig/ref :app.storage/storage) :storage (ig/ref :app.storage/storage)
:tokens (ig/ref :app.tokens/tokens)
:audit-handler (ig/ref :app.loggers.audit/http-handler) :audit-handler (ig/ref :app.loggers.audit/http-handler)
:rpc-routes (ig/ref :app.rpc/routes) :rpc-routes (ig/ref :app.rpc/routes)
:doc-routes (ig/ref :app.rpc.doc/routes) :doc-routes (ig/ref :app.rpc.doc/routes)
@ -218,11 +219,12 @@
:app.rpc/methods :app.rpc/methods
{:pool (ig/ref :app.db/pool) {:pool (ig/ref :app.db/pool)
:session (ig/ref :app.http/session) :session (ig/ref :app.http/session)
:tokens (ig/ref :app.tokens/tokens) :sprops (ig/ref :app.setup/props)
:metrics (ig/ref :app.metrics/metrics) :metrics (ig/ref :app.metrics/metrics)
:storage (ig/ref :app.storage/storage) :storage (ig/ref :app.storage/storage)
:msgbus (ig/ref :app.msgbus/msgbus) :msgbus (ig/ref :app.msgbus/msgbus)
:public-uri (cf/get :public-uri) :public-uri (cf/get :public-uri)
:redis (ig/ref :app.redis/redis)
:audit (ig/ref :app.loggers.audit/collector) :audit (ig/ref :app.loggers.audit/collector)
:ldap (ig/ref :app.auth.ldap/provider) :ldap (ig/ref :app.auth.ldap/provider)
:http-client (ig/ref :app.http/client) :http-client (ig/ref :app.http/client)
@ -293,9 +295,6 @@
{:pool (ig/ref :app.db/pool) {:pool (ig/ref :app.db/pool)
:key (cf/get :secret-key)} :key (cf/get :secret-key)}
:app.setup/keys
{:props (ig/ref :app.setup/props)}
:app.loggers.zmq/receiver :app.loggers.zmq/receiver
{:endpoint (cf/get :loggers-zmq-uri)} {:endpoint (cf/get :loggers-zmq-uri)}
@ -309,7 +308,7 @@
:app.loggers.audit/archive-task :app.loggers.audit/archive-task
{:uri (cf/get :audit-log-archive-uri) {:uri (cf/get :audit-log-archive-uri)
:tokens (ig/ref :app.tokens/tokens) :sprops (ig/ref :app.setup/props)
:pool (ig/ref :app.db/pool) :pool (ig/ref :app.db/pool)
:http-client (ig/ref :app.http/client)} :http-client (ig/ref :app.http/client)}

View file

@ -20,7 +20,7 @@
[clojure.java.shell :as sh] [clojure.java.shell :as sh]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[datoteka.core :as fs]) [datoteka.fs :as fs])
(:import (:import
org.im4java.core.ConvertCmd org.im4java.core.ConvertCmd
org.im4java.core.IMOperation org.im4java.core.IMOperation

View file

@ -8,6 +8,8 @@
(:refer-clojure :exclude [run!]) (:refer-clojure :exclude [run!])
(:require (:require
[app.common.logging :as l] [app.common.logging :as l]
[app.common.spec :as us]
[app.metrics.definition :as-alias mdef]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig]) [integrant.core :as ig])
(:import (:import
@ -16,11 +18,12 @@
io.prometheus.client.Counter$Child io.prometheus.client.Counter$Child
io.prometheus.client.Gauge io.prometheus.client.Gauge
io.prometheus.client.Gauge$Child io.prometheus.client.Gauge$Child
io.prometheus.client.Summary
io.prometheus.client.Summary$Child
io.prometheus.client.Summary$Builder
io.prometheus.client.Histogram io.prometheus.client.Histogram
io.prometheus.client.Histogram$Child io.prometheus.client.Histogram$Child
io.prometheus.client.SimpleCollector
io.prometheus.client.Summary
io.prometheus.client.Summary$Builder
io.prometheus.client.Summary$Child
io.prometheus.client.exporter.common.TextFormat io.prometheus.client.exporter.common.TextFormat
io.prometheus.client.hotspot.DefaultExports io.prometheus.client.hotspot.DefaultExports
java.io.StringWriter)) java.io.StringWriter))
@ -28,7 +31,7 @@
(set! *warn-on-reflection* true) (set! *warn-on-reflection* true)
(declare create-registry) (declare create-registry)
(declare create) (declare create-collector)
(declare handler) (declare handler)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -37,120 +40,151 @@
(def default-metrics (def default-metrics
{:update-file-changes {:update-file-changes
{:name "rpc_update_file_changes_total" {::mdef/name "penpot_rpc_update_file_changes_total"
:help "A total number of changes submitted to update-file." ::mdef/help "A total number of changes submitted to update-file."
:type :counter} ::mdef/type :counter}
:update-file-bytes-processed :update-file-bytes-processed
{:name "rpc_update_file_bytes_processed_total" {::mdef/name "penpot_rpc_update_file_bytes_processed_total"
:help "A total number of bytes processed by update-file." ::mdef/help "A total number of bytes processed by update-file."
:type :counter} ::mdef/type :counter}
:rpc-mutation-timing :rpc-mutation-timing
{:name "rpc_mutation_timing" {::mdef/name "penpot_rpc_mutation_timing"
:help "RPC mutation method call timming." ::mdef/help "RPC mutation method call timming."
:labels ["name"] ::mdef/labels ["name"]
:type :histogram} ::mdef/type :histogram}
:rpc-command-timing :rpc-command-timing
{:name "rpc_command_timing" {::mdef/name "penpot_rpc_command_timing"
:help "RPC command method call timming." ::mdef/help "RPC command method call timming."
:labels ["name"] ::mdef/labels ["name"]
:type :histogram} ::mdef/type :histogram}
:rpc-query-timing :rpc-query-timing
{:name "rpc_query_timing" {::mdef/name "penpot_rpc_query_timing"
:help "RPC query method call timing." ::mdef/help "RPC query method call timing."
:labels ["name"] ::mdef/labels ["name"]
:type :histogram} ::mdef/type :histogram}
:websocket-active-connections :websocket-active-connections
{:name "websocket_active_connections" {::mdef/name "penpot_websocket_active_connections"
:help "Active websocket connections gauge" ::mdef/help "Active websocket connections gauge"
:type :gauge} ::mdef/type :gauge}
:websocket-messages-total :websocket-messages-total
{:name "websocket_message_total" {::mdef/name "penpot_websocket_message_total"
:help "Counter of processed messages." ::mdef/help "Counter of processed messages."
:labels ["op"] ::mdef/labels ["op"]
:type :counter} ::mdef/type :counter}
:websocket-session-timing :websocket-session-timing
{:name "websocket_session_timing" {::mdef/name "penpot_websocket_session_timing"
:help "Websocket session timing (seconds)." ::mdef/help "Websocket session timing (seconds)."
:type :summary} ::mdef/type :summary}
:session-update-total :session-update-total
{:name "http_session_update_total" {::mdef/name "penpot_http_session_update_total"
:help "A counter of session update batch events." ::mdef/help "A counter of session update batch events."
:type :counter} ::mdef/type :counter}
:tasks-timing :tasks-timing
{:name "penpot_tasks_timing" {::mdef/name "penpot_tasks_timing"
:help "Background tasks timing (milliseconds)." ::mdef/help "Background tasks timing (milliseconds)."
:labels ["name"] ::mdef/labels ["name"]
:type :summary} ::mdef/type :summary}
:rlimit-queued-submissions :redis-eval-timing
{:name "penpot_rlimit_queued_submissions" {::mdef/name "penpot_redis_eval_timing"
:help "Current number of queued submissions on RLIMIT." ::mdef/help "Redis EVAL commands execution timings (ms)"
:labels ["name"] ::mdef/labels ["name"]
:type :gauge} ::mdef/type :summary}
:rlimit-used-permits :rpc-semaphore-queued-submissions
{:name "penpot_rlimit_used_permits" {::mdef/name "penpot_rpc_semaphore_queued_submissions"
:help "Current number of used permits on RLIMIT." ::mdef/help "Current number of queued submissions on RPC-SEMAPHORE."
:labels ["name"] ::mdef/labels ["name"]
:type :gauge} ::mdef/type :gauge}
:rlimit-acquires-total :rpc-semaphore-used-permits
{:name "penpot_rlimit_acquires_total" {::mdef/name "penpot_rpc_semaphore_used_permits"
:help "Total number of acquire operations on RLIMIT." ::mdef/help "Current number of used permits on RPC-SEMAPHORE."
:labels ["name"] ::mdef/labels ["name"]
:type :counter} ::mdef/type :gauge}
:rpc-semaphore-acquires-total
{::mdef/name "penpot_rpc_semaphore_acquires_total"
::mdef/help "Total number of acquire operations on RPC-SEMAPHORE."
::mdef/labels ["name"]
::mdef/type :counter}
:executors-active-threads :executors-active-threads
{:name "penpot_executors_active_threads" {::mdef/name "penpot_executors_active_threads"
:help "Current number of threads available in the executor service." ::mdef/help "Current number of threads available in the executor service."
:labels ["name"] ::mdef/labels ["name"]
:type :gauge} ::mdef/type :gauge}
:executors-completed-tasks :executors-completed-tasks
{:name "penpot_executors_completed_tasks_total" {::mdef/name "penpot_executors_completed_tasks_total"
:help "Aproximate number of completed tasks by the executor." ::mdef/help "Aproximate number of completed tasks by the executor."
:labels ["name"] ::mdef/labels ["name"]
:type :counter} ::mdef/type :counter}
:executors-running-threads :executors-running-threads
{:name "penpot_executors_running_threads" {::mdef/name "penpot_executors_running_threads"
:help "Current number of threads with state RUNNING." ::mdef/help "Current number of threads with state RUNNING."
:labels ["name"] ::mdef/labels ["name"]
:type :gauge} ::mdef/type :gauge}
:executors-queued-submissions :executors-queued-submissions
{:name "penpot_executors_queued_submissions" {::mdef/name "penpot_executors_queued_submissions"
:help "Current number of queued submissions." ::mdef/help "Current number of queued submissions."
:labels ["name"] ::mdef/labels ["name"]
:type :gauge}}) ::mdef/type :gauge}})
(s/def ::mdef/name string?)
(s/def ::mdef/help string?)
(s/def ::mdef/labels (s/every string? :kind vector?))
(s/def ::mdef/type #{:gauge :counter :summary :histogram})
(s/def ::mdef/instance
#(instance? SimpleCollector %))
(s/def ::mdef/definition
(s/keys :req [::mdef/name
::mdef/help
::mdef/type]
:opt [::mdef/labels
::mdef/instance]))
(s/def ::definitions
(s/map-of keyword? ::mdef/definition))
(s/def ::registry
#(instance? CollectorRegistry %))
(s/def ::handler fn?)
(s/def ::metrics
(s/keys :req [::registry
::handler
::definitions]))
(defmethod ig/init-key ::metrics (defmethod ig/init-key ::metrics
[_ _] [_ _]
(l/info :action "initialize metrics") (l/info :action "initialize metrics")
(let [registry (create-registry) (let [registry (create-registry)
definitions (reduce-kv (fn [res k v] definitions (reduce-kv (fn [res k v]
(->> (assoc v :registry registry) (->> (assoc v ::registry registry)
(create) (create-collector)
(assoc res k))) (assoc res k)))
{} {}
default-metrics)] default-metrics)]
{:handler (partial handler registry)
:definitions definitions
:registry registry}))
(s/def ::handler fn?) (us/verify! ::definitions definitions)
(s/def ::registry #(instance? CollectorRegistry %))
(s/def ::metrics {::handler (partial handler registry)
(s/keys :req-un [::registry ::handler])) ::definitions definitions
::registry registry}))
(defn- handler (defn- handler
[registry _ respond _] [registry _ respond _]
@ -174,13 +208,16 @@
(def default-histogram-buckets (def default-histogram-buckets
[1 5 10 25 50 75 100 250 500 750 1000 2500 5000 7500]) [1 5 10 25 50 75 100 250 500 750 1000 2500 5000 7500])
(defmulti run-collector! (fn [mdef _] (::mdef/type mdef)))
(defmulti create-collector ::mdef/type)
(defn run! (defn run!
[{:keys [definitions]} {:keys [id] :as params}] [{:keys [::definitions]} {:keys [id] :as params}]
(when-let [mobj (get definitions id)] (when-let [mobj (get definitions id)]
((::fn mobj) params) (run-collector! mobj params)
true)) true))
(defn create-registry (defn- create-registry
[] []
(let [registry (CollectorRegistry.)] (let [registry (CollectorRegistry.)]
(DefaultExports/register registry) (DefaultExports/register registry)
@ -192,79 +229,89 @@
(and (.isArray ^Class oc) (and (.isArray ^Class oc)
(= (.getComponentType oc) String)))) (= (.getComponentType oc) String))))
(defn make-counter (defmethod run-collector! :counter
[{:keys [name help registry reg labels] :as props}] [{:keys [::mdef/instance]} {:keys [inc labels] :or {inc 1 labels default-empty-labels}}]
(let [registry (or registry reg)
instance (.. (Counter/build)
(name name)
(help help))
_ (when (seq labels)
(.labelNames instance (into-array String labels)))
instance (.register instance registry)]
{::instance instance
::fn (fn [{:keys [inc labels] :or {inc 1 labels default-empty-labels}}]
(let [instance (.labels instance (if (is-array? labels) labels (into-array String labels)))] (let [instance (.labels instance (if (is-array? labels) labels (into-array String labels)))]
(.inc ^Counter$Child instance (double inc))))})) (.inc ^Counter$Child instance (double inc))))
(defn make-gauge (defmethod run-collector! :gauge
[{:keys [name help registry reg labels] :as props}] [{:keys [::mdef/instance]} {:keys [inc dec labels val] :or {labels default-empty-labels}}]
(let [registry (or registry reg)
instance (.. (Gauge/build)
(name name)
(help help))
_ (when (seq labels)
(.labelNames instance (into-array String labels)))
instance (.register instance registry)]
{::instance instance
::fn (fn [{:keys [inc dec labels val] :or {labels default-empty-labels}}]
(let [instance (.labels ^Gauge instance (if (is-array? labels) labels (into-array String labels)))] (let [instance (.labels ^Gauge instance (if (is-array? labels) labels (into-array String labels)))]
(cond (number? inc) (.inc ^Gauge$Child instance (double inc)) (cond (number? inc) (.inc ^Gauge$Child instance (double inc))
(number? dec) (.dec ^Gauge$Child instance (double dec)) (number? dec) (.dec ^Gauge$Child instance (double dec))
(number? val) (.set ^Gauge$Child instance (double val)))))})) (number? val) (.set ^Gauge$Child instance (double val)))))
(defn make-summary (defmethod run-collector! :summary
[{:keys [name help registry reg labels max-age quantiles buckets] [{:keys [::mdef/instance]} {:keys [val labels] :or {labels default-empty-labels}}]
:or {max-age 3600 buckets 12 quantiles default-quantiles} :as props}] (let [instance (.labels ^Summary instance (if (is-array? labels) labels (into-array String labels)))]
(.observe ^Summary$Child instance val)))
(defmethod run-collector! :histogram
[{:keys [::mdef/instance]} {:keys [val labels] :or {labels default-empty-labels}}]
(let [instance (.labels ^Histogram instance (if (is-array? labels) labels (into-array String labels)))]
(.observe ^Histogram$Child instance val)))
(defmethod create-collector :counter
[{::mdef/keys [name help reg labels]
::keys [registry]
:as props}]
(let [registry (or registry reg)
instance (.. (Counter/build)
(name name)
(help help))]
(when (seq labels)
(.labelNames instance (into-array String labels)))
(assoc props ::mdef/instance (.register instance registry))))
(defmethod create-collector :gauge
[{::mdef/keys [name help reg labels]
::keys [registry]
:as props}]
(let [registry (or registry reg)
instance (.. (Gauge/build)
(name name)
(help help))]
(when (seq labels)
(.labelNames instance (into-array String labels)))
(assoc props ::mdef/instance (.register instance registry))))
(defmethod create-collector :summary
[{::mdef/keys [name help reg labels max-age quantiles buckets]
::keys [registry]
:or {max-age 3600 buckets 12 quantiles default-quantiles}
:as props}]
(let [registry (or registry reg) (let [registry (or registry reg)
builder (doto (Summary/build) builder (doto (Summary/build)
(.name name) (.name name)
(.help help)) (.help help))]
_ (when (seq quantiles)
(when (seq quantiles)
(.maxAgeSeconds ^Summary$Builder builder ^long max-age) (.maxAgeSeconds ^Summary$Builder builder ^long max-age)
(.ageBuckets ^Summary$Builder builder buckets)) (.ageBuckets ^Summary$Builder builder buckets))
_ (doseq [[q e] quantiles]
(doseq [[q e] quantiles]
(.quantile ^Summary$Builder builder q e)) (.quantile ^Summary$Builder builder q e))
_ (when (seq labels)
(when (seq labels)
(.labelNames ^Summary$Builder builder (into-array String labels))) (.labelNames ^Summary$Builder builder (into-array String labels)))
instance (.register ^Summary$Builder builder registry)]
{::instance instance (assoc props ::mdef/instance (.register ^Summary$Builder builder registry))))
::fn (fn [{:keys [val labels] :or {labels default-empty-labels}}]
(let [instance (.labels ^Summary instance (if (is-array? labels) labels (into-array String labels)))]
(.observe ^Summary$Child instance val)))}))
(defn make-histogram (defmethod create-collector :histogram
[{:keys [name help registry reg labels buckets] [{::mdef/keys [name help reg labels buckets]
:or {buckets default-histogram-buckets}}] ::keys [registry]
:or {buckets default-histogram-buckets}
:as props}]
(let [registry (or registry reg) (let [registry (or registry reg)
instance (doto (Histogram/build) instance (doto (Histogram/build)
(.name name) (.name name)
(.help help) (.help help)
(.buckets (into-array Double/TYPE buckets))) (.buckets (into-array Double/TYPE buckets)))]
_ (when (seq labels)
(when (seq labels)
(.labelNames instance (into-array String labels))) (.labelNames instance (into-array String labels)))
instance (.register instance registry)]
{::instance instance (assoc props ::mdef/instance (.register instance registry))))
::fn (fn [{:keys [val labels] :or {labels default-empty-labels}}]
(let [instance (.labels ^Histogram instance (if (is-array? labels) labels (into-array String labels)))]
(.observe ^Histogram$Child instance val)))}))
(defn create
[{:keys [type] :as props}]
(case type
:counter (make-counter props)
:gauge (make-gauge props)
:summary (make-summary props)
:histogram (make-histogram props)))

View file

@ -13,28 +13,14 @@
[app.common.spec :as us] [app.common.spec :as us]
[app.common.transit :as t] [app.common.transit :as t]
[app.config :as cfg] [app.config :as cfg]
[app.redis :as redis]
[app.util.async :as aa] [app.util.async :as aa]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as wrk] [app.worker :as wrk]
[clojure.core.async :as a] [clojure.core.async :as a]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.core :as p]) [promesa.core :as p]))
(:import
io.lettuce.core.RedisClient
io.lettuce.core.RedisURI
io.lettuce.core.api.StatefulConnection
io.lettuce.core.api.StatefulRedisConnection
io.lettuce.core.api.async.RedisAsyncCommands
io.lettuce.core.codec.ByteArrayCodec
io.lettuce.core.codec.RedisCodec
io.lettuce.core.codec.StringCodec
io.lettuce.core.pubsub.RedisPubSubListener
io.lettuce.core.pubsub.StatefulRedisPubSubConnection
io.lettuce.core.pubsub.api.sync.RedisPubSubCommands
io.lettuce.core.resource.ClientResources
io.lettuce.core.resource.DefaultClientResources
java.time.Duration))
(set! *warn-on-reflection* true) (set! *warn-on-reflection* true)
@ -62,18 +48,14 @@
:timeout (dt/duration {:seconds 30})} :timeout (dt/duration {:seconds 30})}
(d/without-nils cfg))) (d/without-nils cfg)))
(s/def ::timeout ::dt/duration)
(s/def ::redis-uri ::us/string)
(s/def ::buffer-size ::us/integer) (s/def ::buffer-size ::us/integer)
(defmethod ig/pre-init-spec ::msgbus [_] (defmethod ig/pre-init-spec ::msgbus [_]
(s/keys :req-un [::buffer-size ::redis-uri ::timeout ::wrk/executor])) (s/keys :req-un [::buffer-size ::redis/timeout ::redis/redis ::wrk/executor]))
(defmethod ig/init-key ::msgbus (defmethod ig/init-key ::msgbus
[_ {:keys [buffer-size redis-uri] :as cfg}] [_ {:keys [buffer-size] :as cfg}]
(l/info :hint "initialize msgbus" (l/info :hint "initialize msgbus" :buffer-size buffer-size)
:buffer-size buffer-size
:redis-uri redis-uri)
(let [cmd-ch (a/chan buffer-size) (let [cmd-ch (a/chan buffer-size)
rcv-ch (a/chan (a/dropping-buffer buffer-size)) rcv-ch (a/chan (a/dropping-buffer buffer-size))
pub-ch (a/chan (a/dropping-buffer buffer-size) xform-prefix-topic) pub-ch (a/chan (a/dropping-buffer buffer-size) xform-prefix-topic)
@ -106,33 +88,17 @@
;; --- IMPL ;; --- IMPL
(defn- redis-connect (defn- redis-connect
[{:keys [redis-uri timeout] :as cfg}] [{:keys [timeout redis] :as cfg}]
(let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE) (let [pconn (redis/connect redis :timeout timeout)
sconn (redis/connect redis :type :pubsub :timeout timeout)]
resources (.. (DefaultClientResources/builder)
(ioThreadPoolSize 4)
(computationThreadPoolSize 4)
(build))
uri (RedisURI/create redis-uri)
rclient (RedisClient/create ^ClientResources resources ^RedisURI uri)
pconn (.connect ^RedisClient rclient ^RedisCodec codec)
sconn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)]
(.setTimeout ^StatefulRedisConnection pconn ^Duration timeout)
(.setTimeout ^StatefulRedisPubSubConnection sconn ^Duration timeout)
(-> cfg (-> cfg
(assoc ::resources resources)
(assoc ::pconn pconn) (assoc ::pconn pconn)
(assoc ::sconn sconn)))) (assoc ::sconn sconn))))
(defn- redis-disconnect (defn- redis-disconnect
[{:keys [::pconn ::sconn ::resources] :as cfg}] [{:keys [::pconn ::sconn] :as cfg}]
(.. ^StatefulConnection pconn close) (redis/close! pconn)
(.. ^StatefulConnection sconn close) (redis/close! sconn))
(.shutdown ^ClientResources resources))
(defn- conj-subscription (defn- conj-subscription
"A low level function that is responsible to create on-demand "A low level function that is responsible to create on-demand
@ -204,27 +170,18 @@
(defn- create-listener (defn- create-listener
[rcv-ch] [rcv-ch]
(reify RedisPubSubListener (redis/pubsub-listener
(message [_ _pattern _topic _message]) :on-message (fn [_ topic message]
(message [_ topic message]
;; There are no back pressure, so we use a slidding ;; There are no back pressure, so we use a slidding
;; buffer for cases when the pubsub broker sends ;; buffer for cases when the pubsub broker sends
;; more messages that we can process. ;; more messages that we can process.
(let [val {:topic topic :message (t/decode message)}] (let [val {:topic topic :message (t/decode message)}]
(when-not (a/offer! rcv-ch val) (when-not (a/offer! rcv-ch val)
(l/warn :msg "dropping message on subscription loop")))) (l/warn :msg "dropping message on subscription loop"))))))
(psubscribed [_ _pattern _count])
(punsubscribed [_ _pattern _count])
(subscribed [_ _topic _count])
(unsubscribed [_ _topic _count])))
(defn start-io-loop (defn start-io-loop
[{:keys [::sconn ::rcv-ch ::pub-ch ::state executor] :as cfg}] [{:keys [::sconn ::rcv-ch ::pub-ch ::state executor] :as cfg}]
(redis/add-listener! sconn (create-listener rcv-ch))
;; Add a single listener to the pubsub connection
(.addListener ^StatefulRedisPubSubConnection sconn
^RedisPubSubListener (create-listener rcv-ch))
(letfn [(send-to-topic [topic message] (letfn [(send-to-topic [topic message]
(a/go-loop [chans (seq (get-in @state [:topics topic])) (a/go-loop [chans (seq (get-in @state [:topics topic]))
closed #{}] closed #{}]
@ -270,11 +227,10 @@
intended to be used in core.async go blocks." intended to be used in core.async go blocks."
[{:keys [::pconn] :as cfg} {:keys [topic message]}] [{:keys [::pconn] :as cfg} {:keys [topic message]}]
(let [message (t/encode message) (let [message (t/encode message)
res (a/chan 1) res (a/chan 1)]
pcomm (.async ^StatefulRedisConnection pconn)] (-> (redis/publish! pconn topic message)
(-> (.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message)
(p/finally (fn [_ cause] (p/finally (fn [_ cause]
(when (and cause (.isOpen ^StatefulConnection pconn)) (when (and cause (redis/open? pconn))
(a/offer! res cause)) (a/offer! res cause))
(a/close! res)))) (a/close! res))))
res)) res))
@ -283,14 +239,10 @@
"Create redis subscription. Blocking operation, intended to be used "Create redis subscription. Blocking operation, intended to be used
inside an agent." inside an agent."
[{:keys [::sconn] :as cfg} topic] [{:keys [::sconn] :as cfg} topic]
(let [topic (into-array String [topic]) (redis/subscribe! sconn topic))
scomm (.sync ^StatefulRedisPubSubConnection sconn)]
(.subscribe ^RedisPubSubCommands scomm topic)))
(defn redis-unsub (defn redis-unsub
"Removes redis subscription. Blocking operation, intended to be used "Removes redis subscription. Blocking operation, intended to be used
inside an agent." inside an agent."
[{:keys [::sconn] :as cfg} topic] [{:keys [::sconn] :as cfg} topic]
(let [topic (into-array String [topic]) (redis/unsubscribe! sconn topic))
scomm (.sync ^StatefulRedisPubSubConnection sconn)]
(.unsubscribe ^RedisPubSubCommands scomm topic)))

319
backend/src/app/redis.clj Normal file
View file

@ -0,0 +1,319 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.redis
"The msgbus abstraction implemented using redis as underlying backend."
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.common.spec :as us]
[app.metrics :as mtx]
[app.redis.script :as-alias rscript]
[app.util.time :as dt]
[clojure.core :as c]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]
[promesa.core :as p])
(:import
clojure.lang.IDeref
io.lettuce.core.RedisClient
io.lettuce.core.RedisURI
io.lettuce.core.ScriptOutputType
io.lettuce.core.api.StatefulConnection
io.lettuce.core.api.StatefulRedisConnection
io.lettuce.core.api.async.RedisAsyncCommands
io.lettuce.core.api.async.RedisScriptingAsyncCommands
io.lettuce.core.codec.ByteArrayCodec
io.lettuce.core.codec.RedisCodec
io.lettuce.core.codec.StringCodec
io.lettuce.core.pubsub.RedisPubSubListener
io.lettuce.core.pubsub.StatefulRedisPubSubConnection
io.lettuce.core.pubsub.api.sync.RedisPubSubCommands
io.lettuce.core.resource.ClientResources
io.lettuce.core.resource.DefaultClientResources
io.netty.util.HashedWheelTimer
io.netty.util.Timer
java.lang.AutoCloseable
java.time.Duration))
(set! *warn-on-reflection* true)
(declare initialize-resources)
(declare shutdown-resources)
(declare connect)
(declare close!)
(s/def ::timer
#(instance? Timer %))
(s/def ::connection
#(or (instance? StatefulRedisConnection %)
(and (instance? IDeref %)
(instance? StatefulRedisConnection (deref %)))))
(s/def ::pubsub-connection
#(or (instance? StatefulRedisPubSubConnection %)
(and (instance? IDeref %)
(instance? StatefulRedisPubSubConnection (deref %)))))
(s/def ::redis-uri
#(instance? RedisURI %))
(s/def ::resources
#(instance? ClientResources %))
(s/def ::pubsub-listener
#(instance? RedisPubSubListener %))
(s/def ::uri ::us/not-empty-string)
(s/def ::timeout ::dt/duration)
(s/def ::connect? ::us/boolean)
(s/def ::io-threads ::us/integer)
(s/def ::worker-threads ::us/integer)
(s/def ::redis
(s/keys :req [::resources ::redis-uri ::timer ::mtx/metrics]
:opt [::connection]))
(defmethod ig/pre-init-spec ::redis [_]
(s/keys :req-un [::uri ::mtx/metrics]
:opt-un [::timeout
::connect?
::io-threads
::worker-threads]))
(defmethod ig/prep-key ::redis
[_ cfg]
(let [runtime (Runtime/getRuntime)
cpus (.availableProcessors ^Runtime runtime)]
(merge {:timeout (dt/duration 5000)
:io-threads (max 3 cpus)
:worker-threads (max 3 cpus)}
(d/without-nils cfg))))
(defmethod ig/init-key ::redis
[_ {:keys [connect?] :as cfg}]
(let [cfg (initialize-resources cfg)]
(cond-> cfg
connect? (assoc ::connection (connect cfg)))))
(defmethod ig/halt-key! ::redis
[_ state]
(shutdown-resources state))
(def default-codec
(RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE))
(def string-codec
(RedisCodec/of StringCodec/UTF8 StringCodec/UTF8))
(defn- initialize-resources
"Initialize redis connection resources"
[{:keys [uri io-threads worker-threads connect? metrics] :as cfg}]
(l/info :hint "initialize redis resources"
:uri uri
:io-threads io-threads
:worker-threads worker-threads
:connect? connect?)
(let [timer (HashedWheelTimer.)
resources (.. (DefaultClientResources/builder)
(ioThreadPoolSize ^long io-threads)
(computationThreadPoolSize ^long worker-threads)
(timer ^Timer timer)
(build))
redis-uri (RedisURI/create ^String uri)]
(-> cfg
(assoc ::mtx/metrics metrics)
(assoc ::cache (atom {}))
(assoc ::timer timer)
(assoc ::redis-uri redis-uri)
(assoc ::resources resources))))
(defn- shutdown-resources
[{:keys [::resources ::cache ::timer]}]
(run! close! (vals @cache))
(when resources
(.shutdown ^ClientResources resources))
(when timer
(.stop ^Timer timer)))
(defn connect
[{:keys [::resources ::redis-uri] :as cfg}
& {:keys [timeout codec type] :or {codec default-codec type :default}}]
(us/assert! ::resources resources)
(let [client (RedisClient/create ^ClientResources resources ^RedisURI redis-uri)
timeout (or timeout (:timeout cfg))
conn (case type
:default (.connect ^RedisClient client ^RedisCodec codec)
:pubsub (.connectPubSub ^RedisClient client ^RedisCodec codec))]
(.setTimeout ^StatefulConnection conn ^Duration timeout)
(reify
IDeref
(deref [_] conn)
AutoCloseable
(close [_]
(.close ^StatefulConnection conn)
(.shutdown ^RedisClient client)))))
(defn get-or-connect
[{:keys [::cache] :as state} key options]
(assoc state ::connection
(or (get @cache key)
(-> (swap! cache (fn [cache]
(when-let [prev (get cache key)]
(close! prev))
(assoc cache key (connect state options))))
(get key)))))
(defn add-listener!
[conn listener]
(us/assert! ::pubsub-connection @conn)
(us/assert! ::pubsub-listener listener)
(.addListener ^StatefulRedisPubSubConnection @conn
^RedisPubSubListener listener)
conn)
(defn publish!
[conn topic message]
(us/assert! ::us/string topic)
(us/assert! ::us/bytes message)
(us/assert! ::connection @conn)
(let [pcomm (.async ^StatefulRedisConnection @conn)]
(.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message)))
(defn subscribe!
"Blocking operation, intended to be used on a worker/agent thread."
[conn & topics]
(us/assert! ::pubsub-connection @conn)
(let [topics (into-array String (map str topics))
cmd (.sync ^StatefulRedisPubSubConnection @conn)]
(.subscribe ^RedisPubSubCommands cmd topics)))
(defn unsubscribe!
"Blocking operation, intended to be used on a worker/agent thread."
[conn & topics]
(us/assert! ::pubsub-connection @conn)
(let [topics (into-array String (map str topics))
cmd (.sync ^StatefulRedisPubSubConnection @conn)]
(.unsubscribe ^RedisPubSubCommands cmd topics)))
(defn open?
[conn]
(.isOpen ^StatefulConnection @conn))
(defn pubsub-listener
[& {:keys [on-message on-subscribe on-unsubscribe]}]
(reify RedisPubSubListener
(message [_ pattern topic message]
(when on-message
(on-message pattern topic message)))
(message [_ topic message]
(when on-message
(on-message nil topic message)))
(psubscribed [_ pattern count]
(when on-subscribe
(on-subscribe pattern nil count)))
(punsubscribed [_ pattern count]
(when on-unsubscribe
(on-unsubscribe pattern nil count)))
(subscribed [_ topic count]
(when on-subscribe
(on-subscribe nil topic count)))
(unsubscribed [_ topic count]
(when on-unsubscribe
(on-unsubscribe nil topic count)))))
(defn close!
[o]
(.close ^AutoCloseable o))
(def ^:private scripts-cache (atom {}))
(def noop-fn (constantly nil))
(s/def ::rscript/name qualified-keyword?)
(s/def ::rscript/path ::us/not-empty-string)
(s/def ::rscript/keys (s/every any? :kind vector?))
(s/def ::rscript/vals (s/every any? :kind vector?))
(s/def ::rscript/script
(s/keys :req [::rscript/name
::rscript/path]
:opt [::rscript/keys
::rscript/vals]))
(defn eval!
[{:keys [::mtx/metrics] :as state} script]
(us/assert! ::rscript/script script)
(us/assert! ::redis state)
(let [rconn (-> state ::connection deref)
cmd (.async ^StatefulRedisConnection rconn)
keys (into-array String (map str (::rscript/keys script)))
vals (into-array String (map str (::rscript/vals script)))
sname (::rscript/name script)]
(letfn [(on-error [cause]
(if (instance? io.lettuce.core.RedisNoScriptException cause)
(do
(l/error :hint "no script found" :name sname :cause cause)
(-> (load-script)
(p/then eval-script)))
(if-let [on-error (::rscript/on-error script)]
(on-error cause)
(p/rejected cause))))
(eval-script [sha]
(let [tpoint (dt/tpoint)]
(-> (.evalsha ^RedisScriptingAsyncCommands cmd
^String sha
^ScriptOutputType ScriptOutputType/MULTI
^"[Ljava.lang.String;" keys
^"[Ljava.lang.String;" vals)
(p/then (fn [result]
(let [elapsed (tpoint)]
(mtx/run! metrics {:id :redis-eval-timing
:labels [(name sname)]
:val (inst-ms elapsed)})
(l/trace :hint "eval script"
:name (name sname)
:sha sha
:params (str/join "," (::rscript/vals script))
:elapsed (dt/format-duration elapsed))
result)))
(p/catch on-error))))
(read-script []
(-> script ::rscript/path io/resource slurp))
(load-script []
(l/trace :hint "load script" :name sname)
(-> (.scriptLoad ^RedisScriptingAsyncCommands cmd
^String (read-script))
(p/then (fn [sha]
(swap! scripts-cache assoc sname sha)
sha))))]
(if-let [sha (get @scripts-cache sname)]
(eval-script sha)
(-> (load-script)
(p/then eval-script))))))

View file

@ -10,10 +10,12 @@
[app.common.logging :as l] [app.common.logging :as l]
[app.common.spec :as us] [app.common.spec :as us]
[app.db :as db] [app.db :as db]
[app.http :as-alias http]
[app.loggers.audit :as audit] [app.loggers.audit :as audit]
[app.metrics :as mtx] [app.metrics :as mtx]
[app.rpc.retry :as retry] [app.rpc.retry :as retry]
[app.rpc.rlimit :as rlimit] [app.rpc.rlimit :as rlimit]
[app.rpc.semaphore :as rsem]
[app.util.async :as async] [app.util.async :as async]
[app.util.services :as sv] [app.util.services :as sv]
[app.worker :as wrk] [app.worker :as wrk]
@ -39,39 +41,35 @@
(ex/ignoring (hook-fn))) (ex/ignoring (hook-fn)))
response) response)
(defn- handle-response
[request result]
(let [mdata (meta result)]
(p/-> (yrs/response 200 result (::http/headers mdata {}))
(handle-response-transformation request mdata)
(handle-before-comple-hook mdata))))
(defn- rpc-query-handler (defn- rpc-query-handler
"Ring handler that dispatches query requests and convert between "Ring handler that dispatches query requests and convert between
internal async flow into ring async flow." internal async flow into ring async flow."
[methods {:keys [profile-id session-id params] :as request} respond raise] [methods {:keys [profile-id session-id params] :as request} respond raise]
(letfn [(handle-response [result]
(let [mdata (meta result)]
(-> (yrs/response 200 result)
(handle-response-transformation request mdata))))]
(let [type (keyword (:type params)) (let [type (keyword (:type params))
data (into {::request request} params) data (into {::http/request request} params)
data (if profile-id data (if profile-id
(assoc data :profile-id profile-id ::session-id session-id) (assoc data :profile-id profile-id ::session-id session-id)
(dissoc data :profile-id)) (dissoc data :profile-id))
method (get methods type default-handler)] method (get methods type default-handler)]
(-> (method data) (-> (method data)
(p/then handle-response) (p/then (partial handle-response request))
(p/then respond) (p/then respond)
(p/catch (fn [cause] (p/catch (fn [cause]
(let [context {:profile-id profile-id}] (let [context {:profile-id profile-id}]
(raise (ex/wrap-with-context cause context))))))))) (raise (ex/wrap-with-context cause context))))))))
(defn- rpc-mutation-handler (defn- rpc-mutation-handler
"Ring handler that dispatches mutation requests and convert between "Ring handler that dispatches mutation requests and convert between
internal async flow into ring async flow." internal async flow into ring async flow."
[methods {:keys [profile-id session-id params] :as request} respond raise] [methods {:keys [profile-id session-id params] :as request} respond raise]
(letfn [(handle-response [result]
(let [mdata (meta result)]
(p/-> (yrs/response 200 result)
(handle-response-transformation request mdata)
(handle-before-comple-hook mdata))))]
(let [type (keyword (:type params)) (let [type (keyword (:type params))
data (into {::request request} params) data (into {::request request} params)
data (if profile-id data (if profile-id
@ -80,22 +78,16 @@
method (get methods type default-handler)] method (get methods type default-handler)]
(-> (method data) (-> (method data)
(p/then handle-response) (p/then (partial handle-response request))
(p/then respond) (p/then respond)
(p/catch (fn [cause] (p/catch (fn [cause]
(let [context {:profile-id profile-id}] (let [context {:profile-id profile-id}]
(raise (ex/wrap-with-context cause context))))))))) (raise (ex/wrap-with-context cause context))))))))
(defn- rpc-command-handler (defn- rpc-command-handler
"Ring handler that dispatches cmd requests and convert between "Ring handler that dispatches cmd requests and convert between
internal async flow into ring async flow." internal async flow into ring async flow."
[methods {:keys [profile-id session-id params] :as request} respond raise] [methods {:keys [profile-id session-id params] :as request} respond raise]
(letfn [(handle-response [result]
(let [mdata (meta result)]
(p/-> (yrs/response 200 result)
(handle-response-transformation request mdata)
(handle-before-comple-hook mdata))))]
(let [cmd (keyword (:command params)) (let [cmd (keyword (:command params))
data (into {::request request} params) data (into {::request request} params)
data (if profile-id data (if profile-id
@ -104,16 +96,17 @@
method (get methods cmd default-handler)] method (get methods cmd default-handler)]
(-> (method data) (-> (method data)
(p/then handle-response) (p/then (partial handle-response request))
(p/then respond) (p/then respond)
(p/catch (fn [cause] (p/catch (fn [cause]
(let [context {:profile-id profile-id}] (let [context {:profile-id profile-id}]
(raise (ex/wrap-with-context cause context))))))))) (raise (ex/wrap-with-context cause context))))))))
(defn- wrap-metrics (defn- wrap-metrics
"Wrap service method with metrics measurement." "Wrap service method with metrics measurement."
[{:keys [metrics ::metrics-id]} f mdata] [{:keys [metrics ::metrics-id]} f mdata]
(let [labels (into-array String [(::sv/name mdata)])] (let [labels (into-array String [(::sv/name mdata)])]
(fn [cfg params] (fn [cfg params]
(let [start (System/nanoTime)] (let [start (System/nanoTime)]
(p/finally (p/finally
@ -177,7 +170,8 @@
[cfg f mdata] [cfg f mdata]
(let [f (as-> f $ (let [f (as-> f $
(wrap-dispatch cfg $ mdata) (wrap-dispatch cfg $ mdata)
(rlimit/wrap-rlimit cfg $ mdata) (rsem/wrap cfg $ mdata)
(rlimit/wrap cfg $ mdata)
(retry/wrap-retry cfg $ mdata) (retry/wrap-retry cfg $ mdata)
(wrap-audit cfg $ mdata) (wrap-audit cfg $ mdata)
(wrap-metrics cfg $ mdata) (wrap-metrics cfg $ mdata)
@ -258,12 +252,12 @@
(s/def ::public-uri ::us/not-empty-string) (s/def ::public-uri ::us/not-empty-string)
(s/def ::session map?) (s/def ::session map?)
(s/def ::storage some?) (s/def ::storage some?)
(s/def ::tokens fn?) (s/def ::sprops map?)
(defmethod ig/pre-init-spec ::methods [_] (defmethod ig/pre-init-spec ::methods [_]
(s/keys :req-un [::storage (s/keys :req-un [::storage
::session ::session
::tokens ::sprops
::audit ::audit
::executors ::executors
::public-uri ::public-uri

View file

@ -16,7 +16,8 @@
[app.rpc.doc :as-alias doc] [app.rpc.doc :as-alias doc]
[app.rpc.mutations.teams :as teams] [app.rpc.mutations.teams :as teams]
[app.rpc.queries.profile :as profile] [app.rpc.queries.profile :as profile]
[app.rpc.rlimit :as rlimit] [app.rpc.semaphore :as rsem]
[app.tokens :as tokens]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
[buddy.hashers :as hashers] [buddy.hashers :as hashers]
@ -80,7 +81,7 @@
;; ---- COMMAND: login with password ;; ---- COMMAND: login with password
(defn login-with-password (defn login-with-password
[{:keys [pool session tokens] :as cfg} {:keys [email password] :as params}] [{:keys [pool session sprops] :as cfg} {:keys [email password] :as params}]
(when-not (contains? cf/flags :login) (when-not (contains? cf/flags :login)
(ex/raise :type :restriction (ex/raise :type :restriction
@ -114,7 +115,7 @@
(profile/decode-profile-row)) (profile/decode-profile-row))
invitation (when-let [token (:invitation-token params)] invitation (when-let [token (:invitation-token params)]
(tokens :verify {:token token :iss :team-invitation})) (tokens/verify sprops {:token token :iss :team-invitation}))
;; If invitation member-id does not matches the profile-id, we just proceed to ignore the ;; If invitation member-id does not matches the profile-id, we just proceed to ignore the
;; invitation because invitations matches exactly; and user can't loging with other email and ;; invitation because invitations matches exactly; and user can't loging with other email and
@ -135,7 +136,7 @@
(sv/defmethod ::login-with-password (sv/defmethod ::login-with-password
"Performs authentication using penpot password." "Performs authentication using penpot password."
{:auth false {:auth false
::rlimit/permits (cf/get :rlimit-password) ::rsem/permits (cf/get :rpc-semaphore-permits-password)
::doc/added "1.15"} ::doc/added "1.15"}
[cfg params] [cfg params]
(login-with-password cfg params)) (login-with-password cfg params))
@ -156,9 +157,9 @@
;; ---- COMMAND: Recover Profile ;; ---- COMMAND: Recover Profile
(defn recover-profile (defn recover-profile
[{:keys [pool tokens] :as cfg} {:keys [token password]}] [{:keys [pool sprops] :as cfg} {:keys [token password]}]
(letfn [(validate-token [token] (letfn [(validate-token [token]
(let [tdata (tokens :verify {:token token :iss :password-recovery})] (let [tdata (tokens/verify sprops {:token token :iss :password-recovery})]
(:profile-id tdata))) (:profile-id tdata)))
(update-password [conn profile-id] (update-password [conn profile-id]
@ -176,7 +177,7 @@
(sv/defmethod ::recover-profile (sv/defmethod ::recover-profile
{:auth false {:auth false
::rlimit/permits (cf/get :rlimit-password) ::rsem/permits (cf/get :rpc-semaphore-permits-password)
::doc/added "1.15"} ::doc/added "1.15"}
[cfg params] [cfg params]
(recover-profile cfg params)) (recover-profile cfg params))
@ -184,12 +185,12 @@
;; ---- COMMAND: Prepare Register ;; ---- COMMAND: Prepare Register
(defn prepare-register (defn prepare-register
[{:keys [pool tokens] :as cfg} params] [{:keys [pool sprops] :as cfg} params]
(when-not (contains? cf/flags :registration) (when-not (contains? cf/flags :registration)
(if-not (contains? params :invitation-token) (if-not (contains? params :invitation-token)
(ex/raise :type :restriction (ex/raise :type :restriction
:code :registration-disabled) :code :registration-disabled)
(let [invitation (tokens :verify {:token (:invitation-token params) :iss :team-invitation})] (let [invitation (tokens/verify sprops {:token (:invitation-token params) :iss :team-invitation})]
(when-not (= (:email params) (:member-email invitation)) (when-not (= (:email params) (:member-email invitation))
(ex/raise :type :restriction (ex/raise :type :restriction
:code :email-does-not-match-invitation :code :email-does-not-match-invitation
@ -222,7 +223,7 @@
:iss :prepared-register :iss :prepared-register
:exp (dt/in-future "48h")} :exp (dt/in-future "48h")}
token (tokens :generate params)] token (tokens/generate sprops params)]
(with-meta {:token token} (with-meta {:token token}
{::audit/profile-id uuid/zero}))) {::audit/profile-id uuid/zero})))
@ -297,8 +298,8 @@
(assoc :default-project-id (:default-project-id team))))) (assoc :default-project-id (:default-project-id team)))))
(defn register-profile (defn register-profile
[{:keys [conn tokens session] :as cfg} {:keys [token] :as params}] [{:keys [conn sprops session] :as cfg} {:keys [token] :as params}]
(let [claims (tokens :verify {:token token :iss :prepared-register}) (let [claims (tokens/verify sprops {:token token :iss :prepared-register})
params (merge params claims)] params (merge params claims)]
(check-profile-existence! conn params) (check-profile-existence! conn params)
(let [is-active (or (:is-active params) (let [is-active (or (:is-active params)
@ -308,14 +309,14 @@
(create-profile-relations conn) (create-profile-relations conn)
(profile/decode-profile-row)) (profile/decode-profile-row))
invitation (when-let [token (:invitation-token params)] invitation (when-let [token (:invitation-token params)]
(tokens :verify {:token token :iss :team-invitation}))] (tokens/verify sprops {:token token :iss :team-invitation}))]
(cond (cond
;; If invitation token comes in params, this is because the user comes from team-invitation process; ;; If invitation token comes in params, this is because the user comes from team-invitation process;
;; in this case, regenerate token and send back to the user a new invitation token (and mark current ;; in this case, regenerate token and send back to the user a new invitation token (and mark current
;; session as logged). This happens only if the invitation email matches with the register email. ;; session as logged). This happens only if the invitation email matches with the register email.
(and (some? invitation) (= (:email profile) (:member-email invitation))) (and (some? invitation) (= (:email profile) (:member-email invitation)))
(let [claims (assoc invitation :member-id (:id profile)) (let [claims (assoc invitation :member-id (:id profile))
token (tokens :generate claims) token (tokens/generate sprops claims)
resp {:invitation-token token}] resp {:invitation-token token}]
(with-meta resp (with-meta resp
{:transform-response ((:create session) (:id profile)) {:transform-response ((:create session) (:id profile))
@ -341,14 +342,15 @@
;; In all other cases, send a verification email. ;; In all other cases, send a verification email.
:else :else
(let [vtoken (tokens :generate (let [vtoken (tokens/generate sprops
{:iss :verify-email {:iss :verify-email
:exp (dt/in-future "48h") :exp (dt/in-future "48h")
:profile-id (:id profile) :profile-id (:id profile)
:email (:email profile)}) :email (:email profile)})
ptoken (tokens :generate-predefined ptoken (tokens/generate sprops
{:iss :profile-identity {:iss :profile-identity
:profile-id (:id profile)})] :profile-id (:id profile)
:exp (dt/in-future {:days 30})})]
(eml/send! {::eml/conn conn (eml/send! {::eml/conn conn
::eml/factory eml/register ::eml/factory eml/register
:public-uri (:public-uri cfg) :public-uri (:public-uri cfg)
@ -366,7 +368,7 @@
(sv/defmethod ::register-profile (sv/defmethod ::register-profile
{:auth false {:auth false
::rlimit/permits (cf/get :rlimit-password) ::rsem/permits (cf/get :rpc-semaphore-permits-password)
::doc/added "1.15"} ::doc/added "1.15"}
[{:keys [pool] :as cfg} params] [{:keys [pool] :as cfg} params]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
@ -376,18 +378,19 @@
;; ---- COMMAND: Request Profile Recovery ;; ---- COMMAND: Request Profile Recovery
(defn request-profile-recovery (defn request-profile-recovery
[{:keys [pool tokens] :as cfg} {:keys [email] :as params}] [{:keys [pool sprops] :as cfg} {:keys [email] :as params}]
(letfn [(create-recovery-token [{:keys [id] :as profile}] (letfn [(create-recovery-token [{:keys [id] :as profile}]
(let [token (tokens :generate (let [token (tokens/generate sprops
{:iss :password-recovery {:iss :password-recovery
:exp (dt/in-future "15m") :exp (dt/in-future "15m")
:profile-id id})] :profile-id id})]
(assoc profile :token token))) (assoc profile :token token)))
(send-email-notification [conn profile] (send-email-notification [conn profile]
(let [ptoken (tokens :generate-predefined (let [ptoken (tokens/generate sprops
{:iss :profile-identity {:iss :profile-identity
:profile-id (:id profile)})] :profile-id (:id profile)
:exp (dt/in-future {:days 30})})]
(eml/send! {::eml/conn conn (eml/send! {::eml/conn conn
::eml/factory eml/password-recovery ::eml/factory eml/password-recovery
:public-uri (:public-uri cfg) :public-uri (:public-uri cfg)

View file

@ -20,7 +20,7 @@
[app.rpc.permissions :as perms] [app.rpc.permissions :as perms]
[app.rpc.queries.files :as files] [app.rpc.queries.files :as files]
[app.rpc.queries.projects :as proj] [app.rpc.queries.projects :as proj]
[app.rpc.rlimit :as rlimit] [app.rpc.semaphore :as rsem]
[app.storage.impl :as simpl] [app.storage.impl :as simpl]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.services :as sv] [app.util.services :as sv]
@ -318,7 +318,7 @@
(contains? o :changes-with-metadata))))) (contains? o :changes-with-metadata)))))
(sv/defmethod ::update-file (sv/defmethod ::update-file
{::rlimit/permits (cf/get :rlimit-file-update)} {::rsem/permits (cf/get :rpc-semaphore-permits-file-update)}
[{:keys [pool] :as cfg} {:keys [id profile-id] :as params}] [{:keys [pool] :as cfg} {:keys [id profile-id] :as params}]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(db/xact-lock! conn id) (db/xact-lock! conn id)

View file

@ -15,7 +15,7 @@
[app.media :as media] [app.media :as media]
[app.rpc.doc :as-alias doc] [app.rpc.doc :as-alias doc]
[app.rpc.queries.teams :as teams] [app.rpc.queries.teams :as teams]
[app.rpc.rlimit :as rlimit] [app.rpc.semaphore :as rsem]
[app.storage :as sto] [app.storage :as sto]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
@ -42,7 +42,7 @@
::font-id ::font-family ::font-weight ::font-style])) ::font-id ::font-family ::font-weight ::font-style]))
(sv/defmethod ::create-font-variant (sv/defmethod ::create-font-variant
{::rlimit/permits (cf/get :rlimit-font)} {::rsem/permits (cf/get :rpc-semaphore-permits-font)}
[{:keys [pool] :as cfg} {:keys [team-id profile-id] :as params}] [{:keys [pool] :as cfg} {:keys [team-id profile-id] :as params}]
(let [cfg (update cfg :storage media/configure-assets-storage)] (let [cfg (update cfg :storage media/configure-assets-storage)]
(teams/check-edition-permissions! pool profile-id team-id) (teams/check-edition-permissions! pool profile-id team-id)

View file

@ -15,7 +15,7 @@
[app.db :as db] [app.db :as db]
[app.media :as media] [app.media :as media]
[app.rpc.queries.teams :as teams] [app.rpc.queries.teams :as teams]
[app.rpc.rlimit :as rlimit] [app.rpc.semaphore :as rsem]
[app.storage :as sto] [app.storage :as sto]
[app.storage.tmp :as tmp] [app.storage.tmp :as tmp]
[app.util.bytes :as bs] [app.util.bytes :as bs]
@ -53,7 +53,7 @@
:opt-un [::id])) :opt-un [::id]))
(sv/defmethod ::upload-file-media-object (sv/defmethod ::upload-file-media-object
{::rlimit/permits (cf/get :rlimit-image)} {::rsem/permits (cf/get :rpc-semaphore-permits-image)}
[{:keys [pool] :as cfg} {:keys [profile-id file-id content] :as params}] [{:keys [pool] :as cfg} {:keys [profile-id file-id content] :as params}]
(let [file (select-file pool file-id) (let [file (select-file pool file-id)
cfg (update cfg :storage media/configure-assets-storage)] cfg (update cfg :storage media/configure-assets-storage)]
@ -181,7 +181,7 @@
:opt-un [::id ::name])) :opt-un [::id ::name]))
(sv/defmethod ::create-file-media-object-from-url (sv/defmethod ::create-file-media-object-from-url
{::rlimit/permits (cf/get :rlimit-image)} {::rsem/permits (cf/get :rpc-semaphore-permits-image)}
[{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}] [{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}]
(let [file (select-file pool file-id) (let [file (select-file pool file-id)
cfg (update cfg :storage media/configure-assets-storage)] cfg (update cfg :storage media/configure-assets-storage)]

View file

@ -17,8 +17,9 @@
[app.rpc.commands.auth :as cmd.auth] [app.rpc.commands.auth :as cmd.auth]
[app.rpc.mutations.teams :as teams] [app.rpc.mutations.teams :as teams]
[app.rpc.queries.profile :as profile] [app.rpc.queries.profile :as profile]
[app.rpc.rlimit :as rlimit] [app.rpc.semaphore :as rsem]
[app.storage :as sto] [app.storage :as sto]
[app.tokens :as tokens]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
@ -86,7 +87,7 @@
(s/keys :req-un [::profile-id ::password ::old-password])) (s/keys :req-un [::profile-id ::password ::old-password]))
(sv/defmethod ::update-profile-password (sv/defmethod ::update-profile-password
{::rlimit/permits (cf/get :rlimit-password)} {::rsem/permits (cf/get :rpc-semaphore-permits-password)}
[{:keys [pool] :as cfg} {:keys [password] :as params}] [{:keys [pool] :as cfg} {:keys [password] :as params}]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(let [profile (validate-password! conn params) (let [profile (validate-password! conn params)
@ -129,7 +130,7 @@
(s/keys :req-un [::profile-id ::file])) (s/keys :req-un [::profile-id ::file]))
(sv/defmethod ::update-profile-photo (sv/defmethod ::update-profile-photo
{::rlimit/permits (cf/get :rlimit-image)} {::rsem/permits (cf/get :rpc-semaphore-permits-image)}
[cfg {:keys [file] :as params}] [cfg {:keys [file] :as params}]
;; Validate incoming mime type ;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"}) (media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
@ -183,15 +184,16 @@
{:changed true}) {:changed true})
(defn- request-email-change (defn- request-email-change
[{:keys [conn tokens] :as cfg} {:keys [profile email] :as params}] [{:keys [conn sprops] :as cfg} {:keys [profile email] :as params}]
(let [token (tokens :generate (let [token (tokens/generate sprops
{:iss :change-email {:iss :change-email
:exp (dt/in-future "15m") :exp (dt/in-future "15m")
:profile-id (:id profile) :profile-id (:id profile)
:email email}) :email email})
ptoken (tokens :generate-predefined ptoken (tokens/generate sprops
{:iss :profile-identity {:iss :profile-identity
:profile-id (:id profile)})] :profile-id (:id profile)
:exp (dt/in-future {:days 30})})]
(when (not= email (:email profile)) (when (not= email (:email profile))
(cmd.auth/check-profile-existence! conn params)) (cmd.auth/check-profile-existence! conn params))
@ -303,7 +305,7 @@
(s/def ::login ::cmd.auth/login-with-password) (s/def ::login ::cmd.auth/login-with-password)
(sv/defmethod ::login (sv/defmethod ::login
{:auth false ::rlimit/permits (cf/get :rlimit-password)} {:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)}
[cfg params] [cfg params]
(cmd.auth/login-with-password cfg params)) (cmd.auth/login-with-password cfg params))
@ -321,7 +323,7 @@
(s/def ::recover-profile ::cmd.auth/recover-profile) (s/def ::recover-profile ::cmd.auth/recover-profile)
(sv/defmethod ::recover-profile (sv/defmethod ::recover-profile
{:auth false ::rlimit/permits (cf/get :rlimit-password)} {:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)}
[cfg params] [cfg params]
(cmd.auth/recover-profile cfg params)) (cmd.auth/recover-profile cfg params))
@ -338,7 +340,7 @@
(s/def ::register-profile ::cmd.auth/register-profile) (s/def ::register-profile ::cmd.auth/register-profile)
(sv/defmethod ::register-profile (sv/defmethod ::register-profile
{:auth false ::rlimit/permits (cf/get :rlimit-password)} {:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)}
[{:keys [pool] :as cfg} params] [{:keys [pool] :as cfg} params]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(-> (assoc cfg :conn conn) (-> (assoc cfg :conn conn)

View file

@ -20,8 +20,9 @@
[app.rpc.permissions :as perms] [app.rpc.permissions :as perms]
[app.rpc.queries.profile :as profile] [app.rpc.queries.profile :as profile]
[app.rpc.queries.teams :as teams] [app.rpc.queries.teams :as teams]
[app.rpc.rlimit :as rlimit] [app.rpc.semaphore :as rsem]
[app.storage :as sto] [app.storage :as sto]
[app.tokens :as tokens]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
@ -289,7 +290,7 @@
(s/keys :req-un [::profile-id ::team-id ::file])) (s/keys :req-un [::profile-id ::team-id ::file]))
(sv/defmethod ::update-team-photo (sv/defmethod ::update-team-photo
{::rlimit/permits (cf/get :rlimit-image)} {::rsem/permits (cf/get :rpc-semaphore-permits-image)}
[cfg {:keys [file] :as params}] [cfg {:keys [file] :as params}]
;; Validate incoming mime type ;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"}) (media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
@ -398,10 +399,10 @@
update set role = ?, valid_until = ?, updated_at = now();") update set role = ?, valid_until = ?, updated_at = now();")
(defn- create-team-invitation (defn- create-team-invitation
[{:keys [conn tokens team profile role email] :as cfg}] [{:keys [conn sprops team profile role email] :as cfg}]
(let [member (profile/retrieve-profile-data-by-email conn email) (let [member (profile/retrieve-profile-data-by-email conn email)
token-exp (dt/in-future "168h") ;; 7 days token-exp (dt/in-future "168h") ;; 7 days
itoken (tokens :generate itoken (tokens/generate sprops
{:iss :team-invitation {:iss :team-invitation
:exp token-exp :exp token-exp
:profile-id (:id profile) :profile-id (:id profile)
@ -409,9 +410,10 @@
:team-id (:id team) :team-id (:id team)
:member-email (:email member email) :member-email (:email member email)
:member-id (:id member)}) :member-id (:id member)})
ptoken (tokens :generate-predefined ptoken (tokens/generate sprops
{:iss :profile-identity {:iss :profile-identity
:profile-id (:id profile)})] :profile-id (:id profile)
:exp (dt/in-future {:days 30})})]
(when (contains? cf/flags :log-invitation-tokens) (when (contains? cf/flags :log-invitation-tokens)
(l/trace :hint "invitation token" :token itoken)) (l/trace :hint "invitation token" :token itoken))

View file

@ -12,6 +12,8 @@
[app.loggers.audit :as audit] [app.loggers.audit :as audit]
[app.rpc.mutations.teams :as teams] [app.rpc.mutations.teams :as teams]
[app.rpc.queries.profile :as profile] [app.rpc.queries.profile :as profile]
[app.tokens :as tokens]
[app.tokens.spec.team-invitation :as-alias spec.team-invitation]
[app.util.services :as sv] [app.util.services :as sv]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str])) [cuerdas.core :as str]))
@ -23,9 +25,9 @@
:opt-un [::profile-id])) :opt-un [::profile-id]))
(sv/defmethod ::verify-token {:auth false} (sv/defmethod ::verify-token {:auth false}
[{:keys [pool tokens] :as cfg} {:keys [token] :as params}] [{:keys [pool sprops] :as cfg} {:keys [token] :as params}]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(let [claims (tokens :verify {:token token}) (let [claims (tokens/verify sprops {:token token})
cfg (assoc cfg :conn conn)] cfg (assoc cfg :conn conn)]
(process-token cfg params claims)))) (process-token cfg params claims))))
@ -76,19 +78,19 @@
(s/def ::iss keyword?) (s/def ::iss keyword?)
(s/def ::exp ::us/inst) (s/def ::exp ::us/inst)
(s/def :internal.tokens.team-invitation/profile-id ::us/uuid) (s/def ::spec.team-invitation/profile-id ::us/uuid)
(s/def :internal.tokens.team-invitation/role ::us/keyword) (s/def ::spec.team-invitation/role ::us/keyword)
(s/def :internal.tokens.team-invitation/team-id ::us/uuid) (s/def ::spec.team-invitation/team-id ::us/uuid)
(s/def :internal.tokens.team-invitation/member-email ::us/email) (s/def ::spec.team-invitation/member-email ::us/email)
(s/def :internal.tokens.team-invitation/member-id (s/nilable ::us/uuid)) (s/def ::spec.team-invitation/member-id (s/nilable ::us/uuid))
(s/def ::team-invitation-claims (s/def ::team-invitation-claims
(s/keys :req-un [::iss ::exp (s/keys :req-un [::iss ::exp
:internal.tokens.team-invitation/profile-id ::spec.team-invitation/profile-id
:internal.tokens.team-invitation/role ::spec.team-invitation/role
:internal.tokens.team-invitation/team-id ::spec.team-invitation/team-id
:internal.tokens.team-invitation/member-email] ::spec.team-invitation/member-email]
:opt-un [:internal.tokens.team-invitation/member-id])) :opt-un [::spec.team-invitation/member-id]))
(defn- accept-invitation (defn- accept-invitation
[{:keys [conn] :as cfg} {:keys [member-id team-id role member-email] :as claims}] [{:keys [conn] :as cfg} {:keys [member-id team-id role member-email] :as claims}]

View file

@ -5,63 +5,266 @@
;; Copyright (c) UXBOX Labs SL ;; Copyright (c) UXBOX Labs SL
(ns app.rpc.rlimit (ns app.rpc.rlimit
"Resource usage limits (in other words: semaphores)." "Rate limit strategies implementation for RPC services.
It mainly implements two strategies: fixed window and bucket. You
can use one of them or both to create a combination of limits. All
limits are updated in each request and the most restrictive one
blocks the user activity.
On the HTTP layer it translates to the 429 http response.
The limits are defined as vector of 3 elements:
[<name:keyword> <strategy:keyword> <opts:string>]
The opts format is strategy dependent. With fixed `:window` strategy
you have the following format:
[:somename :window \"1000/m\"]
Where the first number means the quantity of allowed request and the
letter indicates the window unit, that can be `w` for weeks, `h` for
hours, `m` for minutes and `s` for seconds.
The the `:bucket` strategy you will have something like this:
[:somename :bucket \"100/10/15s]
Where the first number indicates the total tokens capacity (or
available burst), the second number indicates the refill rate and
the last number suffixed with the unit indicates the time window (or
interval) of the refill. This means that this limit configurations
allow burst of 100 elements and will refill 10 tokens each 15s (1
token each 1.5segons).
The bucket strategy works well for small intervals and window
strategy works better for large intervals.
All limits uses the profile-id as user identifier. In case of the
profile-id is not available, the IP address is used as fallback
value.
"
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.data.macros :as dm]
[app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.metrics :as mtx] [app.common.spec :as us]
[app.util.services :as sv] [app.common.uri :as uri]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.http :as-alias http]
[app.loggers.audit :refer [parse-client-ip]]
[app.redis :as redis]
[app.redis.script :as-alias rscript]
[app.rpc.rlimit.result :as-alias lresult]
[app.util.services :as-alias sv]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[promesa.core :as p])) [promesa.core :as p]))
(defprotocol IAsyncSemaphore (def ^:private default-timeout
(acquire! [_]) (dt/duration 400))
(release! [_]))
(defn semaphore (def ^:private default-options
[{:keys [permits metrics name]}] {:codec redis/string-codec
(let [name (d/name name) :timeout default-timeout})
used (volatile! 0)
queue (volatile! (d/queue))
labels (into-array String [name])]
(reify IAsyncSemaphore
(acquire! [this]
(let [d (p/deferred)]
(locking this
(if (< @used permits)
(do
(vswap! used inc)
(p/resolve! d))
(vswap! queue conj d)))
(mtx/run! metrics {:id :rlimit-used-permits :val @used :labels labels }) (def ^:private bucket-rate-limit-script
(mtx/run! metrics {:id :rlimit-queued-submissions :val (count @queue) :labels labels}) {::rscript/name ::bucket-rate-limit
(mtx/run! metrics {:id :rlimit-acquires-total :inc 1 :labels labels}) ::rscript/path "app/rpc/rlimit/bucket.lua"})
d))
(release! [this] (def ^:private window-rate-limit-script
(locking this {::rscript/name ::window-rate-limit
(if-let [item (peek @queue)] ::rscript/path "app/rpc/rlimit/window.lua"})
(do
(vswap! queue pop)
(p/resolve! item))
(when (pos? @used)
(vswap! used dec))))
(mtx/run! metrics {:id :rlimit-used-permits :val @used :labels labels}) (def enabled?
(mtx/run! metrics {:id :rlimit-queued-submissions :val (count @queue) :labels labels}) "Allows on runtime completly disable rate limiting."
)))) (atom true))
(defn wrap-rlimit (def ^:private window-opts-re
[{:keys [metrics executors] :as cfg} f mdata] #"^(\d+)/([wdhms])$")
(if-let [permits (::permits mdata)]
(let [sem (semaphore {:permits permits
:metrics metrics
:name (::sv/name mdata)})]
(l/debug :hint "wrapping rlimit" :handler (::sv/name mdata) :permits permits)
(fn [cfg params]
(-> (acquire! sem)
(p/then (fn [_] (f cfg params)) (:default executors))
(p/finally (fn [_ _] (release! sem))))))
f))
(def ^:private bucket-opts-re
#"^(\d+)/(\d+)/(\d+[hms])$")
(s/def ::strategy (s/and ::us/keyword #{:window :bucket}))
(s/def ::limit-definition
(s/tuple ::us/keyword ::strategy string?))
(defmulti parse-limit (fn [[_ strategy _]] strategy))
(defmulti process-limit (fn [_ _ _ o] (::strategy o)))
(defmethod parse-limit :window
[[name strategy opts :as vlimit]]
(us/assert! ::limit-definition vlimit)
(merge
{::name name
::strategy strategy}
(if-let [[_ nreq unit] (re-find window-opts-re opts)]
(let [nreq (parse-long nreq)]
{::nreq nreq
::unit (case unit
"d" :days
"h" :hours
"m" :minutes
"s" :seconds
"w" :weeks)
::key (dm/str "ratelimit.window." (d/name name))
::opts opts})
(ex/raise :type :validation
:code :invalid-window-limit-opts
:hint (str/ffmt "looks like '%' does not have a valid format" opts)))))
(defmethod parse-limit :bucket
[[name strategy opts :as vlimit]]
(us/assert! ::limit-definition vlimit)
(merge
{::name name
::strategy strategy}
(if-let [[_ capacity rate interval] (re-find bucket-opts-re opts)]
(let [interval (dt/duration interval)
rate (parse-long rate)
capacity (parse-long capacity)]
{::capacity capacity
::rate rate
::interval interval
::opts opts
::params [(dt/->seconds interval) rate capacity]
::key (dm/str "ratelimit.bucket." (d/name name))})
(ex/raise :type :validation
:code :invalid-bucket-limit-opts
:hint (str/ffmt "looks like '%' does not have a valid format" opts)))))
(defmethod process-limit :bucket
[redis user-id now {:keys [::key ::params ::service ::capacity ::interval ::rate] :as limit}]
(let [script (-> bucket-rate-limit-script
(assoc ::rscript/keys [(dm/str key "." service "." user-id)])
(assoc ::rscript/vals (conj params (dt/->seconds now))))]
(-> (redis/eval! redis script)
(p/then (fn [result]
(let [allowed? (boolean (nth result 0))
remaining (nth result 1)
reset (* (/ (inst-ms interval) rate)
(- capacity remaining))]
(l/trace :hint "limit processed"
:service service
:limit (name (::name limit))
:strategy (name (::strategy limit))
:opts (::opts limit)
:allowed? allowed?
:remaining remaining)
(-> limit
(assoc ::lresult/allowed? allowed?)
(assoc ::lresult/reset (dt/plus now reset))
(assoc ::lresult/remaining remaining))))))))
(defmethod process-limit :window
[redis user-id now {:keys [::nreq ::unit ::key ::service] :as limit}]
(let [ts (dt/truncate now unit)
ttl (dt/diff now (dt/plus ts {unit 1}))
script (-> window-rate-limit-script
(assoc ::rscript/keys [(dm/str key "." service "." user-id "." (dt/format-instant ts))])
(assoc ::rscript/vals [nreq (dt/->seconds ttl)]))]
(-> (redis/eval! redis script)
(p/then (fn [result]
(let [allowed? (boolean (nth result 0))
remaining (nth result 1)]
(l/trace :hint "limit processed"
:service service
:limit (name (::name limit))
:strategy (name (::strategy limit))
:opts (::opts limit)
:allowed? allowed?
:remaining remaining)
(-> limit
(assoc ::lresult/allowed? allowed?)
(assoc ::lresult/remaining remaining)
(assoc ::lresult/reset (dt/plus ts {unit 1})))))))))
(defn- process-limits
[redis user-id limits now]
(-> (p/all (map (partial process-limit redis user-id now) (reverse limits)))
(p/then (fn [results]
(let [remaining (->> results
(d/index-by ::name ::lresult/remaining)
(uri/map->query-string))
reset (->> results
(d/index-by ::name (comp dt/->seconds ::lresult/reset))
(uri/map->query-string))
rejected (->> results
(filter (complement ::lresult/allowed?))
(first))]
(when (and rejected (contains? cf/flags :warn-rpc-rate-limits))
(l/warn :hint "rejected rate limit"
:user-id (dm/str user-id)
:limit-service (-> rejected ::service name)
:limit-name (-> rejected ::name name)
:limit-strategy (-> rejected ::strategy name)))
{:enabled? true
:allowed? (some? rejected)
:headers {"x-rate-limit-remaining" remaining
"x-rate-limit-reset" reset}})))))
(defn- parse-limits
[service limits]
(let [default (some->> (cf/get :default-rpc-rlimit)
(us/conform ::limit-definition))
limits (cond->> limits
(some? default) (cons default))]
(->> (reverse limits)
(sequence (comp (map parse-limit)
(map #(assoc % ::service service))
(d/distinct-xf ::name))))))
(defn- handle-response
[f cfg params rres]
(if (:enabled? rres)
(let [headers {"x-rate-limit-remaining" (:remaining rres)
"x-rate-limit-reset" (:reset rres)}]
(when-not (:allowed? rres)
(ex/raise :type :rate-limit
:code :request-blocked
:hint "rate limit reached"
::http/headers headers))
(-> (f cfg params)
(p/then (fn [response]
(with-meta response
{::http/headers headers})))))
(f cfg params)))
(defn wrap
[{:keys [redis] :as cfg} f {service ::sv/name :as mdata}]
(let [limits (parse-limits service (::limits mdata))
default-rresp (p/resolved {:enabled? false})]
(if (and (seq limits)
(or (contains? cf/flags :rpc-rate-limit)
(contains? cf/flags :soft-rpc-rate-limit)))
(fn [cfg {:keys [::http/request] :as params}]
(let [user-id (or (:profile-id params)
(some-> request parse-client-ip)
uuid/zero)
rresp (when (and user-id @enabled?)
(let [redis (redis/get-or-connect redis ::rlimit default-options)
rresp (-> (process-limits redis user-id limits (dt/now))
(p/catch (fn [cause]
;; If we have an error on processing the
;; rate-limit we just skip it for do not cause
;; service interruption because of redis downtime
;; or similar situation.
(l/error :hint "error on processing rate-limit" :cause cause)
{:enabled? false})))]
;; If soft rate are enabled, we process the rate-limit but return
;; unprotected response.
(and (contains? cf/flags :soft-rpc-rate-limit) rresp)))]
(p/then (or rresp default-rresp)
(partial handle-response f cfg params))))
f)))

View file

@ -0,0 +1,33 @@
local tokensKey = KEYS[1]
local interval = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local capacity = tonumber(ARGV[3])
local timestamp = tonumber(ARGV[4])
local requested = tonumber(ARGV[5] or 1)
local fillTime = capacity / (rate / interval);
local ttl = math.floor(fillTime * 2)
local lastTokens = tonumber(redis.call("hget", tokensKey, "tokens"))
if lastTokens == nil then
lastTokens = capacity
end
local lastRefreshed = tonumber(redis.call("hget", tokensKey, "timestamp"))
if lastRefreshed == nil then
lastRefreshed = 0
end
local delta = math.max(0, (timestamp - lastRefreshed) / interval)
local filled = math.min(capacity, lastTokens + math.floor(delta * rate));
local allowed = filled >= requested
local newTokens = filled
if allowed then
newTokens = filled - requested
end
redis.call("hset", tokensKey, "tokens", newTokens, "timestamp", timestamp)
redis.call("expire", tokensKey, ttl)
return { allowed, newTokens }

View file

@ -0,0 +1,18 @@
local windowKey = KEYS[1]
local nreq = tonumber(ARGV[1])
local ttl = tonumber(ARGV[2])
local total = tonumber(redis.call("incr", windowKey))
redis.call("expire", windowKey, ttl)
local allowed = total <= nreq
local remaining = nreq - total
if remaining < 0 then
remaining = 0
end
return {allowed, remaining}

View file

@ -0,0 +1,68 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.rpc.semaphore
"Resource usage limits (in other words: semaphores)."
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.metrics :as mtx]
[app.util.locks :as locks]
[app.util.services :as-alias sv]
[promesa.core :as p]))
(defprotocol IAsyncSemaphore
(acquire! [_])
(release! [_]))
(defn create
[& {:keys [permits metrics name]}]
(let [name (d/name name)
used (volatile! 0)
queue (volatile! (d/queue))
labels (into-array String [name])
lock (locks/create)]
(reify IAsyncSemaphore
(acquire! [_]
(let [d (p/deferred)]
(locks/locking lock
(if (< @used permits)
(do
(vswap! used inc)
(p/resolve! d))
(vswap! queue conj d)))
(mtx/run! metrics {:id :rpc-semaphore-used-permits :val @used :labels labels })
(mtx/run! metrics {:id :rpc-semaphore-queued-submissions :val (count @queue) :labels labels})
(mtx/run! metrics {:id :rpc-semaphore-acquires-total :inc 1 :labels labels})
d))
(release! [_]
(locks/locking lock
(if-let [item (peek @queue)]
(do
(vswap! queue pop)
(p/resolve! item))
(when (pos? @used)
(vswap! used dec))))
(mtx/run! metrics {:id :rpc-semaphore-used-permits :val @used :labels labels})
(mtx/run! metrics {:id :rpc-semaphore-queued-submissions :val (count @queue) :labels labels})))))
(defn wrap
[{:keys [metrics executors] :as cfg} f mdata]
(if-let [permits (::permits mdata)]
(let [sem (create {:permits permits
:metrics metrics
:name (::sv/name mdata)})]
(l/debug :hint "wrapping semaphore" :handler (::sv/name mdata) :permits permits)
(fn [cfg params]
(-> (acquire! sem)
(p/then (fn [_] (f cfg params)) (:default executors))
(p/finally (fn [_ _] (release! sem))))))
f))

View file

@ -11,6 +11,7 @@
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.db :as db] [app.db :as db]
[app.setup.builtin-templates] [app.setup.builtin-templates]
[app.setup.keys :as keys]
[buddy.core.codecs :as bc] [buddy.core.codecs :as bc]
[buddy.core.nonce :as bn] [buddy.core.nonce :as bn]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
@ -59,6 +60,8 @@
"all sessions on each restart, it is hightly recommeded setting up the " "all sessions on each restart, it is hightly recommeded setting up the "
"PENPOT_SECRET_KEY environment variable"))) "PENPOT_SECRET_KEY environment variable")))
(let [stored (-> (retrieve-all conn) (let [secret (or key (generate-random-key))]
(assoc :secret-key (or key (generate-random-key))))] (-> (retrieve-all conn)
(update stored :instance-id handle-instance-id conn (db/read-only? pool))))) (assoc :secret-key secret)
(assoc :tokens-key (keys/derive secret :salt "tokens" :size 32))
(update :instance-id handle-instance-id conn (db/read-only? pool))))))

View file

@ -14,7 +14,7 @@
[clojure.edn :as edn] [clojure.edn :as edn]
[clojure.java.io :as io] [clojure.java.io :as io]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[datoteka.core :as fs] [datoteka.fs :as fs]
[integrant.core :as ig])) [integrant.core :as ig]))
(declare download-all!) (declare download-all!)

View file

@ -6,24 +6,17 @@
(ns app.setup.keys (ns app.setup.keys
"Keys derivation service." "Keys derivation service."
(:refer-clojure :exclude [derive])
(:require (:require
[app.common.spec :as us] [app.common.spec :as us]
[buddy.core.kdf :as bk] [buddy.core.kdf :as bk]))
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(s/def ::secret-key ::us/string) (defn derive
(s/def ::props (s/keys :req-un [::secret-key])) "Derive a key from secret-key"
[secret-key & {:keys [salt size]}]
(defmethod ig/pre-init-spec :app.setup/keys [_] (us/assert! ::us/not-empty-string secret-key)
(s/keys :req-un [::props])) (let [engine (bk/engine {:key secret-key
(defmethod ig/init-key :app.setup/keys
[_ {:keys [props] :as cfg}]
(fn [& {:keys [salt _]}]
(let [engine (bk/engine {:key (:secret-key props)
:salt salt :salt salt
:alg :hkdf :alg :hkdf
:digest :blake2b-512})] :digest :blake2b-512})]
(bk/get-bytes engine 32)))) (bk/get-bytes engine size)))

View file

@ -20,7 +20,7 @@
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as wrk] [app.worker :as wrk]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[datoteka.core :as fs] [datoteka.fs :as fs]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.core :as p] [promesa.core :as p]
[promesa.exec :as px])) [promesa.exec :as px]))

View file

@ -14,7 +14,7 @@
[clojure.java.io :as io] [clojure.java.io :as io]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[datoteka.core :as fs] [datoteka.fs :as fs]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.core :as p] [promesa.core :as p]
[promesa.exec :as px]) [promesa.exec :as px])

View file

@ -17,7 +17,7 @@
[app.worker :as wrk] [app.worker :as wrk]
[clojure.java.io :as io] [clojure.java.io :as io]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[datoteka.core :as fs] [datoteka.fs :as fs]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.core :as p] [promesa.core :as p]
[promesa.exec :as px]) [promesa.exec :as px])

View file

@ -16,7 +16,7 @@
[app.worker :as wrk] [app.worker :as wrk]
[clojure.core.async :as a] [clojure.core.async :as a]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[datoteka.core :as fs] [datoteka.fs :as fs]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.exec :as px])) [promesa.exec :as px]))

View file

@ -5,7 +5,7 @@
;; Copyright (c) UXBOX Labs SL ;; Copyright (c) UXBOX Labs SL
(ns app.tokens (ns app.tokens
"Tokens generation service." "Tokens generation API."
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
@ -13,20 +13,22 @@
[app.common.transit :as t] [app.common.transit :as t]
[app.util.time :as dt] [app.util.time :as dt]
[buddy.sign.jwe :as jwe] [buddy.sign.jwe :as jwe]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]))
[integrant.core :as ig]))
(defn- generate (s/def ::tokens-key bytes?)
[cfg claims]
(defn generate
[{:keys [tokens-key]} claims]
(us/assert! ::tokens-key tokens-key)
(let [payload (-> claims (let [payload (-> claims
(assoc :iat (dt/now)) (assoc :iat (dt/now))
(d/without-nils) (d/without-nils)
(t/encode))] (t/encode))]
(jwe/encrypt payload (::secret cfg) {:alg :a256kw :enc :a256gcm}))) (jwe/encrypt payload tokens-key {:alg :a256kw :enc :a256gcm})))
(defn- verify (defn verify
[cfg {:keys [token] :as params}] [{:keys [tokens-key]} {:keys [token] :as params}]
(let [payload (jwe/decrypt token (::secret cfg) {:alg :a256kw :enc :a256gcm}) (let [payload (jwe/decrypt token tokens-key {:alg :a256kw :enc :a256gcm})
claims (t/decode payload)] claims (t/decode payload)]
(when (and (dt/instant? (:exp claims)) (when (and (dt/instant? (:exp claims))
(dt/is-before? (:exp claims) (dt/now))) (dt/is-before? (:exp claims) (dt/now)))
@ -45,30 +47,7 @@
:params params)) :params params))
claims)) claims))
(defn- generate-predefined
[cfg {:keys [iss profile-id] :as params}]
(case iss
:profile-identity
(do
(us/verify uuid? profile-id)
(generate cfg (assoc params
:exp (dt/in-future {:days 30}))))
(ex/raise :type :internal
:code :not-implemented
:hint "no predefined token")))
(s/def ::keys fn?)
(defmethod ig/pre-init-spec ::tokens [_]
(s/keys :req-un [::keys]))
(defmethod ig/init-key ::tokens
[_ {:keys [keys] :as cfg}]
(let [secret (keys :salt "tokens" :size 32)
cfg (assoc cfg ::secret secret)]
(fn [action params]
(case action
:generate-predefined (generate-predefined cfg params)
:verify (verify cfg params)
:generate (generate cfg params)))))

View file

@ -8,7 +8,7 @@
"Bytes & Byte Streams helpers" "Bytes & Byte Streams helpers"
(:require (:require
[clojure.java.io :as io] [clojure.java.io :as io]
[datoteka.core :as fs] [datoteka.fs :as fs]
[yetti.adapter :as yt]) [yetti.adapter :as yt])
(:import (:import
com.github.luben.zstd.ZstdInputStream com.github.luben.zstd.ZstdInputStream
@ -23,6 +23,8 @@
org.apache.commons.io.IOUtils org.apache.commons.io.IOUtils
org.apache.commons.io.input.BoundedInputStream)) org.apache.commons.io.input.BoundedInputStream))
;; TODO: migrate to datoteka.io
(set! *warn-on-reflection* true) (set! *warn-on-reflection* true)
(def ^:const default-buffer-size (def ^:const default-buffer-size

View file

@ -0,0 +1,26 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.util.locks
"A syntactic helpers for using locks."
(:refer-clojure :exclude [locking])
(:import
java.util.concurrent.locks.ReentrantLock
java.util.concurrent.locks.Lock))
(defn create
[]
(ReentrantLock.))
(defmacro locking
[lsym & body]
(let [lsym (vary-meta lsym assoc :tag `Lock)]
`(do
(.lock ~lsym)
(try
~@body
(finally
(.unlock ~lsym))))))

View file

@ -27,16 +27,29 @@
;; Instant & Duration ;; Instant & Duration
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn temporal-unit
[o]
(if (instance? TemporalUnit o)
o
(case o
:nanos ChronoUnit/NANOS
:millis ChronoUnit/MILLIS
:micros ChronoUnit/MICROS
:seconds ChronoUnit/SECONDS
:minutes ChronoUnit/MINUTES
:hours ChronoUnit/HOURS
:days ChronoUnit/DAYS
:weeks ChronoUnit/WEEKS
:monts ChronoUnit/MONTHS)))
;; --- DURATION ;; --- DURATION
(defn- obj->duration (defn- obj->duration
[{:keys [days minutes seconds hours nanos millis]}] [params]
(cond-> (Duration/ofMillis (if (int? millis) ^long millis 0)) (reduce-kv (fn [o k v]
(int? days) (.plusDays ^long days) (.plus ^Duration o ^long v ^TemporalUnit (temporal-unit k)))
(int? hours) (.plusHours ^long hours) (Duration/ofMillis 0)
(int? minutes) (.plusMinutes ^long minutes) params))
(int? seconds) (.plusSeconds ^long seconds)
(int? nanos) (.plusNanos ^long nanos)))
(defn duration? (defn duration?
[v] [v]
@ -57,20 +70,17 @@
:else :else
(obj->duration ms-or-obj))) (obj->duration ms-or-obj)))
(defn ->seconds
[d]
(-> d inst-ms (/ 1000) int))
(defn diff (defn diff
[t1 t2] [t1 t2]
(Duration/between t1 t2)) (Duration/between t1 t2))
(defn truncate (defn truncate
[o unit] [o unit]
(let [unit (if (instance? TemporalUnit unit) (let [unit (temporal-unit unit)]
unit
(case unit
:nanos ChronoUnit/NANOS
:millis ChronoUnit/MILLIS
:micros ChronoUnit/MICROS
:seconds ChronoUnit/SECONDS
:minutes ChronoUnit/MINUTES))]
(cond (cond
(instance? Instant o) (instance? Instant o)
(.truncatedTo ^Instant o ^TemporalUnit unit) (.truncatedTo ^Instant o ^TemporalUnit unit)
@ -159,11 +169,11 @@
(defn in-future (defn in-future
[v] [v]
(plus (now) (duration v))) (plus (now) v))
(defn in-past (defn in-past
[v] [v]
(minus (now) (duration v))) (minus (now) v))
(defn instant->zoned-date-time (defn instant->zoned-date-time
[v] [v]
@ -315,3 +325,13 @@
CronExpression CronExpression
(-edn [o] (pr-str o))) (-edn [o] (pr-str o)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Measurement Helpers
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn tpoint
"Create a measurement checkpoint for time measurement of potentially
asynchronous flow."
[]
(let [p1 (System/nanoTime)]
#(duration {:nanos (- (System/nanoTime) p1)})))

View file

@ -10,6 +10,7 @@
[app.emails :as emails] [app.emails :as emails]
[app.http.awsns :as awsns] [app.http.awsns :as awsns]
[app.test-helpers :as th] [app.test-helpers :as th]
[app.tokens :as tokens]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.pprint :refer [pprint]] [clojure.pprint :refer [pprint]]
[clojure.test :as t] [clojure.test :as t]
@ -100,9 +101,9 @@
(t/deftest test-parse-bounce-report (t/deftest test-parse-bounce-report
(let [profile (th/create-profile* 1) (let [profile (th/create-profile* 1)
tokens (:app.tokens/tokens th/*system*) sprops (:app.setup/props th/*system*)
cfg {:tokens tokens} cfg {:sprops sprops}
report (bounce-report {:token (tokens :generate-predefined report (bounce-report {:token (tokens/generate sprops
{:iss :profile-identity {:iss :profile-identity
:profile-id (:id profile)})}) :profile-id (:id profile)})})
result (#'awsns/parse-notification cfg report)] result (#'awsns/parse-notification cfg report)]
@ -117,9 +118,9 @@
(t/deftest test-parse-complaint-report (t/deftest test-parse-complaint-report
(let [profile (th/create-profile* 1) (let [profile (th/create-profile* 1)
tokens (:app.tokens/tokens th/*system*) sprops (:app.setup/props th/*system*)
cfg {:tokens tokens} cfg {:sprops sprops}
report (complaint-report {:token (tokens :generate-predefined report (complaint-report {:token (tokens/generate sprops
{:iss :profile-identity {:iss :profile-identity
:profile-id (:id profile)})}) :profile-id (:id profile)})})
result (#'awsns/parse-notification cfg report)] result (#'awsns/parse-notification cfg report)]
@ -132,8 +133,8 @@
)) ))
(t/deftest test-parse-complaint-report-without-token (t/deftest test-parse-complaint-report-without-token
(let [tokens (:app.tokens/tokens th/*system*) (let [sprops (:app.setup/props th/*system*)
cfg {:tokens tokens} cfg {:sprops sprops}
report (complaint-report {:token ""}) report (complaint-report {:token ""})
result (#'awsns/parse-notification cfg report)] result (#'awsns/parse-notification cfg report)]
(t/is (= "complaint" (:type result))) (t/is (= "complaint" (:type result)))
@ -145,10 +146,10 @@
(t/deftest test-process-bounce-report (t/deftest test-process-bounce-report
(let [profile (th/create-profile* 1) (let [profile (th/create-profile* 1)
tokens (:app.tokens/tokens th/*system*) sprops (:app.setup/props th/*system*)
pool (:app.db/pool th/*system*) pool (:app.db/pool th/*system*)
cfg {:tokens tokens :pool pool} cfg {:sprops sprops :pool pool}
report (bounce-report {:token (tokens :generate-predefined report (bounce-report {:token (tokens/generate sprops
{:iss :profile-identity {:iss :profile-identity
:profile-id (:id profile)})}) :profile-id (:id profile)})})
report (#'awsns/parse-notification cfg report)] report (#'awsns/parse-notification cfg report)]
@ -174,10 +175,10 @@
(t/deftest test-process-complaint-report (t/deftest test-process-complaint-report
(let [profile (th/create-profile* 1) (let [profile (th/create-profile* 1)
tokens (:app.tokens/tokens th/*system*) sprops (:app.setup/props th/*system*)
pool (:app.db/pool th/*system*) pool (:app.db/pool th/*system*)
cfg {:tokens tokens :pool pool} cfg {:sprops sprops :pool pool}
report (complaint-report {:token (tokens :generate-predefined report (complaint-report {:token (tokens/generate sprops
{:iss :profile-identity {:iss :profile-identity
:profile-id (:id profile)})}) :profile-id (:id profile)})})
report (#'awsns/parse-notification cfg report)] report (#'awsns/parse-notification cfg report)]
@ -205,11 +206,11 @@
(t/deftest test-process-bounce-report-to-self (t/deftest test-process-bounce-report-to-self
(let [profile (th/create-profile* 1) (let [profile (th/create-profile* 1)
tokens (:app.tokens/tokens th/*system*) sprops (:app.setup/props th/*system*)
pool (:app.db/pool th/*system*) pool (:app.db/pool th/*system*)
cfg {:tokens tokens :pool pool} cfg {:sprops sprops :pool pool}
report (bounce-report {:email (:email profile) report (bounce-report {:email (:email profile)
:token (tokens :generate-predefined :token (tokens/generate sprops
{:iss :profile-identity {:iss :profile-identity
:profile-id (:id profile)})}) :profile-id (:id profile)})})
report (#'awsns/parse-notification cfg report)] report (#'awsns/parse-notification cfg report)]
@ -227,11 +228,11 @@
(t/deftest test-process-complaint-report-to-self (t/deftest test-process-complaint-report-to-self
(let [profile (th/create-profile* 1) (let [profile (th/create-profile* 1)
tokens (:app.tokens/tokens th/*system*) sprops (:app.setup/props th/*system*)
pool (:app.db/pool th/*system*) pool (:app.db/pool th/*system*)
cfg {:tokens tokens :pool pool} cfg {:sprops sprops :pool pool}
report (complaint-report {:email (:email profile) report (complaint-report {:email (:email profile)
:token (tokens :generate-predefined :token (tokens/generate sprops
{:iss :profile-identity {:iss :profile-identity
:profile-id (:id profile)})}) :profile-id (:id profile)})})
report (#'awsns/parse-notification cfg report)] report (#'awsns/parse-notification cfg report)]

View file

@ -9,9 +9,10 @@
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.rpc.mutations.profile :as profile]
[app.rpc.commands.auth :as cauth] [app.rpc.commands.auth :as cauth]
[app.rpc.mutations.profile :as profile]
[app.test-helpers :as th] [app.test-helpers :as th]
[app.tokens :as tokens]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.java.io :as io] [clojure.java.io :as io]
[clojure.test :as t] [clojure.test :as t]
@ -196,8 +197,8 @@
(t/deftest prepare-and-register-with-invitation-and-disabled-registration-1 (t/deftest prepare-and-register-with-invitation-and-disabled-registration-1
(with-redefs [app.config/flags [:disable-registration]] (with-redefs [app.config/flags [:disable-registration]]
(let [tokens-fn (:app.tokens/tokens th/*system*) (let [sprops (:app.setup/props th/*system*)
itoken (tokens-fn :generate itoken (tokens/generate sprops
{:iss :team-invitation {:iss :team-invitation
:exp (dt/in-future "48h") :exp (dt/in-future "48h")
:role :editor :role :editor
@ -226,8 +227,8 @@
(t/deftest prepare-and-register-with-invitation-and-disabled-registration-2 (t/deftest prepare-and-register-with-invitation-and-disabled-registration-2
(with-redefs [app.config/flags [:disable-registration]] (with-redefs [app.config/flags [:disable-registration]]
(let [tokens-fn (:app.tokens/tokens th/*system*) (let [sprops (:app.setup/props th/*system*)
itoken (tokens-fn :generate itoken (tokens/generate sprops
{:iss :team-invitation {:iss :team-invitation
:exp (dt/in-future "48h") :exp (dt/in-future "48h")
:role :editor :role :editor

View file

@ -59,7 +59,7 @@
:path (-> "app/test_files/template.penpot" io/resource fs/path)}] :path (-> "app/test_files/template.penpot" io/resource fs/path)}]
config (-> main/system-config config (-> main/system-config
(merge main/worker-config) (merge main/worker-config)
(assoc-in [:app.msgbus/msgbus :redis-uri] (:redis-uri config)) (assoc-in [:app.redis/redis :uri] (:redis-uri config))
(assoc-in [:app.db/pool :uri] (:database-uri config)) (assoc-in [:app.db/pool :uri] (:database-uri config))
(assoc-in [:app.db/pool :username] (:database-username config)) (assoc-in [:app.db/pool :username] (:database-username config))
(assoc-in [:app.db/pool :password] (:database-password config)) (assoc-in [:app.db/pool :password] (:database-password config))

View file

@ -28,7 +28,8 @@
:exclusions [org.clojure/data.json]} :exclusions [org.clojure/data.json]}
frankiesardo/linked {:mvn/version "1.3.0"} frankiesardo/linked {:mvn/version "1.3.0"}
commons-io/commons-io {:mvn/version "2.11.0"}
funcool/datoteka {:mvn/version "3.0.65"}
com.sun.mail/jakarta.mail {:mvn/version "2.0.1"} com.sun.mail/jakarta.mail {:mvn/version "2.0.1"}
;; exception printing ;; exception printing

View file

@ -1,7 +1,7 @@
#!/usr/bin/env bash #!/usr/bin/env bash
export PENPOT_FLAGS="enable-asserts enable-audit-log $PENPOT_FLAGS" export PENPOT_FLAGS="enable-asserts enable-audit-log $PENPOT_FLAGS"
export OPTIONS="-A:dev -J-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -J-XX:+UseZGC -J-XX:ConcGCThreads=1 -J-XX:-OmitStackTraceInFastThrow -J-Xms50m -J-Xmx512m"; export OPTIONS="-A:dev -J-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -J-XX:+UseG1GC -J-XX:-OmitStackTraceInFastThrow -J-Xms50m -J-Xmx512m";
export OPTIONS_EVAL="nil" export OPTIONS_EVAL="nil"
# export OPTIONS_EVAL="(set! *warn-on-reflection* true)" # export OPTIONS_EVAL="(set! *warn-on-reflection* true)"

View file

@ -10,6 +10,7 @@
parse-double group-by iteration]) parse-double group-by iteration])
#?(:cljs #?(:cljs
(:require-macros [app.common.data])) (:require-macros [app.common.data]))
(:require (:require
[app.common.math :as mth] [app.common.math :as mth]
[clojure.set :as set] [clojure.set :as set]

View file

@ -81,8 +81,8 @@
(deref [_] cause))) (deref [_] cause)))
(ns-unmap 'app.common.exceptions '->WrappedException) #?(:clj (ns-unmap 'app.common.exceptions '->WrappedException))
(ns-unmap 'app.common.exceptions 'map->WrappedException) #?(:clj (ns-unmap 'app.common.exceptions 'map->WrappedException))
(defn wrapped? (defn wrapped?
[o] [o]

View file

@ -133,9 +133,9 @@
(dm/str v))] (dm/str v))]
(s/def ::rgb-color-str (s/conformer conformer unformer))) (s/def ::rgb-color-str (s/conformer conformer unformer)))
;; --- SPEC: set/vec of valid Keywords ;; --- SPEC: set/vector of Keywords
(letfn [(conform-fn [dest s] (letfn [(conformer-fn [dest s]
(let [xform (keep (fn [s] (let [xform (keep (fn [s]
(cond (cond
(string? s) (keyword s) (string? s) (keyword s)
@ -144,17 +144,38 @@
(cond (cond
(set? s) (into dest xform s) (set? s) (into dest xform s)
(string? s) (into dest xform (str/words s)) (string? s) (into dest xform (str/words s))
:else ::s/invalid)))] :else ::s/invalid)))
(unformer-fn [v]
(str/join " " (map name v)))]
(s/def ::set-of-valid-keywords (s/def ::set-of-keywords
(s/conformer (s/conformer (partial conformer-fn #{}) unformer-fn))
(fn [s] (conform-fn #{} s))
(fn [s] (str/join " " (map name s)))))
(s/def ::vec-of-valid-keywords (s/def ::vector-of-keywords
(s/conformer (s/conformer (partial conformer-fn []) unformer-fn)))
(fn [s] (conform-fn [] s))
(fn [s] (str/join " " (map name s)))))) ;; --- SPEC: set/vector of strings
(def non-empty-strings-xf
(comp
(filter string?)
(remove str/empty?)
(remove str/blank?)))
(letfn [(conformer-fn [dest v]
(cond
(string? v) (into dest non-empty-strings-xf (str/split v #"[\s,]+"))
(vector? v) (into dest non-empty-strings-xf v)
(set? v) (into dest non-empty-strings-xf v)
:else ::s/invalid))
(unformer-fn [v]
(str/join "," v))]
(s/def ::set-of-strings
(s/conformer (partial conformer-fn #{}) unformer-fn))
(s/def ::vector-of-strings
(s/conformer (partial conformer-fn []) unformer-fn)))
;; --- SPEC: set-of-valid-emails ;; --- SPEC: set-of-valid-emails
@ -173,23 +194,15 @@
(str/join " " v))] (str/join " " v))]
(s/def ::set-of-valid-emails (s/conformer conformer unformer))) (s/def ::set-of-valid-emails (s/conformer conformer unformer)))
;; --- SPEC: set-of-non-empty-strings ;; --- SPEC: query-string
(def non-empty-strings-xf
(comp
(filter string?)
(remove str/empty?)
(remove str/blank?)))
(letfn [(conformer [s] (letfn [(conformer [s]
(cond (if (string? s)
(string? s) (->> (str/split s #"\s*,\s*") (ex/try* #(u/query-string->map s) (constantly ::s/invalid))
(into #{} non-empty-strings-xf)) s))
(set? s) (into #{} non-empty-strings-xf s)
:else ::s/invalid))
(unformer [s] (unformer [s]
(str/join "," s))] (u/map->query-string s))]
(s/def ::set-of-non-empty-strings (s/conformer conformer unformer))) (s/def ::query-string (s/conformer conformer unformer)))
;; --- SPECS WITHOUT CONFORMER ;; --- SPECS WITHOUT CONFORMER

View file

@ -32,7 +32,7 @@
:dev :dev
{:extra-paths ["dev"] {:extra-paths ["dev"]
:extra-deps :extra-deps
{thheller/shadow-cljs {:mvn/version "2.19.8"} {thheller/shadow-cljs {:mvn/version "2.19.9"}
org.clojure/tools.namespace {:mvn/version "RELEASE"} org.clojure/tools.namespace {:mvn/version "RELEASE"}
cider/cider-nrepl {:mvn/version "0.28.4"}}} cider/cider-nrepl {:mvn/version "0.28.4"}}}

View file

@ -48,7 +48,7 @@
"prettier": "^2.7.1", "prettier": "^2.7.1",
"rimraf": "^3.0.0", "rimraf": "^3.0.0",
"sass": "^1.53.0", "sass": "^1.53.0",
"shadow-cljs": "2.19.8" "shadow-cljs": "2.19.9"
}, },
"dependencies": { "dependencies": {
"@sentry/browser": "^6.17.4", "@sentry/browser": "^6.17.4",

View file

@ -116,6 +116,7 @@
(some? position-modifier) (some? position-modifier)
(gpt/transform position-modifier)) (gpt/transform position-modifier))
content (:content draft) content (:content draft)
pos-x (* (:x position) zoom) pos-x (* (:x position) zoom)
pos-y (* (:y position) zoom) pos-y (* (:y position) zoom)

View file

@ -31,6 +31,7 @@
(let [section (get-in route [:data :name]) (let [section (get-in route [:data :name])
profile (mf/deref refs/profile) profile (mf/deref refs/profile)
locale (mf/deref i18n/locale)] locale (mf/deref i18n/locale)]
(mf/use-effect (mf/use-effect
#(when (nil? profile) #(when (nil? profile)
(st/emit! (rt/nav :auth-login)))) (st/emit! (rt/nav :auth-login))))

View file

@ -4711,10 +4711,10 @@ shadow-cljs-jar@1.3.2:
resolved "https://registry.yarnpkg.com/shadow-cljs-jar/-/shadow-cljs-jar-1.3.2.tgz#97273afe1747b6a2311917c1c88d9e243c81957b" resolved "https://registry.yarnpkg.com/shadow-cljs-jar/-/shadow-cljs-jar-1.3.2.tgz#97273afe1747b6a2311917c1c88d9e243c81957b"
integrity sha512-XmeffAZHv8z7451kzeq9oKh8fh278Ak+UIOGGrapyqrFBB773xN8vMQ3O7J7TYLnb9BUwcqadKkmgaq7q6fhZg== integrity sha512-XmeffAZHv8z7451kzeq9oKh8fh278Ak+UIOGGrapyqrFBB773xN8vMQ3O7J7TYLnb9BUwcqadKkmgaq7q6fhZg==
shadow-cljs@2.19.8: shadow-cljs@2.19.9:
version "2.19.8" version "2.19.9"
resolved "https://registry.yarnpkg.com/shadow-cljs/-/shadow-cljs-2.19.8.tgz#1ce96cab3e4903bed8d401ffbe88b8939f5454d3" resolved "https://registry.yarnpkg.com/shadow-cljs/-/shadow-cljs-2.19.9.tgz#1e6b6115241666504c705ca8e6d6c9c1bc64add2"
integrity sha512-6qek3mcAP0hrnC5FxrTebBrgLGpOuhlnp06vdxp6g0M5Gl6w2Y0hzSwa1s2K8fMOkzE4/ciQor75b2y64INgaw== integrity sha512-rLPv98HBIKf/4Kxjxpq0gXaNBVupJ/ii+OApHuqXFuwJOgU94DbNDSP4Bzjo+az2tTD11yTzXiYZsTqyAy+VBQ==
dependencies: dependencies:
node-libs-browser "^2.2.1" node-libs-browser "^2.2.1"
readline-sync "^1.4.7" readline-sync "^1.4.7"