From bb5a4c0fa545f104f387304bb9b5305cd46e392c Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 16 Nov 2023 11:02:25 +0100 Subject: [PATCH 01/14] :sparkles: Update yetti and adapt for ring-2.0 --- backend/deps.edn | 4 +- backend/resources/log4j2-devenv.xml | 2 +- backend/src/app/auth/oidc.clj | 10 +- backend/src/app/http.clj | 46 ++-- backend/src/app/http/access_token.clj | 11 +- backend/src/app/http/assets.clj | 14 +- backend/src/app/http/awsns.clj | 8 +- backend/src/app/http/debug.clj | 138 +++++------ backend/src/app/http/errors.clj | 154 ++++++------ backend/src/app/http/middleware.clj | 113 ++++----- backend/src/app/http/session.clj | 8 +- backend/src/app/http/websocket.clj | 35 +-- backend/src/app/loggers/audit.clj | 8 +- backend/src/app/main.clj | 1 - backend/src/app/rpc.clj | 14 +- backend/src/app/rpc/commands/binfile.clj | 12 +- backend/src/app/rpc/cond.clj | 4 +- backend/src/app/rpc/doc.clj | 24 +- backend/src/app/rpc/helpers.clj | 4 +- backend/src/app/util/websocket.clj | 227 +++++++++--------- .../http_middleware_access_token_test.clj | 6 +- backend/test/backend_tests/rpc_audit_test.clj | 2 +- .../rpc_cond_middleware_test.clj | 2 +- 23 files changed, 407 insertions(+), 440 deletions(-) diff --git a/backend/deps.edn b/backend/deps.edn index d8ff1d16ed..b3cacd663a 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -21,8 +21,8 @@ java-http-clj/java-http-clj {:mvn/version "0.4.3"} funcool/yetti - {:git/tag "v9.16" - :git/sha "7df3e08" + {:git/tag "v10.0" + :git/sha "520613f" :git/url "https://github.com/funcool/yetti.git" :exclusions [org.slf4j/slf4j-api]} diff --git a/backend/resources/log4j2-devenv.xml b/backend/resources/log4j2-devenv.xml index 70e54ba76c..4fd93925c9 100644 --- a/backend/resources/log4j2-devenv.xml +++ b/backend/resources/log4j2-devenv.xml @@ -31,7 +31,7 @@ - + diff --git a/backend/src/app/auth/oidc.clj b/backend/src/app/auth/oidc.clj index 733665151c..206ec39157 100644 --- a/backend/src/app/auth/oidc.clj +++ b/backend/src/app/auth/oidc.clj @@ -31,7 +31,7 @@ [clojure.spec.alpha :as s] [cuerdas.core :as str] [integrant.core :as ig] - [yetti.response :as-alias yrs])) + [ring.response :as-alias rres])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; HELPERS @@ -479,8 +479,8 @@ (defn- redirect-response [uri] - {::yrs/status 302 - ::yrs/headers {"location" (str uri)}}) + {::rres/status 302 + ::rres/headers {"location" (str uri)}}) (defn- generate-error-redirect [_ cause] @@ -557,8 +557,8 @@ :props props :exp (dt/in-future "4h")}) uri (build-auth-uri cfg state)] - {::yrs/status 200 - ::yrs/body {:redirect-uri uri}})) + {::rres/status 200 + ::rres/body {:redirect-uri uri}})) (defn- callback-handler [cfg request] diff --git a/backend/src/app/http.clj b/backend/src/app/http.clj index 59ba338614..5992258273 100644 --- a/backend/src/app/http.clj +++ b/backend/src/app/http.clj @@ -23,15 +23,14 @@ [app.metrics :as mtx] [app.rpc :as-alias rpc] [app.rpc.doc :as-alias rpc.doc] - [app.worker :as wrk] [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.exec :as px] [reitit.core :as r] [reitit.middleware :as rr] - [yetti.adapter :as yt] - [yetti.request :as yrq] - [yetti.response :as-alias yrs])) + [ring.request :as rreq] + [ring.response :as-alias rres] + [yetti.adapter :as yt])) (declare router-handler) @@ -63,8 +62,7 @@ ::max-multipart-body-size ::router ::handler - ::io-threads - ::wrk/executor])) + ::io-threads])) (defmethod ig/init-key ::server [_ {:keys [::handler ::router ::host ::port] :as cfg}] @@ -75,11 +73,9 @@ :http/max-multipart-body-size (::max-multipart-body-size cfg) :xnio/io-threads (or (::io-threads cfg) (max 3 (px/get-available-processors))) - :xnio/worker-threads (or (::worker-threads cfg) - (max 6 (px/get-available-processors))) - :xnio/dispatch true - :socket/backlog 4069 - :ring/async true} + :xnio/dispatch :virtual + :ring/compat :ring2 + :socket/backlog 4069} handler (cond (some? router) @@ -102,13 +98,13 @@ (yt/stop! server)) (defn- not-found-handler - [_ respond _] - (respond {::yrs/status 404})) + [_] + {::rres/status 404}) (defn- router-handler [router] (letfn [(resolve-handler [request] - (if-let [match (r/match-by-path router (yrq/path request))] + (if-let [match (r/match-by-path router (rreq/path request))] (let [params (:path-params match) result (:result match) handler (or (:handler result) not-found-handler) @@ -120,18 +116,15 @@ (let [{:keys [body] :as response} (errors/handle cause request)] (cond-> response (map? body) - (-> (update ::yrs/headers assoc "content-type" "application/transit+json") - (assoc ::yrs/body (t/encode-str body {:type :json-verbose}))))))] + (-> (update ::rres/headers assoc "content-type" "application/transit+json") + (assoc ::rres/body (t/encode-str body {:type :json-verbose}))))))] - (fn [request respond _] - (let [handler (resolve-handler request) - exchange (yrq/exchange request)] - (handler - (fn [response] - (yt/dispatch! exchange (partial respond response))) - (fn [cause] - (let [response (on-error cause request)] - (yt/dispatch! exchange (partial respond response))))))))) + (fn [request] + (let [handler (resolve-handler request)] + (try + (handler) + (catch Throwable cause + (on-error cause request))))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; HTTP ROUTER @@ -160,8 +153,7 @@ [session/soft-auth cfg] [actoken/soft-auth cfg] [mw/errors errors/handle] - [mw/restrict-methods] - [mw/with-dispatch :vthread]]} + [mw/restrict-methods]]} (::mtx/routes cfg) (::assets/routes cfg) diff --git a/backend/src/app/http/access_token.clj b/backend/src/app/http/access_token.clj index 3f39e41211..bfddbb42d7 100644 --- a/backend/src/app/http/access_token.clj +++ b/backend/src/app/http/access_token.clj @@ -11,13 +11,13 @@ [app.db :as db] [app.main :as-alias main] [app.tokens :as tokens] - [yetti.request :as yrq])) + [ring.request :as rreq])) (def header-re #"^Token\s+(.*)") (defn- get-token [request] - (some->> (yrq/get-header request "authorization") + (some->> (rreq/get-header request "authorization") (re-matches header-re) (second))) @@ -30,7 +30,7 @@ "SELECT perms, profile_id, expires_at FROM access_token WHERE id = ? - AND (expires_at IS NULL + AND (expires_at IS NULL OR (expires_at > now()));") (defn- get-token-data @@ -54,9 +54,8 @@ (l/trace :hint "exception on decoding malformed token" :cause cause) request)))] - (fn [request respond raise] - (let [request (handle-request request)] - (handler request respond raise))))) + (fn [request] + (handler (handle-request request))))) (defn- wrap-authz "Authorization middleware, will be executed synchronously on vthread." diff --git a/backend/src/app/http/assets.clj b/backend/src/app/http/assets.clj index efd494249f..286cef6556 100644 --- a/backend/src/app/http/assets.clj +++ b/backend/src/app/http/assets.clj @@ -16,7 +16,7 @@ [app.util.time :as dt] [clojure.spec.alpha :as s] [integrant.core :as ig] - [yetti.response :as-alias yrs])) + [ring.response :as-alias rres])) (def ^:private cache-max-age (dt/duration {:hours 24})) @@ -37,8 +37,8 @@ (defn- serve-object-from-s3 [{:keys [::sto/storage] :as cfg} obj] (let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})] - {::yrs/status 307 - ::yrs/headers {"location" (str url) + {::rres/status 307 + ::rres/headers {"location" (str url) "x-host" (cond-> host port (str ":" port)) "x-mtype" (-> obj meta :content-type) "cache-control" (str "max-age=" (inst-ms cache-max-age))}})) @@ -51,8 +51,8 @@ headers {"x-accel-redirect" (:path purl) "content-type" (:content-type mdata) "cache-control" (str "max-age=" (inst-ms cache-max-age))}] - {::yrs/status 204 - ::yrs/headers headers})) + {::rres/status 204 + ::rres/headers headers})) (defn- serve-object "Helper function that returns the appropriate response depending on @@ -70,7 +70,7 @@ obj (sto/get-object storage id)] (if obj (serve-object cfg obj) - {::yrs/status 404}))) + {::rres/status 404}))) (defn- generic-handler "A generic handler helper/common code for file-media based handlers." @@ -81,7 +81,7 @@ sobj (sto/get-object storage (kf mobj))] (if sobj (serve-object cfg sobj) - {::yrs/status 404}))) + {::rres/status 404}))) (defn file-objects-handler "Handler that serves storage objects by file media id." diff --git a/backend/src/app/http/awsns.clj b/backend/src/app/http/awsns.clj index 681e7045f7..f9e9179b11 100644 --- a/backend/src/app/http/awsns.clj +++ b/backend/src/app/http/awsns.clj @@ -20,8 +20,8 @@ [integrant.core :as ig] [jsonista.core :as j] [promesa.exec :as px] - [yetti.request :as yrq] - [yetti.response :as-alias yrs])) + [ring.request :as rreq] + [ring.response :as-alias rres])) (declare parse-json) (declare handle-request) @@ -37,9 +37,9 @@ (defmethod ig/init-key ::routes [_ {:keys [::wrk/executor] :as cfg}] (letfn [(handler [request] - (let [data (-> request yrq/body slurp)] + (let [data (-> request rreq/body slurp)] (px/run! executor #(handle-request cfg data))) - {::yrs/status 200})] + {::rres/status 200})] ["/sns" {:handler handler :allowed-methods #{:post}}])) diff --git a/backend/src/app/http/debug.clj b/backend/src/app/http/debug.clj index a61017edfe..1e18b8517b 100644 --- a/backend/src/app/http/debug.clj +++ b/backend/src/app/http/debug.clj @@ -32,8 +32,8 @@ [integrant.core :as ig] [markdown.core :as md] [markdown.transformers :as mdt] - [yetti.request :as yrq] - [yetti.response :as yrs])) + [ring.request :as rreq] + [ring.response :as rres])) ;; (selmer.parser/cache-off!) @@ -43,10 +43,10 @@ (defn index-handler [_cfg _request] - {::yrs/status 200 - ::yrs/headers {"content-type" "text/html"} - ::yrs/body (-> (io/resource "app/templates/debug.tmpl") - (tmpl/render {}))}) + {::rres/status 200 + ::rres/headers {"content-type" "text/html"} + ::rres/body (-> (io/resource "app/templates/debug.tmpl") + (tmpl/render {}))}) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; FILE CHANGES @@ -55,17 +55,17 @@ (defn prepare-response [body] (let [headers {"content-type" "application/transit+json"}] - {::yrs/status 200 - ::yrs/body body - ::yrs/headers headers})) + {::rres/status 200 + ::rres/body body + ::rres/headers headers})) (defn prepare-download-response [body filename] (let [headers {"content-disposition" (str "attachment; filename=" filename) "content-type" "application/octet-stream"}] - {::yrs/status 200 - ::yrs/body body - ::yrs/headers headers})) + {::rres/status 200 + ::rres/body body + ::rres/headers headers})) (def sql:retrieve-range-of-changes "select revn, changes from file_change where file_id=? and revn >= ? and revn <= ? order by revn") @@ -107,8 +107,8 @@ (db/update! conn :file {:data data} {:id file-id}) - {::yrs/status 201 - ::yrs/body "OK CREATED"}))) + {::rres/status 201 + ::rres/body "OK CREATED"}))) :else (prepare-response (blob/decode data)))))) @@ -137,8 +137,8 @@ {:data data :deleted-at nil} {:id file-id}) - {::yrs/status 200 - ::yrs/body "OK UPDATED"}) + {::rres/status 200 + ::rres/body "OK UPDATED"}) (db/run! pool (fn [{:keys [::db/conn]}] (create-file conn {:id file-id @@ -148,15 +148,15 @@ (db/update! conn :file {:data data} {:id file-id}) - {::yrs/status 201 - ::yrs/body "OK CREATED"})))) + {::rres/status 201 + ::rres/body "OK CREATED"})))) - {::yrs/status 500 - ::yrs/body "ERROR"}))) + {::rres/status 500 + ::rres/body "ERROR"}))) (defn file-data-handler [cfg request] - (case (yrq/method request) + (case (rreq/method request) :get (retrieve-file-data cfg request) :post (upload-file-data cfg request) (ex/raise :type :http @@ -238,12 +238,12 @@ 1 (render-template-v1 report) 2 (render-template-v2 report) 3 (render-template-v3 report))] - {::yrs/status 200 - ::yrs/body result - ::yrs/headers {"content-type" "text/html; charset=utf-8" - "x-robots-tag" "noindex"}}) - {::yrs/status 404 - ::yrs/body "not found"}))) + {::rres/status 200 + ::rres/body result + ::rres/headers {"content-type" "text/html; charset=utf-8" + "x-robots-tag" "noindex"}}) + {::rres/status 404 + ::rres/body "not found"}))) (def sql:error-reports "SELECT id, created_at, @@ -256,11 +256,11 @@ [{:keys [::db/pool]} _request] (let [items (->> (db/exec! pool [sql:error-reports]) (map #(update % :created-at dt/format-instant :rfc1123)))] - {::yrs/status 200 - ::yrs/body (-> (io/resource "app/templates/error-list.tmpl") - (tmpl/render {:items items})) - ::yrs/headers {"content-type" "text/html; charset=utf-8" - "x-robots-tag" "noindex"}})) + {::rres/status 200 + ::rres/body (-> (io/resource "app/templates/error-list.tmpl") + (tmpl/render {:items items})) + ::rres/headers {"content-type" "text/html; charset=utf-8" + "x-robots-tag" "noindex"}})) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; EXPORT/IMPORT @@ -296,14 +296,14 @@ ::binf/profile-id profile-id ::binf/project-id project-id)) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body "OK CLONED"}) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body "OK CLONED"}) - {::yrs/status 200 - ::yrs/body (io/input-stream path) - ::yrs/headers {"content-type" "application/octet-stream" - "content-disposition" (str "attachmen; filename=" (first file-ids) ".penpot")}})))) + {::rres/status 200 + ::rres/body (io/input-stream path) + ::rres/headers {"content-type" "application/octet-stream" + "content-disposition" (str "attachmen; filename=" (first file-ids) ".penpot")}})))) @@ -334,9 +334,9 @@ ::binf/profile-id profile-id ::binf/project-id project-id)) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body "OK"})) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body "OK"})) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; ACTIONS @@ -363,34 +363,34 @@ (db/update! pool :profile {:is-blocked true} {:id (:id profile)}) (db/delete! pool :http-session {:profile-id (:id profile)}) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body (str/ffmt "PROFILE '%' BLOCKED" (:email profile))}) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body (str/ffmt "PROFILE '%' BLOCKED" (:email profile))}) (contains? params :unblock) (do (db/update! pool :profile {:is-blocked false} {:id (:id profile)}) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body (str/ffmt "PROFILE '%' UNBLOCKED" (:email profile))}) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body (str/ffmt "PROFILE '%' UNBLOCKED" (:email profile))}) (contains? params :resend) (if (:is-blocked profile) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body "PROFILE ALREADY BLOCKED"} + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body "PROFILE ALREADY BLOCKED"} (do (auth/send-email-verification! pool props profile) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body (str/ffmt "RESENDED FOR '%'" (:email profile))})) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body (str/ffmt "RESENDED FOR '%'" (:email profile))})) :else (do (db/update! pool :profile {:is-active true} {:id (:id profile)}) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body (str/ffmt "PROFILE '%' ACTIVATED" (:email profile))})))) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body (str/ffmt "PROFILE '%' ACTIVATED" (:email profile))})))) (defn- reset-file-data-version @@ -420,9 +420,9 @@ :migrate? false :inc-revn? false :save? true) - {::yrs/status 200 - ::yrs/headers {"content-type" "text/plain"} - ::yrs/body "OK"})) + {::rres/status 200 + ::rres/headers {"content-type" "text/plain"} + ::rres/body "OK"})) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -434,13 +434,13 @@ [{:keys [::db/pool]} _] (try (db/exec-one! pool ["select count(*) as count from server_prop;"]) - {::yrs/status 200 - ::yrs/body "OK"} + {::rres/status 200 + ::rres/body "OK"} (catch Throwable cause (l/warn :hint "unable to execute query on health handler" :cause cause) - {::yrs/status 503 - ::yrs/body "KO"}))) + {::rres/status 503 + ::rres/body "KO"}))) (defn changelog-handler [_ _] @@ -449,11 +449,11 @@ (md->html [text] (md/md-to-html-string text :replacement-transformers (into [transform-emoji] mdt/transformer-vector)))] (if-let [clog (io/resource "changelog.md")] - {::yrs/status 200 - ::yrs/headers {"content-type" "text/html; charset=utf-8"} - ::yrs/body (-> clog slurp md->html)} - {::yrs/status 404 - ::yrs/body "NOT FOUND"}))) + {::rres/status 200 + ::rres/headers {"content-type" "text/html; charset=utf-8"} + ::rres/body (-> clog slurp md->html)} + {::rres/status 404 + ::rres/body "NOT FOUND"}))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; INIT diff --git a/backend/src/app/http/errors.clj b/backend/src/app/http/errors.clj index ce233b142a..5bb14cc37a 100644 --- a/backend/src/app/http/errors.clj +++ b/backend/src/app/http/errors.clj @@ -16,14 +16,14 @@ [app.http.session :as-alias session] [clojure.spec.alpha :as s] [cuerdas.core :as str] - [yetti.request :as yrq] - [yetti.response :as yrs])) + [ring.request :as rreq] + [ring.response :as rres])) (defn- parse-client-ip [request] - (or (some-> (yrq/get-header request "x-forwarded-for") (str/split ",") first) - (yrq/get-header request "x-real-ip") - (yrq/remote-addr request))) + (or (some-> (rreq/get-header request "x-forwarded-for") (str/split ",") first) + (rreq/get-header request "x-real-ip") + (rreq/remote-addr request))) (defn request->context "Extracts error report relevant context data from request." @@ -34,10 +34,10 @@ {:request/path (:path request) :request/method (:method request) :request/params (:params request) - :request/user-agent (yrq/get-header request "user-agent") + :request/user-agent (rreq/get-header request "user-agent") :request/ip-addr (parse-client-ip request) :request/profile-id (:uid claims) - :version/frontend (or (yrq/get-header request "x-frontend-version") "unknown") + :version/frontend (or (rreq/get-header request "x-frontend-version") "unknown") :version/backend (:full cf/version)})) (defmulti handle-error @@ -50,30 +50,30 @@ (defmethod handle-error :authentication [err _ _] - {::yrs/status 401 - ::yrs/body (ex-data err)}) + {::rres/status 401 + ::rres/body (ex-data err)}) (defmethod handle-error :authorization [err _ _] - {::yrs/status 403 - ::yrs/body (ex-data err)}) + {::rres/status 403 + ::rres/body (ex-data err)}) (defmethod handle-error :restriction [err _ _] - {::yrs/status 400 - ::yrs/body (ex-data err)}) + {::rres/status 400 + ::rres/body (ex-data err)}) (defmethod handle-error :rate-limit [err _ _] (let [headers (-> err ex-data ::http/headers)] - {::yrs/status 429 - ::yrs/headers headers})) + {::rres/status 429 + ::rres/headers headers})) (defmethod handle-error :concurrency-limit [err _ _] (let [headers (-> err ex-data ::http/headers)] - {::yrs/status 429 - ::yrs/headers headers})) + {::rres/status 429 + ::rres/headers headers})) (defmethod handle-error :validation [err request parent-cause] @@ -81,38 +81,38 @@ (cond (= code :spec-validation) (let [explain (ex/explain data)] - {::yrs/status 400 - ::yrs/body (-> data - (dissoc ::s/problems ::s/value ::s/spec) - (cond-> explain (assoc :explain explain)))}) + {::rres/status 400 + ::rres/body (-> data + (dissoc ::s/problems ::s/value ::s/spec) + (cond-> explain (assoc :explain explain)))}) (= code :params-validation) (let [explain (::sm/explain data) explain (sm/humanize-data explain)] - {::yrs/status 400 - ::yrs/body (-> data - (dissoc ::sm/explain) - (assoc :explain explain))}) + {::rres/status 400 + ::rres/body (-> data + (dissoc ::sm/explain) + (assoc :explain explain))}) (= code :data-validation) (let [explain (::sm/explain data) explain (sm/humanize-data explain)] - {::yrs/status 400 - ::yrs/body (-> data - (dissoc ::sm/explain) - (assoc :explain explain))}) + {::rres/status 400 + ::rres/body (-> data + (dissoc ::sm/explain) + (assoc :explain explain))}) (= code :request-body-too-large) - {::yrs/status 413 ::yrs/body data} + {::rres/status 413 ::rres/body data} (= code :invalid-image) (binding [l/*context* (request->context request)] (let [cause (or parent-cause err)] (l/error :hint "unexpected error on processing image" :cause cause) - {::yrs/status 400 ::yrs/body data})) + {::rres/status 400 ::rres/body data})) :else - {::yrs/status 400 ::yrs/body data}))) + {::rres/status 400 ::rres/body data}))) (defmethod handle-error :assertion [error request parent-cause] @@ -123,46 +123,46 @@ (= code :data-validation) (let [explain (ex/explain data)] (l/error :hint "data assertion error" :cause cause) - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :assertion - :data (-> data - (dissoc ::sm/explain) - (cond-> explain (assoc :explain explain)))}}) + {::rres/status 500 + ::rres/body {:type :server-error + :code :assertion + :data (-> data + (dissoc ::sm/explain) + (cond-> explain (assoc :explain explain)))}}) (= code :spec-validation) (let [explain (ex/explain data)] (l/error :hint "spec assertion error" :cause cause) - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :assertion - :data (-> data - (dissoc ::s/problems ::s/value ::s/spec) - (cond-> explain (assoc :explain explain)))}}) + {::rres/status 500 + ::rres/body {:type :server-error + :code :assertion + :data (-> data + (dissoc ::s/problems ::s/value ::s/spec) + (cond-> explain (assoc :explain explain)))}}) :else (do (l/error :hint "assertion error" :cause cause) - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :assertion - :data data}}))))) + {::rres/status 500 + ::rres/body {:type :server-error + :code :assertion + :data data}}))))) (defmethod handle-error :not-found [err _ _] - {::yrs/status 404 - ::yrs/body (ex-data err)}) + {::rres/status 404 + ::rres/body (ex-data err)}) (defmethod handle-error :internal [error request parent-cause] (binding [l/*context* (request->context request)] (let [cause (or parent-cause error)] (l/error :hint "internal error" :cause cause) - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :unhandled - :hint (ex-message error) - :data (ex-data error)}}))) + {::rres/status 500 + ::rres/body {:type :server-error + :code :unhandled + :hint (ex-message error) + :data (ex-data error)}}))) (defmethod handle-error :default [error request parent-cause] @@ -186,23 +186,23 @@ :cause cause) (cond (= state "57014") - {::yrs/status 504 - ::yrs/body {:type :server-error - :code :statement-timeout - :hint (ex-message error)}} + {::rres/status 504 + ::rres/body {:type :server-error + :code :statement-timeout + :hint (ex-message error)}} (= state "25P03") - {::yrs/status 504 - ::yrs/body {:type :server-error - :code :idle-in-transaction-timeout - :hint (ex-message error)}} + {::rres/status 504 + ::rres/body {:type :server-error + :code :idle-in-transaction-timeout + :hint (ex-message error)}} :else - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :unexpected - :hint (ex-message error) - :state state}})))) + {::rres/status 500 + ::rres/body {:type :server-error + :code :unexpected + :hint (ex-message error) + :state state}})))) (defmethod handle-exception :default [error request parent-cause] @@ -213,19 +213,19 @@ (nil? edata) (binding [l/*context* (request->context request)] (l/error :hint "unexpected error" :cause cause) - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :unexpected - :hint (ex-message error)}}) + {::rres/status 500 + ::rres/body {:type :server-error + :code :unexpected + :hint (ex-message error)}}) :else (binding [l/*context* (request->context request)] (l/error :hint "unhandled error" :cause cause) - {::yrs/status 500 - ::yrs/body {:type :server-error - :code :unhandled - :hint (ex-message error) - :data edata}})))) + {::rres/status 500 + ::rres/body {:type :server-error + :code :unhandled + :hint (ex-message error) + :data edata}})))) (defmethod handle-exception java.util.concurrent.CompletionException [cause request _] diff --git a/backend/src/app/http/middleware.clj b/backend/src/app/http/middleware.clj index f71f9da952..4ea815f07f 100644 --- a/backend/src/app/http/middleware.clj +++ b/backend/src/app/http/middleware.clj @@ -12,13 +12,10 @@ [app.config :as cf] [app.util.json :as json] [cuerdas.core :as str] - [promesa.core :as p] - [promesa.exec :as px] - [promesa.util :as pu] + [ring.request :as rreq] + [ring.response :as rres] [yetti.adapter :as yt] - [yetti.middleware :as ymw] - [yetti.request :as yrq] - [yetti.response :as yrs]) + [yetti.middleware :as ymw]) (:import com.fasterxml.jackson.core.JsonParseException com.fasterxml.jackson.core.io.JsonEOFException @@ -46,17 +43,17 @@ (defn wrap-parse-request [handler] (letfn [(process-request [request] - (let [header (yrq/get-header request "content-type")] + (let [header (rreq/get-header request "content-type")] (cond (str/starts-with? header "application/transit+json") - (with-open [^InputStream is (yrq/body request)] + (with-open [^InputStream is (rreq/body request)] (let [params (t/read! (t/reader is))] (-> request (assoc :body-params params) (update :params merge params)))) (str/starts-with? header "application/json") - (with-open [^InputStream is (yrq/body request)] + (with-open [^InputStream is (rreq/body request)] (let [params (json/decode is json-mapper)] (-> request (assoc :body-params params) @@ -65,37 +62,36 @@ :else request))) - (handle-error [raise cause] + (handle-error [cause] (cond (instance? RuntimeException cause) (if-let [cause (ex-cause cause)] - (handle-error raise cause) - (raise cause)) + (handle-error cause) + (throw cause)) (instance? RequestTooBigException cause) - (raise (ex/error :type :validation - :code :request-body-too-large - :hint (ex-message cause))) - + (ex/raise :type :validation + :code :request-body-too-large + :hint (ex-message cause)) (or (instance? JsonEOFException cause) (instance? JsonParseException cause) (instance? MismatchedInputException cause)) - (raise (ex/error :type :validation - :code :malformed-json - :hint (ex-message cause) - :cause cause)) + (ex/raise :type :validation + :code :malformed-json + :hint (ex-message cause) + :cause cause) :else - (raise cause)))] + (throw cause)))] - (fn [request respond raise] - (if (= (yrq/method request) :post) + (fn [request] + (if (= (rreq/method request) :post) (let [request (ex/try! (process-request request))] (if (ex/exception? request) - (handle-error raise request) - (handler request respond raise))) - (handler request respond raise))))) + (handle-error request) + (handler request))) + (handler request))))) (def parse-request {:name ::parse-request @@ -113,7 +109,7 @@ (defn wrap-format-response [handler] (letfn [(transit-streamable-body [data opts] - (reify yrs/StreamableResponseBody + (reify rres/StreamableResponseBody (-write-body-to-stream [_ _ output-stream] (try (with-open [^OutputStream bos (buffered-output-stream output-stream buffer-size)] @@ -128,7 +124,7 @@ (.close ^OutputStream output-stream)))))) (json-streamable-body [data] - (reify yrs/StreamableResponseBody + (reify rres/StreamableResponseBody (-write-body-to-stream [_ _ output-stream] (try (with-open [^OutputStream bos (buffered-output-stream output-stream buffer-size)] @@ -143,24 +139,24 @@ (.close ^OutputStream output-stream)))))) (format-response-with-json [response _] - (let [body (::yrs/body response)] + (let [body (::rres/body response)] (if (or (boolean? body) (coll? body)) (-> response - (update ::yrs/headers assoc "content-type" "application/json") - (assoc ::yrs/body (json-streamable-body body))) + (update ::rres/headers assoc "content-type" "application/json") + (assoc ::rres/body (json-streamable-body body))) response))) (format-response-with-transit [response request] - (let [body (::yrs/body response)] + (let [body (::rres/body response)] (if (or (boolean? body) (coll? body)) - (let [qs (yrq/query request) + (let [qs (rreq/query request) opts (if (or (contains? cf/flags :transit-readable-response) (str/includes? qs "transit_verbose")) {:type :json-verbose} {:type :json})] (-> response - (update ::yrs/headers assoc "content-type" "application/transit+json") - (assoc ::yrs/body (transit-streamable-body body opts)))) + (update ::rres/headers assoc "content-type" "application/transit+json") + (assoc ::rres/body (transit-streamable-body body opts)))) response))) (format-from-params [{:keys [query-params] :as request}] @@ -169,7 +165,7 @@ (format-response [response request] (let [accept (or (format-from-params request) - (yrq/get-header request "accept"))] + (rreq/get-header request "accept"))] (cond (or (= accept "application/transit+json") (str/includes? accept "application/transit+json")) @@ -186,11 +182,9 @@ (cond-> response (map? response) (format-response request)))] - (fn [request respond raise] - (handler request - (fn [response] - (respond (process-response response request))) - raise)))) + (fn [request] + (let [response (handler request)] + (process-response response request))))) (def format-response {:name ::format-response @@ -198,12 +192,11 @@ (defn wrap-errors [handler on-error] - (fn [request respond raise] - (handler request respond (fn [cause] - (try - (respond (on-error cause request)) - (catch Throwable cause - (raise cause))))))) + (fn [request] + (try + (handler request) + (catch Throwable cause + (on-error cause request))))) (def errors {:name ::errors @@ -221,11 +214,11 @@ (defn wrap-cors [handler] (fn [request] - (let [response (if (= (yrq/method request) :options) - {::yrs/status 200} + (let [response (if (= (rreq/method request) :options) + {::rres/status 200} (handler request)) - origin (yrq/get-header request "origin")] - (update response ::yrs/headers with-cors-headers origin)))) + origin (rreq/get-header request "origin")] + (update response ::rres/headers with-cors-headers origin)))) (def cors {:name ::cors @@ -239,18 +232,8 @@ (fn [data _] (when-let [allowed (:allowed-methods data)] (fn [handler] - (fn [request respond raise] - (let [method (yrq/method request)] + (fn [request] + (let [method (rreq/method request)] (if (contains? allowed method) - (handler request respond raise) - (respond {::yrs/status 405})))))))}) - -(def with-dispatch - {:name ::with-dispatch - :compile - (fn [& _] - (fn [handler executor] - (let [executor (px/resolve-executor executor)] - (fn [request respond raise] - (->> (px/submit! executor (partial handler request)) - (p/fnly (pu/handler respond raise)))))))}) + (handler request) + {::rres/status 405}))))))}) diff --git a/backend/src/app/http/session.clj b/backend/src/app/http/session.clj index 5c10816323..696cc6a3a2 100644 --- a/backend/src/app/http/session.clj +++ b/backend/src/app/http/session.clj @@ -20,6 +20,7 @@ [clojure.spec.alpha :as s] [cuerdas.core :as str] [integrant.core :as ig] + [ring.request :as rreq] [yetti.request :as yrq])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -142,7 +143,7 @@ (us/assert! ::us/uuid profile-id) (fn [request response] - (let [uagent (yrq/get-header request "user-agent") + (let [uagent (rreq/get-header request "user-agent") params {:profile-id profile-id :user-agent uagent :created-at (dt/now)} @@ -209,9 +210,8 @@ (l/trace :hint "exception on decoding malformed token" :cause cause) request)))] - (fn [request respond raise] - (let [request (handle-request request)] - (handler request respond raise))))) + (fn [request] + (handler (handle-request request))))) (defn- wrap-authz [handler {:keys [::manager]}] diff --git a/backend/src/app/http/websocket.clj b/backend/src/app/http/websocket.clj index bb29839a11..70a9ad97c6 100644 --- a/backend/src/app/http/websocket.clj +++ b/backend/src/app/http/websocket.clj @@ -10,7 +10,7 @@ [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.pprint :as pp] - [app.common.spec :as us] + [app.common.schema :as sm] [app.common.uuid :as uuid] [app.db :as db] [app.http.session :as session] @@ -21,6 +21,7 @@ [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.exec.csp :as sp] + [ring.websocket :as rws] [yetti.websocket :as yws])) (def recv-labels @@ -277,19 +278,23 @@ :inc 1) message) - -(s/def ::session-id ::us/uuid) -(s/def ::handler-params - (s/keys :req-un [::session-id])) +(def ^:private schema:params + (sm/define + [:map {:title "params"} + [:session-id ::sm/uuid]])) (defn- http-handler [cfg {:keys [params ::session/profile-id] :as request}] - (let [{:keys [session-id]} (us/conform ::handler-params params)] + (let [{:keys [session-id]} (sm/conform! schema:params params)] (cond (not profile-id) (ex/raise :type :authentication :hint "Authentication required.") + ;; WORKAROUND: we use the adapter specific predicate for + ;; performance reasons; for now, the ring default impl for + ;; `upgrade-request?` parses all requests headers before perform + ;; any checking. (not (yws/upgrade-request? request)) (ex/raise :type :validation :code :websocket-request-expected @@ -298,14 +303,13 @@ :else (do (l/trace :hint "websocket request" :profile-id profile-id :session-id session-id) - (->> (ws/handler - ::ws/on-rcv-message (partial on-rcv-message cfg) - ::ws/on-snd-message (partial on-snd-message cfg) - ::ws/on-connect (partial on-connect cfg) - ::ws/handler (partial handle-message cfg) - ::profile-id profile-id - ::session-id session-id) - (yws/upgrade request)))))) + {::rws/listener (ws/listener request + ::ws/on-rcv-message (partial on-rcv-message cfg) + ::ws/on-snd-message (partial on-snd-message cfg) + ::ws/on-connect (partial on-connect cfg) + ::ws/handler (partial handle-message cfg) + ::profile-id profile-id + ::session-id session-id)})))) (defmethod ig/pre-init-spec ::routes [_] (s/keys :req [::mbus/msgbus @@ -318,5 +322,4 @@ (defmethod ig/init-key ::routes [_ cfg] ["/ws/notifications" {:middleware [[session/authz cfg]] - :handler (partial http-handler cfg) - :allowed-methods #{:get}}]) + :handler (partial http-handler cfg)}]) diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index df1ad5bf05..4171f52ab3 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -33,7 +33,7 @@ [integrant.core :as ig] [lambdaisland.uri :as u] [promesa.exec :as px] - [yetti.request :as yrq])) + [ring.request :as rreq])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; HELPERS @@ -41,9 +41,9 @@ (defn parse-client-ip [request] - (or (some-> (yrq/get-header request "x-forwarded-for") (str/split ",") first) - (yrq/get-header request "x-real-ip") - (some-> (yrq/remote-addr request) str))) + (or (some-> (rreq/get-header request "x-forwarded-for") (str/split ",") first) + (rreq/get-header request "x-real-ip") + (some-> (rreq/remote-addr request) str))) (defn extract-utm-params "Extracts additional data from params and namespace them under diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index b8d9d87020..22c5109dec 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -235,7 +235,6 @@ {::http/port (cf/get :http-server-port) ::http/host (cf/get :http-server-host) ::http/router (ig/ref ::http/router) - ::wrk/executor (ig/ref ::wrk/executor) ::http/io-threads (cf/get :http-server-io-threads) ::http/max-body-size (cf/get :http-server-max-body-size) ::http/max-multipart-body-size (cf/get :http-server-max-multipart-body-size)} diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 201e830624..2ede5681f1 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -34,8 +34,8 @@ [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.core :as p] - [yetti.request :as yrq] - [yetti.response :as yrs])) + [ring.request :as rreq] + [ring.response :as rres])) (s/def ::profile-id ::us/uuid) @@ -61,9 +61,9 @@ (if (fn? result) (result request) (let [mdata (meta result)] - (-> {::yrs/status (::http/status mdata 200) - ::yrs/headers (::http/headers mdata {}) - ::yrs/body (rph/unwrap result)} + (-> {::rres/status (::http/status mdata 200) + ::rres/headers (::http/headers mdata {}) + ::rres/body (rph/unwrap result)} (handle-response-transformation request mdata) (handle-before-comple-hook mdata))))) @@ -72,7 +72,7 @@ internal async flow into ring async flow." [methods {:keys [params path-params] :as request}] (let [type (keyword (:type path-params)) - etag (yrq/get-header request "if-none-match") + etag (rreq/get-header request "if-none-match") profile-id (or (::session/profile-id request) (::actoken/profile-id request)) @@ -138,6 +138,8 @@ (f cfg (us/conform spec params))) f))) +;; TODO: integrate with sm/define + (defn- wrap-params-validation [_ f mdata] (if-let [schema (::sm/params mdata)] diff --git a/backend/src/app/rpc/commands/binfile.clj b/backend/src/app/rpc/commands/binfile.clj index 7e0165ef8f..8d0fdc9eac 100644 --- a/backend/src/app/rpc/commands/binfile.clj +++ b/backend/src/app/rpc/commands/binfile.clj @@ -44,8 +44,8 @@ [cuerdas.core :as str] [datoteka.io :as io] [promesa.util :as pu] - [yetti.adapter :as yt] - [yetti.response :as yrs]) + [ring.response :as rres] + [yetti.adapter :as yt]) (:import com.github.luben.zstd.ZstdInputStream com.github.luben.zstd.ZstdOutputStream @@ -1071,7 +1071,7 @@ ::webhooks/event? true} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id include-libraries? embed-assets?] :as params}] (files/check-read-permissions! pool profile-id file-id) - (let [body (reify yrs/StreamableResponseBody + (let [body (reify rres/StreamableResponseBody (-write-body-to-stream [_ _ output-stream] (-> cfg (assoc ::file-ids [file-id]) @@ -1080,9 +1080,9 @@ (export! output-stream))))] (fn [_] - {::yrs/status 200 - ::yrs/body body - ::yrs/headers {"content-type" "application/octet-stream"}}))) + {::rres/status 200 + ::rres/body body + ::rres/headers {"content-type" "application/octet-stream"}}))) (s/def ::file ::media/upload) (s/def ::import-binfile diff --git a/backend/src/app/rpc/cond.clj b/backend/src/app/rpc/cond.clj index b683ded138..a7db513b8b 100644 --- a/backend/src/app/rpc/cond.clj +++ b/backend/src/app/rpc/cond.clj @@ -29,7 +29,7 @@ [app.util.services :as-alias sv] [buddy.core.codecs :as bc] [buddy.core.hash :as bh] - [yetti.response :as yrs])) + [ring.response :as-alias rres])) (def ^{:dynamic true @@ -57,7 +57,7 @@ (let [key' (when (or key reuse-key?) (some->> (get-object cfg params) (key-fn params) (fmt-key)))] (if (and (some? key) (= key key')) - (fn [_] {::yrs/status 304}) + (fn [_] {::rres/status 304}) (let [result (f cfg params) etag (or (and reuse-key? key') (some-> result meta ::key fmt-key) diff --git a/backend/src/app/rpc/doc.clj b/backend/src/app/rpc/doc.clj index 24451e553d..326cd37271 100644 --- a/backend/src/app/rpc/doc.clj +++ b/backend/src/app/rpc/doc.clj @@ -27,7 +27,7 @@ [integrant.core :as ig] [malli.transform :as mt] [pretty-spec.core :as ps] - [yetti.response :as yrs])) + [ring.response :as-alias rres])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; DOC (human readable) @@ -86,11 +86,11 @@ (let [params (:query-params request) pstyle (:type params "js") context (assoc context :param-style pstyle)] - {::yrs/status 200 - ::yrs/body (-> (io/resource "app/templates/api-doc.tmpl") + {::rres/status 200 + ::rres/body (-> (io/resource "app/templates/api-doc.tmpl") (tmpl/render context))})) (fn [_] - {::yrs/status 404}))) + {::rres/status 404}))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; OPENAPI / SWAGGER (v3.1) @@ -173,12 +173,12 @@ [context] (if (contains? cf/flags :backend-openapi-doc) (fn [_] - {::yrs/status 200 - ::yrs/headers {"content-type" "application/json; charset=utf-8"} - ::yrs/body (json/encode context)}) + {::rres/status 200 + ::rres/headers {"content-type" "application/json; charset=utf-8"} + ::rres/body (json/encode context)}) (fn [_] - {::yrs/status 404}))) + {::rres/status 404}))) (defn openapi-handler [] @@ -189,12 +189,12 @@ context {:public-uri (cf/get :public-uri) :swagger-js swagger-js :swagger-css swagger-cs}] - {::yrs/status 200 - ::yrs/headers {"content-type" "text/html"} - ::yrs/body (-> (io/resource "app/templates/openapi.tmpl") + {::rres/status 200 + ::rres/headers {"content-type" "text/html"} + ::rres/body (-> (io/resource "app/templates/openapi.tmpl") (tmpl/render context))})) (fn [_] - {::yrs/status 404}))) + {::rres/status 404}))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; MODULE INIT diff --git a/backend/src/app/rpc/helpers.clj b/backend/src/app/rpc/helpers.clj index 69d1a2d717..87b91f545b 100644 --- a/backend/src/app/rpc/helpers.clj +++ b/backend/src/app/rpc/helpers.clj @@ -11,7 +11,7 @@ [app.common.data.macros :as dm] [app.http :as-alias http] [app.rpc :as-alias rpc] - [yetti.response :as-alias yrs])) + [ring.response :as-alias rres])) ;; A utilty wrapper object for wrap service responses that does not ;; implements the IObj interface that make possible attach metadata to @@ -77,4 +77,4 @@ (fn [_ response] (let [exp (if (integer? max-age) max-age (inst-ms max-age)) val (dm/fmt "max-age=%" (int (/ exp 1000.0)))] - (update response ::yrs/headers assoc "cache-control" val))))) + (update response ::rres/headers assoc "cache-control" val))))) diff --git a/backend/src/app/util/websocket.clj b/backend/src/app/util/websocket.clj index 1b8e165609..284ca46023 100644 --- a/backend/src/app/util/websocket.clj +++ b/backend/src/app/util/websocket.clj @@ -15,8 +15,8 @@ [app.util.time :as dt] [promesa.exec :as px] [promesa.exec.csp :as sp] - [yetti.request :as yr] - [yetti.util :as yu] + [ring.request :as rreq] + [ring.websocket :as rws] [yetti.websocket :as yws]) (:import java.nio.ByteBuffer)) @@ -50,7 +50,7 @@ (declare start-io-loop!) -(defn handler +(defn listener "A WebSocket upgrade handler factory. Returns a handler that can be used to upgrade to websocket connection. This handler implements the basic custom protocol on top of websocket connection with all the @@ -61,37 +61,34 @@ It also accepts some options that allows you parametrize the protocol behavior. The options map will be used as-as for the initial data of the `ws` data structure" - [& {:keys [::on-rcv-message - ::on-snd-message - ::on-connect - ::input-buff-size - ::output-buff-size - ::idle-timeout] - :or {input-buff-size 64 - output-buff-size 64 - idle-timeout 60000 - on-connect identity - on-snd-message identity-3 - on-rcv-message identity-3} - :as options}] + [request & {:keys [::on-rcv-message + ::on-snd-message + ::on-connect + ::input-buff-size + ::output-buff-size + ::idle-timeout] + :or {input-buff-size 64 + output-buff-size 64 + idle-timeout 60000 + on-connect identity + on-snd-message identity-3 + on-rcv-message identity-3} + :as options}] (assert (fn? on-rcv-message) "'on-rcv-message' should be a function") (assert (fn? on-snd-message) "'on-snd-message' should be a function") (assert (fn? on-connect) "'on-connect' should be a function") - (fn [{:keys [::yws/channel] :as request}] - (let [input-ch (sp/chan :buf input-buff-size) - output-ch (sp/chan :buf output-buff-size) - hbeat-ch (sp/chan :buf (sp/sliding-buffer 6)) - close-ch (sp/chan) - - ip-addr (parse-client-ip request) - uagent (yr/get-header request "user-agent") - id (uuid/next) - state (atom {}) - beats (atom #{}) - - options (-> options + (let [input-ch (sp/chan :buf input-buff-size) + output-ch (sp/chan :buf output-buff-size) + hbeat-ch (sp/chan :buf (sp/sliding-buffer 6)) + close-ch (sp/chan) + ip-addr (parse-client-ip request) + uagent (rreq/get-header request "user-agent") + id (uuid/next) + state (atom {}) + beats (atom #{}) + options (-> options (update ::handler wrap-handler) (assoc ::id id) (assoc ::state state) @@ -101,126 +98,118 @@ (assoc ::heartbeat-ch hbeat-ch) (assoc ::output-ch output-ch) (assoc ::close-ch close-ch) - (assoc ::channel channel) (assoc ::remote-addr ip-addr) - (assoc ::user-agent uagent) + (assoc ::user-agent uagent))] + + {:on-open + (fn on-open [channel] + (l/trace :fn "on-open" :conn-id id :channel channel) + (let [options (-> options + (assoc ::channel channel) (on-connect)) + timeout (dt/duration idle-timeout)] - on-ws-open - (fn [channel] - (l/trace :fn "on-ws-open" :conn-id id) - (let [timeout (dt/duration idle-timeout) - name (str "penpot/websocket/io-loop/" id)] - (yws/idle-timeout! channel timeout) - (px/fn->thread (partial start-io-loop! options) - {:name name :virtual true}))) + (yws/set-idle-timeout! channel timeout) + (px/submit! :vthread (partial start-io-loop! options)))) - on-ws-terminate - (fn [_ code reason] - (l/trace :fn "on-ws-terminate" - :conn-id id - :code code - :reason reason) - (sp/close! close-ch)) + :on-close + (fn on-close [_channel code reason] + (l/info :fn "on-ws-terminate" + :conn-id id + :code code + :reason reason) + (sp/close! close-ch)) - on-ws-error - (fn [_ cause] - (sp/close! close-ch cause)) + :on-error + (fn on-error [_channel cause] + (sp/close! close-ch cause)) - on-ws-message - (fn [_ message] - (sp/offer! input-ch message) - (swap! state assoc ::last-activity-at (dt/now))) + :on-message + (fn on-message [_channel message] + (when (string? message) + (sp/offer! input-ch message) + (swap! state assoc ::last-activity-at (dt/now)))) - on-ws-pong - (fn [_ buffers] - ;; (l/trace :fn "on-ws-pong" :buffers (pr-str buffers)) - (sp/put! hbeat-ch (yu/copy-many buffers)))] - - (yws/on-close! channel (fn [_] - (sp/close! close-ch))) - - {:on-open on-ws-open - :on-error on-ws-error - :on-close on-ws-terminate - :on-text on-ws-message - :on-pong on-ws-pong}))) + :on-pong + (fn on-pong [_channel data] + (l/trace :fn "on-pong" :data data) + (sp/put! hbeat-ch data))})) (defn- handle-ping! [{:keys [::id ::beats ::channel] :as wsp} beat-id] - (l/trace :hint "ping" :beat beat-id :conn-id id) - (yws/ping! channel (encode-beat beat-id)) + (l/trace :hint "send ping" :beat beat-id :conn-id id) + (rws/ping channel (encode-beat beat-id)) (let [issued (swap! beats conj (long beat-id))] (not (>= (count issued) max-missed-heartbeats)))) (defn- start-io-loop! [{:keys [::id ::close-ch ::input-ch ::output-ch ::heartbeat-ch ::channel ::handler ::beats ::on-rcv-message ::on-snd-message] :as wsp}] - (px/thread - {:name (str "penpot/websocket/io-loop/" id) - :virtual true} - (try - (handler wsp {:type :open}) - (loop [i 0] - (let [ping-ch (sp/timeout-chan heartbeat-interval) - [msg p] (sp/alts! [close-ch input-ch output-ch heartbeat-ch ping-ch])] - (when (yws/connected? channel) - (cond - (identical? p ping-ch) - (if (handle-ping! wsp i) - (recur (inc i)) - (yws/close! channel 8802 "missing to many pings")) + (try + (handler wsp {:type :open}) + (loop [i 0] + (let [ping-ch (sp/timeout-chan heartbeat-interval) + [msg p] (sp/alts! [close-ch input-ch output-ch heartbeat-ch ping-ch])] + (when (rws/open? channel) + (cond + (identical? p ping-ch) + (if (handle-ping! wsp i) + (recur (inc i)) + (rws/close channel 8802 "missing to many pings")) - (or (identical? p close-ch) (nil? msg)) - (do :nothing) + (or (identical? p close-ch) (nil? msg)) + (do :nothing) - (identical? p heartbeat-ch) - (let [beat (decode-beat msg)] - ;; (l/trace :hint "pong" :beat beat :conn-id id) - (swap! beats disj beat) - (recur i)) + (identical? p heartbeat-ch) + (let [beat (decode-beat msg)] + ;; (l/trace :hint "pong" :beat beat :conn-id id) + (swap! beats disj beat) + (recur i)) - (identical? p input-ch) - (let [message (t/decode-str msg) - message (on-rcv-message message) - {:keys [request-id] :as response} (handler wsp message)] - (when (map? response) - (sp/put! output-ch - (cond-> response - (some? request-id) - (assoc :request-id request-id)))) - (recur i)) + (identical? p input-ch) + (let [message (t/decode-str msg) + message (on-rcv-message message) + {:keys [request-id] :as response} (handler wsp message)] + (when (map? response) + (sp/put! output-ch + (cond-> response + (some? request-id) + (assoc :request-id request-id)))) + (recur i)) - (identical? p output-ch) - (let [message (on-snd-message msg) - message (t/encode-str message {:type :json-verbose})] - ;; (l/trace :hint "writing message to output" :message msg) - (yws/send! channel message) - (recur i)))))) + (identical? p output-ch) + (let [message (on-snd-message msg) + message (t/encode-str message {:type :json-verbose})] + ;; (l/trace :hint "writing message to output" :message msg) + (rws/send channel message) + (recur i)))))) - (catch java.nio.channels.ClosedChannelException _) - (catch java.net.SocketException _) - (catch java.io.IOException _) + (catch java.nio.channels.ClosedChannelException _) + (catch java.net.SocketException _) + (catch java.io.IOException _) - (catch InterruptedException _ - (l/debug :hint "websocket thread interrumpted" :conn-id id)) + (catch InterruptedException _cause + (l/debug :hint "websocket thread interrumpted" :conn-id id)) - (catch Throwable cause - (l/error :hint "unhandled exception on websocket thread" - :conn-id id - :cause cause)) - - (finally + (catch Throwable cause + (l/error :hint "unhandled exception on websocket thread" + :conn-id id + :cause cause)) + (finally + (try (handler wsp {:type :close}) - (when (yws/connected? channel) + (when (rws/open? channel) ;; NOTE: we need to ignore all exceptions here because ;; there can be a race condition that first returns that ;; channel is connected but on closing, will raise that ;; channel is already closed. (ex/ignoring - (yws/close! channel 8899 "terminated"))) + (rws/close channel 8899 "terminated"))) (when-let [on-disconnect (::on-disconnect wsp)] (on-disconnect)) - (l/trace :hint "websocket thread terminated" :conn-id id))))) + (catch Throwable cause + (throw cause))) + + (l/trace :hint "websocket thread terminated" :conn-id id)))) diff --git a/backend/test/backend_tests/http_middleware_access_token_test.clj b/backend/test/backend_tests/http_middleware_access_token_test.clj index ddc170355b..0b658d853a 100644 --- a/backend/test/backend_tests/http_middleware_access_token_test.clj +++ b/backend/test/backend_tests/http_middleware_access_token_test.clj @@ -31,17 +31,17 @@ request (volatile! nil) handler (#'app.http.access-token/wrap-soft-auth - (fn [req & _] (vreset! request req)) + (fn [req] (vreset! request req)) system)] (with-mocks [m1 {:target 'app.http.access-token/get-token :return nil}] - (handler {} nil nil) + (handler {}) (t/is (= {} @request))) (with-mocks [m1 {:target 'app.http.access-token/get-token :return (:token token)}] - (handler {} nil nil) + (handler {}) (let [token-id (get @request :app.http.access-token/id)] (t/is (= token-id (:id token)))))))) diff --git a/backend/test/backend_tests/rpc_audit_test.clj b/backend/test/backend_tests/rpc_audit_test.clj index 233728dac3..7a7fc6bae2 100644 --- a/backend/test/backend_tests/rpc_audit_test.clj +++ b/backend/test/backend_tests/rpc_audit_test.clj @@ -25,7 +25,7 @@ (def http-request (reify - yetti.request/Request + ring.request/Request (get-header [_ name] (case name "x-forwarded-for" "127.0.0.44")))) diff --git a/backend/test/backend_tests/rpc_cond_middleware_test.clj b/backend/test/backend_tests/rpc_cond_middleware_test.clj index c2ab68ad01..a19d85a0ce 100644 --- a/backend/test/backend_tests/rpc_cond_middleware_test.clj +++ b/backend/test/backend_tests/rpc_cond_middleware_test.clj @@ -46,6 +46,6 @@ {:keys [error result]} (th/command! (assoc params ::cond/key etag))] (t/is (nil? error)) (t/is (fn? result)) - (t/is (= 304 (-> (result nil) :yetti.response/status)))) + (t/is (= 304 (-> (result nil) :ring.response/status)))) )))) From 54341d5b224bc1e6459eb4720004b455acc9027d Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 24 Nov 2023 14:53:16 +0100 Subject: [PATCH 02/14] :sparkles: Make the RPC climit subsystem more robust --- backend/resources/climit.edn | 16 +- backend/src/app/main.clj | 3 +- backend/src/app/rpc/climit.clj | 205 +++++++++--------- backend/src/app/rpc/commands/audit.clj | 2 +- backend/src/app/rpc/commands/files_update.clj | 13 +- backend/src/app/rpc/commands/fonts.clj | 6 +- backend/src/app/rpc/commands/media.clj | 7 +- backend/src/app/rpc/commands/profile.clj | 17 +- 8 files changed, 135 insertions(+), 134 deletions(-) diff --git a/backend/resources/climit.edn b/backend/resources/climit.edn index 4a485b8f7d..6bb330927f 100644 --- a/backend/resources/climit.edn +++ b/backend/resources/climit.edn @@ -3,15 +3,17 @@ ;; Optional: queue, ommited means Integer/MAX_VALUE ;; Optional: timeout, ommited means no timeout ;; Note: queue and timeout are excluding -{:update-file-by-id {:permits 1 :queue 3} - :update-file {:permits 20} +{:update-file/by-profile + {:permits 1 :queue 5} - :derive-password {:permits 8} - :process-font {:permits 4 :queue 32} - :process-image {:permits 8 :queue 32} + :update-file/global {:permits 20} - :file-thumbnail-ops + :derive-password/global {:permits 8} + :process-font/global {:permits 4} + :process-image/global {:permits 8} + + :file-thumbnail-ops/by-profile {:permits 2} - :submit-audit-events-by-profile + :submit-audit-events/by-profile {:permits 1 :queue 3}} diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 22c5109dec..125f238d34 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -310,8 +310,7 @@ ::wrk/executor (ig/ref ::wrk/executor)} :app.rpc/climit - {::mtx/metrics (ig/ref ::mtx/metrics) - ::wrk/executor (ig/ref ::wrk/executor)} + {::mtx/metrics (ig/ref ::mtx/metrics)} :app.rpc/rlimit {::wrk/executor (ig/ref ::wrk/executor)} diff --git a/backend/src/app/rpc/climit.clj b/backend/src/app/rpc/climit.clj index 279723d57e..f5004d575a 100644 --- a/backend/src/app/rpc/climit.clj +++ b/backend/src/app/rpc/climit.clj @@ -31,19 +31,24 @@ (set! *warn-on-reflection* true) +(defn- id->str + [id] + (-> (str id) + (subs 1))) + (defn- create-bulkhead-cache - [{:keys [::wrk/executor]} config] - (letfn [(load-fn [key] - (let [config (get config (nth key 0))] - (l/trc :hint "insert into cache" :key key) + [config] + (letfn [(load-fn [[id skey]] + (when-let [config (get config id)] + (l/trc :hint "insert into cache" :id (id->str id) :key skey) (pbh/create :permits (or (:permits config) (:concurrency config)) :queue (or (:queue config) (:queue-size config)) :timeout (:timeout config) - :executor executor - :type (:type config :semaphore)))) + :type :semaphore))) - (on-remove [_ _ cause] - (l/trc :hint "evict from cache" :key key :reason (str cause)))] + (on-remove [key _ cause] + (let [[id skey] key] + (l/trc :hint "evict from cache" :id (id->str id) :key skey :reason (str cause))))] (cache/create :executor :same-thread :on-remove on-remove @@ -65,22 +70,21 @@ (s/def ::path ::fs/path) (defmethod ig/pre-init-spec ::rpc/climit [_] - (s/keys :req [::wrk/executor ::mtx/metrics ::path])) + (s/keys :req [::mtx/metrics ::path])) (defmethod ig/init-key ::rpc/climit - [_ {:keys [::path ::mtx/metrics ::wrk/executor] :as cfg}] + [_ {:keys [::path ::mtx/metrics] :as cfg}] (when (contains? cf/flags :rpc-climit) (when-let [params (some->> path slurp edn/read-string)] (l/inf :hint "initializing concurrency limit" :config (str path)) (us/verify! ::config params) - {::cache (create-bulkhead-cache cfg params) + {::cache (create-bulkhead-cache params) ::config params - ::wrk/executor executor ::mtx/metrics metrics}))) (s/def ::cache cache/cache?) (s/def ::instance - (s/keys :req [::cache ::config ::wrk/executor])) + (s/keys :req [::cache ::config])) (s/def ::rpc/climit (s/nilable ::instance)) @@ -91,107 +95,94 @@ (defn invoke! [cache metrics id key f] - (let [limiter (cache/get cache [id key]) - tpoint (dt/tpoint) - labels (into-array String [(name id)]) + (if-let [limiter (cache/get cache [id key])] + (let [tpoint (dt/tpoint) + labels (into-array String [(id->str id)]) + wrapped (fn [] + (let [elapsed (tpoint) + stats (pbh/get-stats limiter)] + (l/trc :hint "acquired" + :id (id->str id) + :key key + :permits (:permits stats) + :queue (:queue stats) + :max-permits (:max-permits stats) + :max-queue (:max-queue stats) + :elapsed (dt/format-duration elapsed)) - wrapped - (fn [] - (let [elapsed (tpoint) - stats (pbh/get-stats limiter)] - (l/trc :hint "executed" - :id (name id) - :key key - :fnh (hash f) - :permits (:permits stats) - :queue (:queue stats) - :max-permits (:max-permits stats) - :max-queue (:max-queue stats) - :elapsed (dt/format-duration elapsed)) + (mtx/run! metrics + :id :rpc-climit-timing + :val (inst-ms elapsed) + :labels labels) + (try + (f) + (finally + (let [elapsed (tpoint)] + (l/trc :hint "finished" + :id (id->str id) + :key key + :permits (:permits stats) + :queue (:queue stats) + :max-permits (:max-permits stats) + :max-queue (:max-queue stats) + :elapsed (dt/format-duration elapsed))))))) + measure! + (fn [stats] (mtx/run! metrics - :id :rpc-climit-timing - :val (inst-ms elapsed) + :id :rpc-climit-queue + :val (:queue stats) :labels labels) - (try - (f) - (finally - (let [elapsed (tpoint)] - (l/trc :hint "finished" - :id (name id) - :key key - :fnh (hash f) - :permits (:permits stats) - :queue (:queue stats) - :max-permits (:max-permits stats) - :max-queue (:max-queue stats) - :elapsed (dt/format-duration elapsed))))))) - measure! - (fn [stats] - (mtx/run! metrics - :id :rpc-climit-queue - :val (:queue stats) - :labels labels) - (mtx/run! metrics - :id :rpc-climit-permits - :val (:permits stats) - :labels labels))] + (mtx/run! metrics + :id :rpc-climit-permits + :val (:permits stats) + :labels labels))] - (try - (let [stats (pbh/get-stats limiter)] - (measure! stats) - (l/trc :hint "enqueued" - :id (name id) - :key key - :fnh (hash f) - :permits (:permits stats) - :queue (:queue stats) - :max-permits (:max-permits stats) - :max-queue (:max-queue stats)) - (pbh/invoke! limiter wrapped)) - (catch ExceptionInfo cause - (let [{:keys [type code]} (ex-data cause)] - (if (= :bulkhead-error type) - (ex/raise :type :concurrency-limit - :code code - :hint "concurrency limit reached") - (throw cause)))) + (try + (let [stats (pbh/get-stats limiter)] + (measure! stats) + (l/trc :hint "enqueued" + :id (id->str id) + :key key + :permits (:permits stats) + :queue (:queue stats) + :max-permits (:max-permits stats) + :max-queue (:max-queue stats)) + (pbh/invoke! limiter wrapped)) + (catch ExceptionInfo cause + (let [{:keys [type code]} (ex-data cause)] + (if (= :bulkhead-error type) + (ex/raise :type :concurrency-limit + :code code + :hint "concurrency limit reached") + (throw cause)))) - (finally - (measure! (pbh/get-stats limiter)))))) + (finally + (measure! (pbh/get-stats limiter))))) - -(defn run! - [{:keys [::id ::cache ::mtx/metrics]} f] - (if (and cache id) - (invoke! cache metrics id nil f) - (f))) - -(defn submit! - [{:keys [::id ::cache ::wrk/executor ::mtx/metrics]} f] - (let [f (partial px/submit! executor (px/wrap-bindings f))] - (if (and cache id) - (p/await! (invoke! cache metrics id nil f)) - (p/await! (f))))) + (do + (l/wrn :hint "unable to load limiter" :id (id->str id)) + (f)))) (defn configure - ([{:keys [::rpc/climit]} id] - (us/assert! ::rpc/climit climit) - (assoc climit ::id id)) - ([{:keys [::rpc/climit]} id executor] - (us/assert! ::rpc/climit climit) - (-> climit - (assoc ::id id) - (assoc ::wrk/executor executor)))) + [{:keys [::rpc/climit]} id] + (us/assert! ::rpc/climit climit) + (assoc climit ::id id)) -(defmacro with-dispatch! - "Dispatch blocking operation to a separated thread protected with the - specified concurrency limiter. If climit is not active, the function - will be scheduled to execute without concurrency monitoring." - [instance & body] - (if (vector? instance) - `(-> (app.rpc.climit/configure ~@instance) - (app.rpc.climit/run! (^:once fn* [] ~@body))) - `(run! ~instance (^:once fn* [] ~@body)))) +(defn run! + "Run a function in context of climit. + Intended to be used in virtual threads." + ([{:keys [::id ::cache ::mtx/metrics]} f] + (if (and cache id) + (invoke! cache metrics id nil f) + (f))) + + ([{:keys [::id ::cache ::mtx/metrics]} f executor] + (let [f (fn [] + (let [f (px/wrap-bindings f)] + (p/await! (px/submit! executor f))))] + (if (and cache id) + (invoke! cache metrics id nil f) + (f))))) (def noop-fn (constantly nil)) @@ -201,7 +192,7 @@ (if-let [config (get-in climit [::config id])] (let [cache (::cache climit)] (l/dbg :hint "instrumenting method" - :limit (name id) + :limit (id->str id) :service-name (::sv/name mdata) :timeout (:timeout config) :permits (:permits config) @@ -212,7 +203,7 @@ (invoke! cache metrics id (key-fn params) (partial f cfg params)))) (do - (l/wrn :hint "no config found for specified queue" :id id) + (l/wrn :hint "no config found for specified queue" :id (id->str id)) f)) f)) diff --git a/backend/src/app/rpc/commands/audit.clj b/backend/src/app/rpc/commands/audit.clj index 8049595c91..fa56087219 100644 --- a/backend/src/app/rpc/commands/audit.clj +++ b/backend/src/app/rpc/commands/audit.clj @@ -64,7 +64,7 @@ [:events [:vector schema:event]]]) (sv/defmethod ::push-audit-events - {::climit/id :submit-audit-events-by-profile + {::climit/id :submit-audit-events/by-profile ::climit/key-fn ::rpc/profile-id ::sm/params schema:push-audit-events ::audit/skip true diff --git a/backend/src/app/rpc/commands/files_update.clj b/backend/src/app/rpc/commands/files_update.clj index f827257cfd..5aabe5fce7 100644 --- a/backend/src/app/rpc/commands/files_update.clj +++ b/backend/src/app/rpc/commands/files_update.clj @@ -34,6 +34,7 @@ [app.util.pointer-map :as pmap] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as-alias wrk] [clojure.set :as set])) ;; --- SCHEMA @@ -133,8 +134,8 @@ ;; database. (sv/defmethod ::update-file - {::climit/id :update-file-by-id - ::climit/key-fn :id + {::climit/id :update-file/by-profile + ::climit/key-fn ::rpc/profile-id ::webhooks/event? true ::webhooks/batch-timeout (dt/duration "2m") ::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id) @@ -231,13 +232,15 @@ :team-id (:team-id file)})))))) (defn- update-file* - [{:keys [::db/conn] :as cfg} + [{:keys [::db/conn ::wrk/executor] :as cfg} {:keys [profile-id file changes session-id ::created-at skip-validate] :as params}] (let [;; Process the file data in the CLIMIT context; scheduling it ;; to be executed on a separated executor for avoid to do the ;; CPU intensive operation on vthread. - file (-> (climit/configure cfg :update-file) - (climit/submit! (partial update-file-data conn file changes skip-validate)))] + + update-fdata-fn (partial update-file-data conn file changes skip-validate) + file (-> (climit/configure cfg :update-file/global) + (climit/run! update-fdata-fn executor))] (db/insert! conn :file-change {:id (uuid/next) diff --git a/backend/src/app/rpc/commands/fonts.clj b/backend/src/app/rpc/commands/fonts.clj index 256132e847..f22c8c1ef0 100644 --- a/backend/src/app/rpc/commands/fonts.clj +++ b/backend/src/app/rpc/commands/fonts.clj @@ -25,6 +25,7 @@ [app.storage :as sto] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as-alias wrk] [clojure.spec.alpha :as s])) (def valid-weight #{100 200 300 400 500 600 700 800 900 950}) @@ -159,8 +160,9 @@ :ttf-file-id (:id ttf)})) ] - (let [data (-> (climit/configure cfg :process-font) - (climit/submit! (partial generate-missing! data))) + (let [data (-> (climit/configure cfg :process-font/global) + (climit/run! (partial generate-missing! data) + (::wrk/executor cfg))) assets (persist-fonts-files! data) result (insert-font-variant! assets)] (vary-meta result assoc ::audit/replace-props (update params :data (comp vec keys)))))) diff --git a/backend/src/app/rpc/commands/media.clj b/backend/src/app/rpc/commands/media.clj index ef13c969fd..04ad8bc9bc 100644 --- a/backend/src/app/rpc/commands/media.clj +++ b/backend/src/app/rpc/commands/media.clj @@ -23,6 +23,7 @@ [app.storage :as sto] [app.storage.tmp :as tmp] [app.util.services :as sv] + [app.worker :as-alias wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str] [datoteka.io :as io])) @@ -142,11 +143,11 @@ (assoc ::image (process-main-image info))))) (defn create-file-media-object - [{:keys [::sto/storage ::db/conn] :as cfg} + [{:keys [::sto/storage ::db/conn ::wrk/executor] :as cfg} {:keys [id file-id is-local name content]}] - (let [result (-> (climit/configure cfg :process-image) - (climit/submit! (partial process-image content))) + (let [result (-> (climit/configure cfg :process-image/global) + (climit/run! (partial process-image content) executor)) image (sto/put-object! storage (::image result)) thumb (when-let [params (::thumb result)] diff --git a/backend/src/app/rpc/commands/profile.clj b/backend/src/app/rpc/commands/profile.clj index 60d1810096..1a11e57f2f 100644 --- a/backend/src/app/rpc/commands/profile.clj +++ b/backend/src/app/rpc/commands/profile.clj @@ -26,6 +26,7 @@ [app.tokens :as tokens] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as-alias wrk] [cuerdas.core :as str])) (declare check-profile-existence!) @@ -230,9 +231,9 @@ :content-type (:mtype thumb)})) (defn upload-photo - [{:keys [::sto/storage] :as cfg} {:keys [file]}] - (let [params (-> (climit/configure cfg :process-image) - (climit/submit! (partial generate-thumbnail! file)))] + [{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file]}] + (let [params (-> (climit/configure cfg :process-image/global) + (climit/run! (partial generate-thumbnail! file) executor))] (sto/put-object! storage params))) @@ -426,13 +427,15 @@ (defn derive-password [cfg password] (when password - (-> (climit/configure cfg :derive-password) - (climit/submit! (partial auth/derive-password password))))) + (-> (climit/configure cfg :derive-password/global) + (climit/run! (partial auth/derive-password password) + (::wrk/executor cfg))))) (defn verify-password [cfg password password-data] - (-> (climit/configure cfg :derive-password) - (climit/submit! (partial auth/verify-password password password-data)))) + (-> (climit/configure cfg :derive-password/global) + (climit/run! (partial auth/verify-password password password-data) + (::wrk/executor cfg)))) (defn decode-row [{:keys [props] :as row}] From ca6738d20cce248bd238d7efd42c4c361ad625b8 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 24 Nov 2023 14:54:10 +0100 Subject: [PATCH 03/14] :sparkles: Remove executor dependency from awsns handlers --- backend/src/app/http/awsns.clj | 7 +++---- backend/src/app/main.clj | 3 +-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/backend/src/app/http/awsns.clj b/backend/src/app/http/awsns.clj index f9e9179b11..7508be8a2d 100644 --- a/backend/src/app/http/awsns.clj +++ b/backend/src/app/http/awsns.clj @@ -31,14 +31,13 @@ (defmethod ig/pre-init-spec ::routes [_] (s/keys :req [::http/client ::main/props - ::db/pool - ::wrk/executor])) + ::db/pool])) (defmethod ig/init-key ::routes - [_ {:keys [::wrk/executor] :as cfg}] + [_ cfg] (letfn [(handler [request] (let [data (-> request rreq/body slurp)] - (px/run! executor #(handle-request cfg data))) + (px/run! :vthread (partial handle-request cfg data))) {::rres/status 200})] ["/sns" {:handler handler :allowed-methods #{:post}}])) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 125f238d34..ebf18468e7 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -228,8 +228,7 @@ ::http.awsns/routes {::props (ig/ref ::setup/props) ::db/pool (ig/ref ::db/pool) - ::http.client/client (ig/ref ::http.client/client) - ::wrk/executor (ig/ref ::wrk/executor)} + ::http.client/client (ig/ref ::http.client/client)} ::http/server {::http/port (cf/get :http-server-port) From d241b73940602c0f7b047ffbfddbc042a792fe07 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 24 Nov 2023 14:59:25 +0100 Subject: [PATCH 04/14] :fire: Remove executor internal dependency on http client module --- backend/src/app/http/client.clj | 8 +++----- backend/src/app/main.clj | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/backend/src/app/http/client.clj b/backend/src/app/http/client.clj index cf30dbb46d..5b4a8541c8 100644 --- a/backend/src/app/http/client.clj +++ b/backend/src/app/http/client.clj @@ -8,7 +8,6 @@ "Http client abstraction layer." (:require [app.common.spec :as us] - [app.worker :as wrk] [clojure.spec.alpha :as s] [integrant.core :as ig] [java-http-clj.core :as http] @@ -21,12 +20,11 @@ (s/keys :req [::client])) (defmethod ig/pre-init-spec ::client [_] - (s/keys :req [::wrk/executor])) + (s/keys :req [])) (defmethod ig/init-key ::client - [_ {:keys [::wrk/executor] :as cfg}] - (http/build-client {:executor executor - :connect-timeout 30000 ;; 10s + [_ _] + (http/build-client {:connect-timeout 30000 ;; 10s :follow-redirects :always})) (defn send! diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index ebf18468e7..dc93f1773d 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -217,7 +217,7 @@ {::db/pool (ig/ref ::db/pool)} ::http.client/client - {::wrk/executor (ig/ref ::wrk/executor)} + {} ::session/manager {::db/pool (ig/ref ::db/pool)} From bc01afe158ded5bd1ae3fcbf6ce17a71a973b769 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 24 Nov 2023 15:00:16 +0100 Subject: [PATCH 05/14] :fire: Remove executor internal dependency from debug module --- backend/src/app/main.clj | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index dc93f1773d..11cb9b72ef 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -289,12 +289,10 @@ ::http.debug/routes {::db/pool (ig/ref ::db/pool) - ::wrk/executor (ig/ref ::wrk/executor) ::session/manager (ig/ref ::session/manager) ::sto/storage (ig/ref ::sto/storage) ::props (ig/ref ::setup/props)} - ::http.ws/routes {::db/pool (ig/ref ::db/pool) ::mtx/metrics (ig/ref ::mtx/metrics) From 97f8315cd0000ba007694b0cd5326aaa1ee1f32f Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 24 Nov 2023 15:00:40 +0100 Subject: [PATCH 06/14] :fire: Remove executor internal dependency from http assets module --- backend/src/app/main.clj | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 11cb9b72ef..160f9d1ba6 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -303,8 +303,7 @@ {::http.assets/path (cf/get :assets-path) ::http.assets/cache-max-age (dt/duration {:hours 24}) ::http.assets/cache-max-agesignature-max-age (dt/duration {:hours 24 :minutes 5}) - ::sto/storage (ig/ref ::sto/storage) - ::wrk/executor (ig/ref ::wrk/executor)} + ::sto/storage (ig/ref ::sto/storage)} :app.rpc/climit {::mtx/metrics (ig/ref ::mtx/metrics)} From da7f88c7ca1ba0d8c456af7d3bec447e77401ddc Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 24 Nov 2023 15:14:45 +0100 Subject: [PATCH 07/14] :fire: Remove executor internal dependency on storage module --- backend/src/app/main.clj | 1 - backend/src/app/storage.clj | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 160f9d1ba6..c9725d27ac 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -439,7 +439,6 @@ ::sto/storage {::db/pool (ig/ref ::db/pool) - ::wrk/executor (ig/ref ::wrk/executor) ::sto/backends {:assets-s3 (ig/ref [::assets :app.storage.s3/backend]) :assets-fs (ig/ref [::assets :app.storage.fs/backend])}} diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index be0159e0fa..f17d1e245a 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -18,7 +18,6 @@ [app.storage.impl :as impl] [app.storage.s3 :as ss3] [app.util.time :as dt] - [app.worker :as wrk] [clojure.spec.alpha :as s] [datoteka.fs :as fs] [integrant.core :as ig] @@ -40,7 +39,7 @@ :fs ::sfs/backend)))) (defmethod ig/pre-init-spec ::storage [_] - (s/keys :req [::db/pool ::wrk/executor ::backends])) + (s/keys :req [::db/pool ::backends])) (defmethod ig/init-key ::storage [_ {:keys [::backends ::db/pool] :as cfg}] From 1bd32327e5c58daf72ed5ff68717fca777ceb37e Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 24 Nov 2023 15:15:35 +0100 Subject: [PATCH 08/14] :fire: Remove executor internal dependency on rpc routes module --- backend/src/app/main.clj | 1 - backend/src/app/rpc.clj | 5 +---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index c9725d27ac..a37836a71f 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -337,7 +337,6 @@ :app.rpc/routes {::rpc/methods (ig/ref :app.rpc/methods) ::db/pool (ig/ref ::db/pool) - ::wrk/executor (ig/ref ::wrk/executor) ::session/manager (ig/ref ::session/manager) ::props (ig/ref ::setup/props)} diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 2ede5681f1..d514649272 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -30,7 +30,6 @@ [app.storage :as-alias sto] [app.util.services :as sv] [app.util.time :as dt] - [app.worker :as-alias wrk] [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.core :as p] @@ -239,8 +238,7 @@ ::ldap/provider ::sto/storage ::mtx/metrics - ::main/props - ::wrk/executor] + ::main/props] :opt [::climit ::rlimit] :req-un [::db/pool])) @@ -259,7 +257,6 @@ (s/keys :req [::methods ::db/pool ::main/props - ::wrk/executor ::session/manager])) (defmethod ig/init-key ::routes From c64e14859caa2d8b460b5ddb96514a1d4967f601 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 24 Nov 2023 15:35:59 +0100 Subject: [PATCH 09/14] :sparkles: Simplify internal executor module --- backend/src/app/main.clj | 14 ++---- backend/src/app/storage/impl.clj | 4 +- backend/src/app/worker.clj | 78 ++++++++++++++++---------------- 3 files changed, 43 insertions(+), 53 deletions(-) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index a37836a71f..4411bea85c 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -161,12 +161,7 @@ ::mdef/help "Current number of threads with state RUNNING." ::mdef/labels ["name"] ::mdef/type :gauge} - - :executors-queued-submissions - {::mdef/name "penpot_executors_queued_submissions" - ::mdef/help "Current number of queued submissions." - ::mdef/labels ["name"] - ::mdef/type :gauge}}) + }) (def system-config {::db/pool @@ -180,13 +175,12 @@ ;; Default thread pool for IO operations ::wrk/executor - {::wrk/parallelism (cf/get :default-executor-parallelism - (+ 3 (* (px/get-available-processors) 3)))} + {} ::wrk/monitor {::mtx/metrics (ig/ref ::mtx/metrics) - ::wrk/name "default" - ::wrk/executor (ig/ref ::wrk/executor)} + ::wrk/executor (ig/ref ::wrk/executor) + ::wrk/name "default"} :app.migrations/migrations {::db/pool (ig/ref ::db/pool)} diff --git a/backend/src/app/storage/impl.clj b/backend/src/app/storage/impl.clj index 4a564b58f3..9dc7facc14 100644 --- a/backend/src/app/storage/impl.clj +++ b/backend/src/app/storage/impl.clj @@ -11,7 +11,6 @@ [app.common.exceptions :as ex] [app.db :as-alias db] [app.storage :as-alias sto] - [app.worker :as-alias wrk] [buddy.core.codecs :as bc] [buddy.core.hash :as bh] [clojure.java.io :as jio] @@ -201,7 +200,7 @@ (str "blake2b:" result))) (defn resolve-backend - [{:keys [::db/pool ::wrk/executor] :as storage} backend-id] + [{:keys [::db/pool] :as storage} backend-id] (let [backend (get-in storage [::sto/backends backend-id])] (when-not backend (ex/raise :type :internal @@ -209,7 +208,6 @@ :hint (dm/fmt "backend '%' not configured" backend-id))) (-> backend (assoc ::sto/id backend-id) - (assoc ::wrk/executor executor) (assoc ::db/pool pool)))) (defrecord StorageObject [id size created-at expired-at touched-at backend]) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index 448fbaaec1..a6f9202200 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -25,43 +25,45 @@ [promesa.core :as p] [promesa.exec :as px]) (:import - java.util.concurrent.ExecutorService - java.util.concurrent.ForkJoinPool + java.util.concurrent.ThreadPoolExecutor + java.util.concurrent.Executor java.util.concurrent.Future)) (set! *warn-on-reflection* true) -(s/def ::executor #(instance? ExecutorService %)) +(s/def ::executor #(instance? Executor %)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Executor ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::parallelism ::us/integer) - (defmethod ig/pre-init-spec ::executor [_] - (s/keys :req [::parallelism])) + (s/keys :req [])) (defmethod ig/init-key ::executor - [skey {:keys [::parallelism]}] - (let [prefix (if (vector? skey) (-> skey first name) "default") - tname (str "penpot/" prefix "/%s") - ttype (cf/get :worker-executor-type :fjoin)] - (case ttype - :fjoin - (let [factory (px/forkjoin-thread-factory :name tname)] - (px/forkjoin-executor {:factory factory - :core-size (px/get-available-processors) - :parallelism parallelism - :async true})) + [_ _] + (let [factory (px/thread-factory :prefix "penpot/default/") + executor (px/cached-executor :factory factory :keepalive 30000)] + (l/inf :hint "starting executor") + (reify + java.lang.AutoCloseable + (close [_] + (l/inf :hint "stoping executor") + (px/shutdown! executor)) - :cached - (let [factory (px/thread-factory :name tname)] - (px/cached-executor :factory factory))))) + clojure.lang.IDeref + (deref [_] + {:active (.getPoolSize ^ThreadPoolExecutor executor) + :running (.getActiveCount ^ThreadPoolExecutor executor) + :completed (.getCompletedTaskCount ^ThreadPoolExecutor executor)}) + + Executor + (execute [_ runnable] + (.execute ^Executor executor ^Runnable runnable))))) (defmethod ig/halt-key! ::executor [_ instance] - (px/shutdown! instance)) + (.close ^java.lang.AutoCloseable instance)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; TASKS REGISTRY @@ -111,42 +113,38 @@ (defmethod ig/init-key ::monitor [_ {:keys [::executor ::mtx/metrics ::interval ::name]}] - (letfn [(monitor! [^ForkJoinPool executor prev-steals] - (let [running (.getRunningThreadCount executor) - queued (.getQueuedSubmissionCount executor) - active (.getPoolSize executor) - steals (.getStealCount executor) - labels (into-array String [(d/name name)]) + (letfn [(monitor! [executor prev-completed] + (let [labels (into-array String [(d/name name)]) + stats (deref executor) - steals-inc (- steals prev-steals) - steals-inc (if (neg? steals-inc) 0 steals-inc)] + completed (:completed stats) + completed-inc (- completed prev-completed) + completed-inc (if (neg? completed-inc) 0 completed-inc)] (mtx/run! metrics :id :executor-active-threads :labels labels - :val active) + :val (:active stats)) + (mtx/run! metrics :id :executor-running-threads - :labels labels :val running) - (mtx/run! metrics - :id :executors-queued-submissions :labels labels - :val queued) + :val (:running stats)) + (mtx/run! metrics :id :executors-completed-tasks :labels labels - :inc steals-inc) + :inc completed-inc) - steals))] + completed-inc))] (px/thread {:name "penpot/executors-monitor" :virtual true} (l/inf :hint "monitor: started" :name name) (try - (loop [steals 0] - (when-not (px/shutdown? executor) - (px/sleep interval) - (recur (long (monitor! executor steals))))) + (loop [completed 0] + (px/sleep interval) + (recur (long (monitor! executor completed)))) (catch InterruptedException _cause (l/trc :hint "monitor: interrupted" :name name)) (catch Throwable cause From 2295d085d39e0733814060b2592d8fc8c9027271 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 27 Nov 2023 09:50:47 +0100 Subject: [PATCH 10/14] :zap: Improve performance on error formating and reporting --- backend/src/app/http/errors.clj | 24 ++++------------ backend/src/app/loggers/database.clj | 16 +++++------ backend/test/backend_tests/helpers.clj | 4 +-- common/deps.edn | 4 +++ common/src/app/common/exceptions.cljc | 35 ++++++++++++------------ common/src/app/common/files/builder.cljc | 2 +- common/src/app/common/pprint.cljc | 20 +++++++------- common/src/app/common/schema.cljc | 13 ++++----- frontend/src/app/main/errors.cljs | 7 ++--- 9 files changed, 56 insertions(+), 69 deletions(-) diff --git a/backend/src/app/http/errors.clj b/backend/src/app/http/errors.clj index 5bb14cc37a..93c67845b9 100644 --- a/backend/src/app/http/errors.clj +++ b/backend/src/app/http/errors.clj @@ -9,7 +9,7 @@ (:require [app.common.exceptions :as ex] [app.common.logging :as l] - [app.common.schema :as sm] + [app.common.schema :as-alias sm] [app.config :as cf] [app.http :as-alias http] [app.http.access-token :as-alias actoken] @@ -79,29 +79,15 @@ [err request parent-cause] (let [{:keys [code] :as data} (ex-data err)] (cond - (= code :spec-validation) + (or (= code :spec-validation) + (= code :params-validation) + (= code :data-validation)) (let [explain (ex/explain data)] {::rres/status 400 ::rres/body (-> data - (dissoc ::s/problems ::s/value ::s/spec) + (dissoc ::s/problems ::s/value ::s/spec ::sm/explain) (cond-> explain (assoc :explain explain)))}) - (= code :params-validation) - (let [explain (::sm/explain data) - explain (sm/humanize-data explain)] - {::rres/status 400 - ::rres/body (-> data - (dissoc ::sm/explain) - (assoc :explain explain))}) - - (= code :data-validation) - (let [explain (::sm/explain data) - explain (sm/humanize-data explain)] - {::rres/status 400 - ::rres/body (-> data - (dissoc ::sm/explain) - (assoc :explain explain))}) - (= code :request-body-too-large) {::rres/status 413 ::rres/body data} diff --git a/backend/src/app/loggers/database.clj b/backend/src/app/loggers/database.clj index 77694da276..e2892d13f8 100644 --- a/backend/src/app/loggers/database.clj +++ b/backend/src/app/loggers/database.clj @@ -56,22 +56,22 @@ (dissoc :request/params :value :params :data))] (merge {:context (-> (into (sorted-map) ctx) - (pp/pprint-str :width 200 :length 50 :level 10)) - :props (pp/pprint-str props :width 200 :length 50) + (pp/pprint-str :length 50)) + :props (pp/pprint-str props :length 50) :hint (or (ex-message cause) @message) :trace (or (::trace record) (ex/format-throwable cause :data? false :explain? false :header? false :summary? false))} (when-let [params (or (:request/params context) (:params context))] - {:params (pp/pprint-str params :width 200 :length 50 :level 10)}) + {:params (pp/pprint-str params :length 30 :level 12)}) (when-let [value (:value context)] - {:value (pp/pprint-str value :width 200 :length 50 :level 10)}) + {:value (pp/pprint-str value :length 30 :level 12)}) (when-let [data (some-> data (dissoc ::s/problems ::s/value ::s/spec ::sm/explain :hint))] - {:data (pp/pprint-str data :width 200)}) + {:data (pp/pprint-str data :length 30 :level 12)}) - (when-let [explain (ex/explain data {:level 8 :length 20})] + (when-let [explain (ex/explain data :length 30 :level 12)] {:explain explain}))))) (defn error-record? @@ -96,11 +96,11 @@ (defmethod ig/init-key ::reporter [_ cfg] - (let [input (sp/chan :buf (sp/sliding-buffer 32) + (let [input (sp/chan :buf (sp/sliding-buffer 64) :xf (filter error-record?))] (add-watch l/log-record ::reporter #(sp/put! input %4)) - (px/thread {:name "penpot/database-reporter" :virtual true} + (px/thread {:name "penpot/database-reporter"} (l/info :hint "initializing database error persistence") (try (loop [] diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 6523477db3..7273ff7dd7 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -429,11 +429,11 @@ (= :params-validation (:code data)) (app.common.pprint/pprint - (sm/humanize-data (::sm/explain data))) + (sm/humanize-explain (::sm/explain data))) (= :data-validation (:code data)) (app.common.pprint/pprint - (sm/humanize-data (::sm/explain data))) + (sm/humanize-explain (::sm/explain data))) (= :service-error (:type data)) (print-error! (.getCause ^Throwable error)) diff --git a/common/deps.edn b/common/deps.edn index 41f1ced96a..29aa6e1d83 100644 --- a/common/deps.edn +++ b/common/deps.edn @@ -48,6 +48,10 @@ ;; exception printing fipp/fipp {:mvn/version "0.6.26"} + io.github.eerohele/pp + {:git/tag "2023-11-25.47" + :git/sha "15d572c"} + io.aviso/pretty {:mvn/version "1.4.4"} environ/environ {:mvn/version "1.2.0"}} :paths ["src" "vendor" "target/classes"] diff --git a/common/src/app/common/exceptions.cljc b/common/src/app/common/exceptions.cljc index b68a77d1ae..a5b9e47c69 100644 --- a/common/src/app/common/exceptions.cljc +++ b/common/src/app/common/exceptions.cljc @@ -65,23 +65,22 @@ (instance? RuntimeException v))) (defn explain - ([data] (explain data nil)) - ([data opts] - (cond - ;; NOTE: a special case for spec validation errors on integrant - (and (= (:reason data) :integrant.core/build-failed-spec) - (contains? data :explain)) - (explain (:explain data) opts) + [data & {:as opts}] + (cond + ;; NOTE: a special case for spec validation errors on integrant + (and (= (:reason data) :integrant.core/build-failed-spec) + (contains? data :explain)) + (explain (:explain data) opts) - (and (contains? data ::s/problems) - (contains? data ::s/value) - (contains? data ::s/spec)) - (binding [s/*explain-out* expound/printer] - (with-out-str - (s/explain-out (update data ::s/problems #(take (:length opts 10) %))))) + (and (contains? data ::s/problems) + (contains? data ::s/value) + (contains? data ::s/spec)) + (binding [s/*explain-out* expound/printer] + (with-out-str + (s/explain-out (update data ::s/problems #(take (:length opts 10) %))))) - (contains? data ::sm/explain) - (sm/humanize-data (::sm/explain data) opts)))) + (contains? data ::sm/explain) + (sm/humanize-explain (::sm/explain data) opts))) #?(:clj (defn format-throwable @@ -92,8 +91,8 @@ data? true explain? true chain? true - data-length 10 - data-level 4}}] + data-length 8 + data-level 5}}] (letfn [(print-trace-element [^StackTraceElement e] (let [class (.getClassName e) @@ -115,7 +114,7 @@ (print-data [data] (when (seq data) (print " dt: ") - (let [[line & lines] (str/lines (pp/pprint-str data :level data-level :length data-length ))] + (let [[line & lines] (str/lines (pp/pprint-str data :level data-level :length data-length))] (print line) (newline) (doseq [line lines] diff --git a/common/src/app/common/files/builder.cljc b/common/src/app/common/files/builder.cljc index d999a05fd2..46c1e91317 100644 --- a/common/src/app/common/files/builder.cljc +++ b/common/src/app/common/files/builder.cljc @@ -50,7 +50,7 @@ (when-not valid? (let [explain (sm/explain ::ch/change change)] - (pp/pprint (sm/humanize-data explain)) + (pp/pprint (sm/humanize-explain explain)) (when fail-on-spec? (ex/raise :type :assertion :code :data-validation diff --git a/common/src/app/common/pprint.cljc b/common/src/app/common/pprint.cljc index c17bc54736..66925e7efc 100644 --- a/common/src/app/common/pprint.cljc +++ b/common/src/app/common/pprint.cljc @@ -7,16 +7,16 @@ (ns app.common.pprint (:refer-clojure :exclude [prn]) (:require - [fipp.edn :as fpp])) - -(defn pprint-str - [expr & {:keys [width level length] - :or {width 110 level 8 length 25}}] - (binding [*print-level* level - *print-length* length] - (with-out-str - (fpp/pprint expr {:width width})))) + [me.flowthing.pp :as pp])) (defn pprint + [expr & {:keys [width level length] + :or {width 120 level 8 length 25}}] + (binding [*print-level* level + *print-length* length] + (pp/pprint expr {:max-width width}))) + +(defn pprint-str [expr & {:as opts}] - (println (pprint-str expr opts))) + (with-out-str + (pprint expr opts))) diff --git a/common/src/app/common/schema.cljc b/common/src/app/common/schema.cljc index 1f939f15ec..f1a86f7c2f 100644 --- a/common/src/app/common/schema.cljc +++ b/common/src/app/common/schema.cljc @@ -152,19 +152,18 @@ (let [vfn (delay (decoder (if (delay? s) (deref s) s) transformer))] (fn [v] (@vfn v)))) -(defn humanize-data +(defn humanize-explain [{:keys [schema errors value]} & {:keys [length level]}] (let [errors (mapv #(update % :schema form) errors)] (with-out-str (println "Schema: ") - (println (pp/pprint-str (form schema) {:level (d/nilv level 10) - :length (d/nilv length 10)})) + (println (pp/pprint-str (form schema) {:width 100 :level 15 :length 20})) (println "Errors:") - (println (pp/pprint-str errors {:level (d/nilv level 10) - :length (d/nilv length 10)})) + (println (pp/pprint-str errors {:width 100 :level 15 :length 20})) (println "Value:") - (println (pp/pprint-str value {:level (d/nilv level 5) - :length (d/nilv length 10)}))))) + (println (pp/pprint-str value {:width 160 + :level (d/nilv level 8) + :length (d/nilv length 12)}))))) (defn pretty-explain [s d] diff --git a/frontend/src/app/main/errors.cljs b/frontend/src/app/main/errors.cljs index c6a25644aa..f1c85b9f20 100644 --- a/frontend/src/app/main/errors.cljs +++ b/frontend/src/app/main/errors.cljs @@ -9,7 +9,7 @@ (:require [app.common.exceptions :as ex] [app.common.pprint :as pp] - [app.common.schema :as sm] + [app.common.schema :as-alias sm] [app.main.data.messages :as msg] [app.main.data.modal :as modal] [app.main.data.users :as du] @@ -33,9 +33,8 @@ (defn- print-explain! [data] - (when-let [explain (::sm/explain data)] - (js/console.log (sm/humanize-data explain))) - (when-let [explain (:explain data)] + (when-let [explain (or (ex/explain data) + (:explain data))] (js/console.log explain))) (defn- print-trace! From 81dc76bb146f62f6019ae01e1b63efac6db3550d Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 27 Nov 2023 16:09:18 +0100 Subject: [PATCH 11/14] :zap: Add performance improvements on schema validation system --- common/src/app/common/schema.cljc | 114 ++++++++++++++++++++---------- 1 file changed, 75 insertions(+), 39 deletions(-) diff --git a/common/src/app/common/schema.cljc b/common/src/app/common/schema.cljc index f1a86f7c2f..8fc44587f8 100644 --- a/common/src/app/common/schema.cljc +++ b/common/src/app/common/schema.cljc @@ -16,7 +16,7 @@ [app.common.schema.registry :as sr] [app.common.uri :as u] [app.common.uuid :as uuid] - [clojure.test.check.generators :as tgen] + [clojure.core :as c] [cuerdas.core :as str] [malli.core :as m] [malli.dev.pretty :as mdp] @@ -26,7 +26,12 @@ [malli.transform :as mt] [malli.util :as mu])) -(defprotocol ISchemaOps +(defprotocol ILazySchema + (-get-schema [_]) + (-get-validator [_]) + (-get-explainer [_]) + (-get-decoder [_]) + (-get-encoder [_]) (-validate [_ o]) (-explain [_ o]) (-decode [_ o])) @@ -34,21 +39,31 @@ (def default-options {:registry sr/default-registry}) -(defn explain - [s value] - (m/explain s value default-options)) - (defn schema? [o] (m/schema? o)) +(defn lazy-schema? + [s] + (satisfies? ILazySchema s)) + (defn schema [s] - (m/schema s default-options)) + (if (lazy-schema? s) + (-get-schema s) + (m/schema s default-options))) (defn validate [s value] - (m/validate s value default-options)) + (if (lazy-schema? s) + (-validate s value) + (m/validate s value default-options))) + +(defn explain + [s value] + (if (lazy-schema? s) + (-explain s value) + (m/explain s value default-options))) (defn humanize [exp] @@ -113,11 +128,15 @@ (defn validator [s] - (-> s schema m/validator)) + (if (lazy-schema? s) + (-get-validator s) + (-> s schema m/validator))) (defn explainer [s] - (-> s schema m/explainer)) + (if (lazy-schema? s) + (-get-explainer s) + (-> s schema m/explainer))) (defn encode ([s val transformer] @@ -131,9 +150,23 @@ ([s val options transformer] (m/decode s val options transformer))) -(defn decoder +(defn encoder + ([s] + (if (lazy-schema? s) + (-get-decoder s) + (encoder s default-options default-transformer))) ([s transformer] - (m/decoder s default-options transformer)) + (m/encoder s default-options transformer)) + ([s options transformer] + (m/encoder s options transformer))) + +(defn decoder + ([s] + (if (lazy-schema? s) + (-get-decoder s) + (decoder s default-options default-transformer))) + ([s transformer] + (m/decoder s default-options transformer)) ([s options transformer] (m/decoder s options transformer))) @@ -201,10 +234,8 @@ ([s] (lookup sr/default-registry s)) ([registry s] (schema (mr/schema registry s)))) -(declare define) - (defn fast-check! - "A fast path for checking process, assumes the ISchemaOps protocol + "A fast path for checking process, assumes the ILazySchema protocol implemented on the provided `s` schema. Sould not be used directly." [s value] (when-not ^boolean (-validate s value) @@ -216,10 +247,12 @@ ::explain explain})))) true) +(declare define) + (defn check-fn "Create a predefined check function" [s] - (let [schema (if (satisfies? ISchemaOps s) s (define s))] + (let [schema (if (lazy-schema? s) s (define s))] (partial fast-check! schema))) (defn check! @@ -227,7 +260,7 @@ schema over provided data. Raises an assertion exception, should be used together with `dm/assert!` or `dm/verify!`." [s value] - (if (satisfies? ISchemaOps s) + (if (lazy-schema? s) (fast-check! s value) (do (when-not ^boolean (m/validate s value default-options) @@ -241,7 +274,7 @@ (defn fast-validate! - "A fast path for validation process, assumes the ISchemaOps protocol + "A fast path for validation process, assumes the ILazySchema protocol implemented on the provided `s` schema. Sould not be used directly." ([s value] (fast-validate! s value nil)) ([s value options] @@ -257,14 +290,14 @@ (defn validate-fn "Create a predefined validate function" [s] - (let [schema (if (satisfies? ISchemaOps s) s (define s))] + (let [schema (if (lazy-schema? s) s (define s))] (partial fast-validate! schema))) (defn validate! "A generic validation function for predefined schemas." ([s value] (validate! s value nil)) ([s value options] - (if (satisfies? ISchemaOps s) + (if (lazy-schema? s) (fast-validate! s value options) (when-not ^boolean (m/validate s value default-options) (let [explain (explain s value) @@ -277,7 +310,7 @@ (defn conform! [schema value] - (assert (satisfies? ISchemaOps schema) "expected `schema` to satisfy ISchemaOps protocol") + (assert (lazy-schema? schema) "expected `schema` to satisfy ILazySchema protocol") (let [params (-decode schema value)] (fast-validate! schema params nil) params)) @@ -295,11 +328,16 @@ nil) (defn define - [s] - (let [schema (delay (schema s)) - validator (delay (validator @schema)) - explainer (delay (explainer @schema)) - decoder (delay (decoder @schema default-transformer))] + "Create ans instance of ILazySchema" + [s & {:keys [transformer] :as options}] + (let [schema (delay (schema s)) + validator (delay (m/validator @schema)) + explainer (delay (m/explainer @schema)) + + options (c/merge default-options (dissoc options :transformer)) + transformer (or transformer default-transformer) + decoder (delay (m/decoder @schema options transformer)) + encoder (delay (m/encoder @schema options transformer))] (reify m/AST @@ -341,7 +379,17 @@ (-form [_] (m/-form @schema)) - ISchemaOps + ILazySchema + (-get-schema [_] + @schema) + (-get-validator [_] + @validator) + (-get-explainer [_] + @explainer) + (-get-encoder [_] + @encoder) + (-get-decoder [_] + @decoder) (-validate [_ o] (@validator o)) (-explain [_ o] @@ -349,18 +397,6 @@ (-decode [_ o] (@decoder o))))) -;; --- GENERATORS - -;; FIXME: replace with sg/subseq -(defn gen-set-from-choices - [choices] - (->> tgen/nat - (tgen/fmap (fn [i] - (into #{} - (map (fn [_] (rand-nth choices))) - (range i)))))) - - ;; --- BUILTIN SCHEMAS (define! :merge (mu/-merge)) From 9b36ea99e6854284a17b20d0ed864ac9548315ea Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 27 Nov 2023 16:10:07 +0100 Subject: [PATCH 12/14] :sparkles: Integrate backend rpc handlers to use schema improvements --- backend/src/app/rpc.clj | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index d514649272..d4b3a4baaf 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -142,14 +142,15 @@ (defn- wrap-params-validation [_ f mdata] (if-let [schema (::sm/params mdata)] - (let [schema (sm/schema schema) - valid? (sm/validator schema) - explain (sm/explainer schema) - decode (sm/decoder schema sm/default-transformer)] - + (let [schema (if (sm/lazy-schema? schema) + schema + (sm/define schema)) + validate (sm/validator schema) + explain (sm/explainer schema) + decode (sm/decoder schema)] (fn [cfg params] (let [params (decode params)] - (if (valid? params) + (if (validate params) (f cfg params) (ex/raise :type :validation :code :params-validation @@ -160,13 +161,15 @@ [_ f mdata] (if (contains? cf/flags :rpc-output-validation) (or (when-let [schema (::sm/result mdata)] - (let [schema (sm/schema schema) - valid? (sm/validator schema) - explain (sm/explainer schema)] + (let [schema (if (sm/lazy-schema? schema) + schema + (sm/define schema)) + validate (sm/validator schema) + explain (sm/explainer schema)] (fn [cfg params] (let [response (f cfg params)] (when (map? response) - (when-not (valid? response) + (when-not (validate response) (ex/raise :type :validation :code :data-validation ::sm/explain (explain response)))) From a91b2f1133e1bbb5c1b144545fa285a2e1ccb15b Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 27 Nov 2023 16:10:30 +0100 Subject: [PATCH 13/14] :sparkles: Apply schema improvements to profile rpc methods --- backend/src/app/rpc/commands/profile.clj | 80 ++++++++++++++---------- 1 file changed, 46 insertions(+), 34 deletions(-) diff --git a/backend/src/app/rpc/commands/profile.clj b/backend/src/app/rpc/commands/profile.clj index 1a11e57f2f..2b8dc18a99 100644 --- a/backend/src/app/rpc/commands/profile.clj +++ b/backend/src/app/rpc/commands/profile.clj @@ -37,21 +37,23 @@ (declare strip-private-attrs) (declare verify-password) -(def ^:private schema:profile - [:map {:title "Profile"} - [:id ::sm/uuid] - [:fullname [::sm/word-string {:max 250}]] - [:email ::sm/email] - [:is-active {:optional true} :boolean] - [:is-blocked {:optional true} :boolean] - [:is-demo {:optional true} :boolean] - [:is-muted {:optional true} :boolean] - [:created-at {:optional true} ::sm/inst] - [:modified-at {:optional true} ::sm/inst] - [:default-project-id {:optional true} ::sm/uuid] - [:default-team-id {:optional true} ::sm/uuid] - [:props {:optional true} - [:map-of {:title "ProfileProps"} :keyword :any]]]) +(def ^:private + schema:profile + (sm/define + [:map {:title "Profile"} + [:id ::sm/uuid] + [:fullname [::sm/word-string {:max 250}]] + [:email ::sm/email] + [:is-active {:optional true} :boolean] + [:is-blocked {:optional true} :boolean] + [:is-demo {:optional true} :boolean] + [:is-muted {:optional true} :boolean] + [:created-at {:optional true} ::sm/inst] + [:modified-at {:optional true} ::sm/inst] + [:default-project-id {:optional true} ::sm/uuid] + [:default-team-id {:optional true} ::sm/uuid] + [:props {:optional true} + [:map-of {:title "ProfileProps"} :keyword :any]]])) ;; --- QUERY: Get profile (own) @@ -79,11 +81,13 @@ ;; --- MUTATION: Update Profile (own) -(def schema:update-profile - [:map {:title "update-profile"} - [:fullname [::sm/word-string {:max 250}]] - [:lang {:optional true} [:string {:max 5}]] - [:theme {:optional true} [:string {:max 250}]]]) +(def ^:private + schema:update-profile + (sm/define + [:map {:title "update-profile"} + [:fullname [::sm/word-string {:max 250}]] + [:lang {:optional true} [:string {:max 5}]] + [:theme {:optional true} [:string {:max 250}]]])) (sv/defmethod ::update-profile {::doc/added "1.0" @@ -124,11 +128,13 @@ (declare update-profile-password!) (declare invalidate-profile-session!) -(def schema:update-profile-password - [:map {:title "update-profile-password"} - [:password [::sm/word-string {:max 500}]] - ;; Social registered users don't have old-password - [:old-password {:optional true} [:maybe [::sm/word-string {:max 500}]]]]) +(def ^:private + schema:update-profile-password + (sm/define + [:map {:title "update-profile-password"} + [:password [::sm/word-string {:max 500}]] + ;; Social registered users don't have old-password + [:old-password {:optional true} [:maybe [::sm/word-string {:max 500}]]]])) (sv/defmethod ::update-profile-password {:doc/added "1.0" @@ -178,9 +184,11 @@ (declare upload-photo) (declare update-profile-photo) -(def schema:update-profile-photo - [:map {:title "update-profile-photo"} - [:file ::media/upload]]) +(def ^:private + schema:update-profile-photo + (sm/define + [:map {:title "update-profile-photo"} + [:file ::media/upload]])) (sv/defmethod ::update-profile-photo {:doc/added "1.1" @@ -242,9 +250,11 @@ (declare ^:private request-email-change!) (declare ^:private change-email-immediately!) -(def schema:request-email-change - [:map {:title "request-email-change"} - [:email ::sm/email]]) +(def ^:private + schema:request-email-change + (sm/define + [:map {:title "request-email-change"} + [:email ::sm/email]])) (sv/defmethod ::request-email-change {::doc/added "1.0" @@ -309,9 +319,11 @@ ;; --- MUTATION: Update Profile Props -(def schema:update-profile-props - [:map {:title "update-profile-props"} - [:props [:map-of :keyword :any]]]) +(def ^:private + schema:update-profile-props + (sm/define + [:map {:title "update-profile-props"} + [:props [:map-of :keyword :any]]])) (sv/defmethod ::update-profile-props {::doc/added "1.0" From 6c8ea5d8992a0f48be8a65ecbdef67db48ade727 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 27 Nov 2023 16:16:38 +0100 Subject: [PATCH 14/14] :bug: Fix issues on exporter configuration validation --- exporter/src/app/config.cljs | 37 +++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/exporter/src/app/config.cljs b/exporter/src/app/config.cljs index 946f2248a8..4c00880776 100644 --- a/exporter/src/app/config.cljs +++ b/exporter/src/app/config.cljs @@ -24,16 +24,18 @@ :tempdir "/tmp/penpot-exporter" :redis-uri "redis://redis/0"}) -(def ^:private schema:config - [:map {:title "config"} - [:public-uri {:optional true} ::sm/uri] - [:host {:optional true} :string] - [:tenant {:optional true} :string] - [:flags {:optional true} ::sm/set-of-keywords] - [:redis-uri {:optional true} :string] - [:tempdir {:optional true} :string] - [:browser-pool-max {:optional true} :int] - [:browser-pool-min {:optional true} :int]]) +(def ^:private + schema:config + (sm/define + [:map {:title "config"} + [:public-uri {:optional true} ::sm/uri] + [:host {:optional true} :string] + [:tenant {:optional true} :string] + [:flags {:optional true} ::sm/set-of-keywords] + [:redis-uri {:optional true} :string] + [:tempdir {:optional true} :string] + [:browser-pool-max {:optional true} :int] + [:browser-pool-min {:optional true} :int]])) (defn- parse-flags [config] @@ -58,14 +60,15 @@ [] (let [env (read-env "penpot") env (d/without-nils env) - data (merge defaults env) - data (sm/decode schema:config data sm/default-transformer)] + data (merge defaults env)] - (when-not (sm/validate schema:config data) - (println (sm/humanize-data schema:config data)) - (process/exit -1)) - - data)) + (try + (sm/conform! schema:config data) + (catch :default cause + (if-let [explain (some->> cause ex-data ::sm/explain)] + (println (sm/humanize-explain explain)) + (js/console.error cause)) + (process/exit -1))))) (def config (prepare-config))