Merge pull request #3854 from penpot/niwinz-develop-yetti-update

 Update yetti and simplify internal worker module
This commit is contained in:
Alejandro 2023-11-29 12:01:33 +01:00 committed by GitHub
commit 7404933e99
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
43 changed files with 797 additions and 811 deletions

View file

@ -21,8 +21,8 @@
java-http-clj/java-http-clj {:mvn/version "0.4.3"} java-http-clj/java-http-clj {:mvn/version "0.4.3"}
funcool/yetti funcool/yetti
{:git/tag "v9.16" {:git/tag "v10.0"
:git/sha "7df3e08" :git/sha "520613f"
:git/url "https://github.com/funcool/yetti.git" :git/url "https://github.com/funcool/yetti.git"
:exclusions [org.slf4j/slf4j-api]} :exclusions [org.slf4j/slf4j-api]}

View file

@ -3,15 +3,17 @@
;; Optional: queue, ommited means Integer/MAX_VALUE ;; Optional: queue, ommited means Integer/MAX_VALUE
;; Optional: timeout, ommited means no timeout ;; Optional: timeout, ommited means no timeout
;; Note: queue and timeout are excluding ;; Note: queue and timeout are excluding
{:update-file-by-id {:permits 1 :queue 3} {:update-file/by-profile
:update-file {:permits 20} {:permits 1 :queue 5}
:derive-password {:permits 8} :update-file/global {:permits 20}
:process-font {:permits 4 :queue 32}
:process-image {:permits 8 :queue 32}
:file-thumbnail-ops :derive-password/global {:permits 8}
:process-font/global {:permits 4}
:process-image/global {:permits 8}
:file-thumbnail-ops/by-profile
{:permits 2} {:permits 2}
:submit-audit-events-by-profile :submit-audit-events/by-profile
{:permits 1 :queue 3}} {:permits 1 :queue 3}}

View file

@ -31,7 +31,7 @@
<Logger name="app.rpc.rlimit" level="info" /> <Logger name="app.rpc.rlimit" level="info" />
<Logger name="app.rpc.climit" level="info" /> <Logger name="app.rpc.climit" level="info" />
<Logger name="app.rpc.mutations.files" level="info" /> <Logger name="app.rpc.mutations.files" level="info" />
<Logger name="app.common.files.migrations" level="debug" /> <Logger name="app.common.files.migrations" level="info" />
<Logger name="app.loggers" level="debug" additivity="false"> <Logger name="app.loggers" level="debug" additivity="false">
<AppenderRef ref="main" level="debug" /> <AppenderRef ref="main" level="debug" />

View file

@ -31,7 +31,7 @@
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[integrant.core :as ig] [integrant.core :as ig]
[yetti.response :as-alias yrs])) [ring.response :as-alias rres]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HELPERS ;; HELPERS
@ -479,8 +479,8 @@
(defn- redirect-response (defn- redirect-response
[uri] [uri]
{::yrs/status 302 {::rres/status 302
::yrs/headers {"location" (str uri)}}) ::rres/headers {"location" (str uri)}})
(defn- generate-error-redirect (defn- generate-error-redirect
[_ cause] [_ cause]
@ -557,8 +557,8 @@
:props props :props props
:exp (dt/in-future "4h")}) :exp (dt/in-future "4h")})
uri (build-auth-uri cfg state)] uri (build-auth-uri cfg state)]
{::yrs/status 200 {::rres/status 200
::yrs/body {:redirect-uri uri}})) ::rres/body {:redirect-uri uri}}))
(defn- callback-handler (defn- callback-handler
[cfg request] [cfg request]

View file

@ -23,15 +23,14 @@
[app.metrics :as mtx] [app.metrics :as mtx]
[app.rpc :as-alias rpc] [app.rpc :as-alias rpc]
[app.rpc.doc :as-alias rpc.doc] [app.rpc.doc :as-alias rpc.doc]
[app.worker :as wrk]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.exec :as px] [promesa.exec :as px]
[reitit.core :as r] [reitit.core :as r]
[reitit.middleware :as rr] [reitit.middleware :as rr]
[yetti.adapter :as yt] [ring.request :as rreq]
[yetti.request :as yrq] [ring.response :as-alias rres]
[yetti.response :as-alias yrs])) [yetti.adapter :as yt]))
(declare router-handler) (declare router-handler)
@ -63,8 +62,7 @@
::max-multipart-body-size ::max-multipart-body-size
::router ::router
::handler ::handler
::io-threads ::io-threads]))
::wrk/executor]))
(defmethod ig/init-key ::server (defmethod ig/init-key ::server
[_ {:keys [::handler ::router ::host ::port] :as cfg}] [_ {:keys [::handler ::router ::host ::port] :as cfg}]
@ -75,11 +73,9 @@
:http/max-multipart-body-size (::max-multipart-body-size cfg) :http/max-multipart-body-size (::max-multipart-body-size cfg)
:xnio/io-threads (or (::io-threads cfg) :xnio/io-threads (or (::io-threads cfg)
(max 3 (px/get-available-processors))) (max 3 (px/get-available-processors)))
:xnio/worker-threads (or (::worker-threads cfg) :xnio/dispatch :virtual
(max 6 (px/get-available-processors))) :ring/compat :ring2
:xnio/dispatch true :socket/backlog 4069}
:socket/backlog 4069
:ring/async true}
handler (cond handler (cond
(some? router) (some? router)
@ -102,13 +98,13 @@
(yt/stop! server)) (yt/stop! server))
(defn- not-found-handler (defn- not-found-handler
[_ respond _] [_]
(respond {::yrs/status 404})) {::rres/status 404})
(defn- router-handler (defn- router-handler
[router] [router]
(letfn [(resolve-handler [request] (letfn [(resolve-handler [request]
(if-let [match (r/match-by-path router (yrq/path request))] (if-let [match (r/match-by-path router (rreq/path request))]
(let [params (:path-params match) (let [params (:path-params match)
result (:result match) result (:result match)
handler (or (:handler result) not-found-handler) handler (or (:handler result) not-found-handler)
@ -120,18 +116,15 @@
(let [{:keys [body] :as response} (errors/handle cause request)] (let [{:keys [body] :as response} (errors/handle cause request)]
(cond-> response (cond-> response
(map? body) (map? body)
(-> (update ::yrs/headers assoc "content-type" "application/transit+json") (-> (update ::rres/headers assoc "content-type" "application/transit+json")
(assoc ::yrs/body (t/encode-str body {:type :json-verbose}))))))] (assoc ::rres/body (t/encode-str body {:type :json-verbose}))))))]
(fn [request respond _] (fn [request]
(let [handler (resolve-handler request) (let [handler (resolve-handler request)]
exchange (yrq/exchange request)] (try
(handler (handler)
(fn [response] (catch Throwable cause
(yt/dispatch! exchange (partial respond response))) (on-error cause request)))))))
(fn [cause]
(let [response (on-error cause request)]
(yt/dispatch! exchange (partial respond response)))))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HTTP ROUTER ;; HTTP ROUTER
@ -160,8 +153,7 @@
[session/soft-auth cfg] [session/soft-auth cfg]
[actoken/soft-auth cfg] [actoken/soft-auth cfg]
[mw/errors errors/handle] [mw/errors errors/handle]
[mw/restrict-methods] [mw/restrict-methods]]}
[mw/with-dispatch :vthread]]}
(::mtx/routes cfg) (::mtx/routes cfg)
(::assets/routes cfg) (::assets/routes cfg)

View file

@ -11,13 +11,13 @@
[app.db :as db] [app.db :as db]
[app.main :as-alias main] [app.main :as-alias main]
[app.tokens :as tokens] [app.tokens :as tokens]
[yetti.request :as yrq])) [ring.request :as rreq]))
(def header-re #"^Token\s+(.*)") (def header-re #"^Token\s+(.*)")
(defn- get-token (defn- get-token
[request] [request]
(some->> (yrq/get-header request "authorization") (some->> (rreq/get-header request "authorization")
(re-matches header-re) (re-matches header-re)
(second))) (second)))
@ -54,9 +54,8 @@
(l/trace :hint "exception on decoding malformed token" :cause cause) (l/trace :hint "exception on decoding malformed token" :cause cause)
request)))] request)))]
(fn [request respond raise] (fn [request]
(let [request (handle-request request)] (handler (handle-request request)))))
(handler request respond raise)))))
(defn- wrap-authz (defn- wrap-authz
"Authorization middleware, will be executed synchronously on vthread." "Authorization middleware, will be executed synchronously on vthread."

View file

@ -16,7 +16,7 @@
[app.util.time :as dt] [app.util.time :as dt]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig] [integrant.core :as ig]
[yetti.response :as-alias yrs])) [ring.response :as-alias rres]))
(def ^:private cache-max-age (def ^:private cache-max-age
(dt/duration {:hours 24})) (dt/duration {:hours 24}))
@ -37,8 +37,8 @@
(defn- serve-object-from-s3 (defn- serve-object-from-s3
[{:keys [::sto/storage] :as cfg} obj] [{:keys [::sto/storage] :as cfg} obj]
(let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})] (let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})]
{::yrs/status 307 {::rres/status 307
::yrs/headers {"location" (str url) ::rres/headers {"location" (str url)
"x-host" (cond-> host port (str ":" port)) "x-host" (cond-> host port (str ":" port))
"x-mtype" (-> obj meta :content-type) "x-mtype" (-> obj meta :content-type)
"cache-control" (str "max-age=" (inst-ms cache-max-age))}})) "cache-control" (str "max-age=" (inst-ms cache-max-age))}}))
@ -51,8 +51,8 @@
headers {"x-accel-redirect" (:path purl) headers {"x-accel-redirect" (:path purl)
"content-type" (:content-type mdata) "content-type" (:content-type mdata)
"cache-control" (str "max-age=" (inst-ms cache-max-age))}] "cache-control" (str "max-age=" (inst-ms cache-max-age))}]
{::yrs/status 204 {::rres/status 204
::yrs/headers headers})) ::rres/headers headers}))
(defn- serve-object (defn- serve-object
"Helper function that returns the appropriate response depending on "Helper function that returns the appropriate response depending on
@ -70,7 +70,7 @@
obj (sto/get-object storage id)] obj (sto/get-object storage id)]
(if obj (if obj
(serve-object cfg obj) (serve-object cfg obj)
{::yrs/status 404}))) {::rres/status 404})))
(defn- generic-handler (defn- generic-handler
"A generic handler helper/common code for file-media based handlers." "A generic handler helper/common code for file-media based handlers."
@ -81,7 +81,7 @@
sobj (sto/get-object storage (kf mobj))] sobj (sto/get-object storage (kf mobj))]
(if sobj (if sobj
(serve-object cfg sobj) (serve-object cfg sobj)
{::yrs/status 404}))) {::rres/status 404})))
(defn file-objects-handler (defn file-objects-handler
"Handler that serves storage objects by file media id." "Handler that serves storage objects by file media id."

View file

@ -20,8 +20,8 @@
[integrant.core :as ig] [integrant.core :as ig]
[jsonista.core :as j] [jsonista.core :as j]
[promesa.exec :as px] [promesa.exec :as px]
[yetti.request :as yrq] [ring.request :as rreq]
[yetti.response :as-alias yrs])) [ring.response :as-alias rres]))
(declare parse-json) (declare parse-json)
(declare handle-request) (declare handle-request)
@ -31,15 +31,14 @@
(defmethod ig/pre-init-spec ::routes [_] (defmethod ig/pre-init-spec ::routes [_]
(s/keys :req [::http/client (s/keys :req [::http/client
::main/props ::main/props
::db/pool ::db/pool]))
::wrk/executor]))
(defmethod ig/init-key ::routes (defmethod ig/init-key ::routes
[_ {:keys [::wrk/executor] :as cfg}] [_ cfg]
(letfn [(handler [request] (letfn [(handler [request]
(let [data (-> request yrq/body slurp)] (let [data (-> request rreq/body slurp)]
(px/run! executor #(handle-request cfg data))) (px/run! :vthread (partial handle-request cfg data)))
{::yrs/status 200})] {::rres/status 200})]
["/sns" {:handler handler ["/sns" {:handler handler
:allowed-methods #{:post}}])) :allowed-methods #{:post}}]))

View file

@ -8,7 +8,6 @@
"Http client abstraction layer." "Http client abstraction layer."
(:require (:require
[app.common.spec :as us] [app.common.spec :as us]
[app.worker :as wrk]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig] [integrant.core :as ig]
[java-http-clj.core :as http] [java-http-clj.core :as http]
@ -21,12 +20,11 @@
(s/keys :req [::client])) (s/keys :req [::client]))
(defmethod ig/pre-init-spec ::client [_] (defmethod ig/pre-init-spec ::client [_]
(s/keys :req [::wrk/executor])) (s/keys :req []))
(defmethod ig/init-key ::client (defmethod ig/init-key ::client
[_ {:keys [::wrk/executor] :as cfg}] [_ _]
(http/build-client {:executor executor (http/build-client {:connect-timeout 30000 ;; 10s
:connect-timeout 30000 ;; 10s
:follow-redirects :always})) :follow-redirects :always}))
(defn send! (defn send!

View file

@ -32,8 +32,8 @@
[integrant.core :as ig] [integrant.core :as ig]
[markdown.core :as md] [markdown.core :as md]
[markdown.transformers :as mdt] [markdown.transformers :as mdt]
[yetti.request :as yrq] [ring.request :as rreq]
[yetti.response :as yrs])) [ring.response :as rres]))
;; (selmer.parser/cache-off!) ;; (selmer.parser/cache-off!)
@ -43,10 +43,10 @@
(defn index-handler (defn index-handler
[_cfg _request] [_cfg _request]
{::yrs/status 200 {::rres/status 200
::yrs/headers {"content-type" "text/html"} ::rres/headers {"content-type" "text/html"}
::yrs/body (-> (io/resource "app/templates/debug.tmpl") ::rres/body (-> (io/resource "app/templates/debug.tmpl")
(tmpl/render {}))}) (tmpl/render {}))})
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; FILE CHANGES ;; FILE CHANGES
@ -55,17 +55,17 @@
(defn prepare-response (defn prepare-response
[body] [body]
(let [headers {"content-type" "application/transit+json"}] (let [headers {"content-type" "application/transit+json"}]
{::yrs/status 200 {::rres/status 200
::yrs/body body ::rres/body body
::yrs/headers headers})) ::rres/headers headers}))
(defn prepare-download-response (defn prepare-download-response
[body filename] [body filename]
(let [headers {"content-disposition" (str "attachment; filename=" filename) (let [headers {"content-disposition" (str "attachment; filename=" filename)
"content-type" "application/octet-stream"}] "content-type" "application/octet-stream"}]
{::yrs/status 200 {::rres/status 200
::yrs/body body ::rres/body body
::yrs/headers headers})) ::rres/headers headers}))
(def sql:retrieve-range-of-changes (def sql:retrieve-range-of-changes
"select revn, changes from file_change where file_id=? and revn >= ? and revn <= ? order by revn") "select revn, changes from file_change where file_id=? and revn >= ? and revn <= ? order by revn")
@ -107,8 +107,8 @@
(db/update! conn :file (db/update! conn :file
{:data data} {:data data}
{:id file-id}) {:id file-id})
{::yrs/status 201 {::rres/status 201
::yrs/body "OK CREATED"}))) ::rres/body "OK CREATED"})))
:else :else
(prepare-response (blob/decode data)))))) (prepare-response (blob/decode data))))))
@ -137,8 +137,8 @@
{:data data {:data data
:deleted-at nil} :deleted-at nil}
{:id file-id}) {:id file-id})
{::yrs/status 200 {::rres/status 200
::yrs/body "OK UPDATED"}) ::rres/body "OK UPDATED"})
(db/run! pool (fn [{:keys [::db/conn]}] (db/run! pool (fn [{:keys [::db/conn]}]
(create-file conn {:id file-id (create-file conn {:id file-id
@ -148,15 +148,15 @@
(db/update! conn :file (db/update! conn :file
{:data data} {:data data}
{:id file-id}) {:id file-id})
{::yrs/status 201 {::rres/status 201
::yrs/body "OK CREATED"})))) ::rres/body "OK CREATED"}))))
{::yrs/status 500 {::rres/status 500
::yrs/body "ERROR"}))) ::rres/body "ERROR"})))
(defn file-data-handler (defn file-data-handler
[cfg request] [cfg request]
(case (yrq/method request) (case (rreq/method request)
:get (retrieve-file-data cfg request) :get (retrieve-file-data cfg request)
:post (upload-file-data cfg request) :post (upload-file-data cfg request)
(ex/raise :type :http (ex/raise :type :http
@ -238,12 +238,12 @@
1 (render-template-v1 report) 1 (render-template-v1 report)
2 (render-template-v2 report) 2 (render-template-v2 report)
3 (render-template-v3 report))] 3 (render-template-v3 report))]
{::yrs/status 200 {::rres/status 200
::yrs/body result ::rres/body result
::yrs/headers {"content-type" "text/html; charset=utf-8" ::rres/headers {"content-type" "text/html; charset=utf-8"
"x-robots-tag" "noindex"}}) "x-robots-tag" "noindex"}})
{::yrs/status 404 {::rres/status 404
::yrs/body "not found"}))) ::rres/body "not found"})))
(def sql:error-reports (def sql:error-reports
"SELECT id, created_at, "SELECT id, created_at,
@ -256,11 +256,11 @@
[{:keys [::db/pool]} _request] [{:keys [::db/pool]} _request]
(let [items (->> (db/exec! pool [sql:error-reports]) (let [items (->> (db/exec! pool [sql:error-reports])
(map #(update % :created-at dt/format-instant :rfc1123)))] (map #(update % :created-at dt/format-instant :rfc1123)))]
{::yrs/status 200 {::rres/status 200
::yrs/body (-> (io/resource "app/templates/error-list.tmpl") ::rres/body (-> (io/resource "app/templates/error-list.tmpl")
(tmpl/render {:items items})) (tmpl/render {:items items}))
::yrs/headers {"content-type" "text/html; charset=utf-8" ::rres/headers {"content-type" "text/html; charset=utf-8"
"x-robots-tag" "noindex"}})) "x-robots-tag" "noindex"}}))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; EXPORT/IMPORT ;; EXPORT/IMPORT
@ -296,14 +296,14 @@
::binf/profile-id profile-id ::binf/profile-id profile-id
::binf/project-id project-id)) ::binf/project-id project-id))
{::yrs/status 200 {::rres/status 200
::yrs/headers {"content-type" "text/plain"} ::rres/headers {"content-type" "text/plain"}
::yrs/body "OK CLONED"}) ::rres/body "OK CLONED"})
{::yrs/status 200 {::rres/status 200
::yrs/body (io/input-stream path) ::rres/body (io/input-stream path)
::yrs/headers {"content-type" "application/octet-stream" ::rres/headers {"content-type" "application/octet-stream"
"content-disposition" (str "attachmen; filename=" (first file-ids) ".penpot")}})))) "content-disposition" (str "attachmen; filename=" (first file-ids) ".penpot")}}))))
@ -334,9 +334,9 @@
::binf/profile-id profile-id ::binf/profile-id profile-id
::binf/project-id project-id)) ::binf/project-id project-id))
{::yrs/status 200 {::rres/status 200
::yrs/headers {"content-type" "text/plain"} ::rres/headers {"content-type" "text/plain"}
::yrs/body "OK"})) ::rres/body "OK"}))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; ACTIONS ;; ACTIONS
@ -363,34 +363,34 @@
(db/update! pool :profile {:is-blocked true} {:id (:id profile)}) (db/update! pool :profile {:is-blocked true} {:id (:id profile)})
(db/delete! pool :http-session {:profile-id (:id profile)}) (db/delete! pool :http-session {:profile-id (:id profile)})
{::yrs/status 200 {::rres/status 200
::yrs/headers {"content-type" "text/plain"} ::rres/headers {"content-type" "text/plain"}
::yrs/body (str/ffmt "PROFILE '%' BLOCKED" (:email profile))}) ::rres/body (str/ffmt "PROFILE '%' BLOCKED" (:email profile))})
(contains? params :unblock) (contains? params :unblock)
(do (do
(db/update! pool :profile {:is-blocked false} {:id (:id profile)}) (db/update! pool :profile {:is-blocked false} {:id (:id profile)})
{::yrs/status 200 {::rres/status 200
::yrs/headers {"content-type" "text/plain"} ::rres/headers {"content-type" "text/plain"}
::yrs/body (str/ffmt "PROFILE '%' UNBLOCKED" (:email profile))}) ::rres/body (str/ffmt "PROFILE '%' UNBLOCKED" (:email profile))})
(contains? params :resend) (contains? params :resend)
(if (:is-blocked profile) (if (:is-blocked profile)
{::yrs/status 200 {::rres/status 200
::yrs/headers {"content-type" "text/plain"} ::rres/headers {"content-type" "text/plain"}
::yrs/body "PROFILE ALREADY BLOCKED"} ::rres/body "PROFILE ALREADY BLOCKED"}
(do (do
(auth/send-email-verification! pool props profile) (auth/send-email-verification! pool props profile)
{::yrs/status 200 {::rres/status 200
::yrs/headers {"content-type" "text/plain"} ::rres/headers {"content-type" "text/plain"}
::yrs/body (str/ffmt "RESENDED FOR '%'" (:email profile))})) ::rres/body (str/ffmt "RESENDED FOR '%'" (:email profile))}))
:else :else
(do (do
(db/update! pool :profile {:is-active true} {:id (:id profile)}) (db/update! pool :profile {:is-active true} {:id (:id profile)})
{::yrs/status 200 {::rres/status 200
::yrs/headers {"content-type" "text/plain"} ::rres/headers {"content-type" "text/plain"}
::yrs/body (str/ffmt "PROFILE '%' ACTIVATED" (:email profile))})))) ::rres/body (str/ffmt "PROFILE '%' ACTIVATED" (:email profile))}))))
(defn- reset-file-data-version (defn- reset-file-data-version
@ -420,9 +420,9 @@
:migrate? false :migrate? false
:inc-revn? false :inc-revn? false
:save? true) :save? true)
{::yrs/status 200 {::rres/status 200
::yrs/headers {"content-type" "text/plain"} ::rres/headers {"content-type" "text/plain"}
::yrs/body "OK"})) ::rres/body "OK"}))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -434,13 +434,13 @@
[{:keys [::db/pool]} _] [{:keys [::db/pool]} _]
(try (try
(db/exec-one! pool ["select count(*) as count from server_prop;"]) (db/exec-one! pool ["select count(*) as count from server_prop;"])
{::yrs/status 200 {::rres/status 200
::yrs/body "OK"} ::rres/body "OK"}
(catch Throwable cause (catch Throwable cause
(l/warn :hint "unable to execute query on health handler" (l/warn :hint "unable to execute query on health handler"
:cause cause) :cause cause)
{::yrs/status 503 {::rres/status 503
::yrs/body "KO"}))) ::rres/body "KO"})))
(defn changelog-handler (defn changelog-handler
[_ _] [_ _]
@ -449,11 +449,11 @@
(md->html [text] (md->html [text]
(md/md-to-html-string text :replacement-transformers (into [transform-emoji] mdt/transformer-vector)))] (md/md-to-html-string text :replacement-transformers (into [transform-emoji] mdt/transformer-vector)))]
(if-let [clog (io/resource "changelog.md")] (if-let [clog (io/resource "changelog.md")]
{::yrs/status 200 {::rres/status 200
::yrs/headers {"content-type" "text/html; charset=utf-8"} ::rres/headers {"content-type" "text/html; charset=utf-8"}
::yrs/body (-> clog slurp md->html)} ::rres/body (-> clog slurp md->html)}
{::yrs/status 404 {::rres/status 404
::yrs/body "NOT FOUND"}))) ::rres/body "NOT FOUND"})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; INIT ;; INIT

View file

@ -9,21 +9,21 @@
(:require (:require
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.schema :as sm] [app.common.schema :as-alias sm]
[app.config :as cf] [app.config :as cf]
[app.http :as-alias http] [app.http :as-alias http]
[app.http.access-token :as-alias actoken] [app.http.access-token :as-alias actoken]
[app.http.session :as-alias session] [app.http.session :as-alias session]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[yetti.request :as yrq] [ring.request :as rreq]
[yetti.response :as yrs])) [ring.response :as rres]))
(defn- parse-client-ip (defn- parse-client-ip
[request] [request]
(or (some-> (yrq/get-header request "x-forwarded-for") (str/split ",") first) (or (some-> (rreq/get-header request "x-forwarded-for") (str/split ",") first)
(yrq/get-header request "x-real-ip") (rreq/get-header request "x-real-ip")
(yrq/remote-addr request))) (rreq/remote-addr request)))
(defn request->context (defn request->context
"Extracts error report relevant context data from request." "Extracts error report relevant context data from request."
@ -34,10 +34,10 @@
{:request/path (:path request) {:request/path (:path request)
:request/method (:method request) :request/method (:method request)
:request/params (:params request) :request/params (:params request)
:request/user-agent (yrq/get-header request "user-agent") :request/user-agent (rreq/get-header request "user-agent")
:request/ip-addr (parse-client-ip request) :request/ip-addr (parse-client-ip request)
:request/profile-id (:uid claims) :request/profile-id (:uid claims)
:version/frontend (or (yrq/get-header request "x-frontend-version") "unknown") :version/frontend (or (rreq/get-header request "x-frontend-version") "unknown")
:version/backend (:full cf/version)})) :version/backend (:full cf/version)}))
(defmulti handle-error (defmulti handle-error
@ -50,69 +50,55 @@
(defmethod handle-error :authentication (defmethod handle-error :authentication
[err _ _] [err _ _]
{::yrs/status 401 {::rres/status 401
::yrs/body (ex-data err)}) ::rres/body (ex-data err)})
(defmethod handle-error :authorization (defmethod handle-error :authorization
[err _ _] [err _ _]
{::yrs/status 403 {::rres/status 403
::yrs/body (ex-data err)}) ::rres/body (ex-data err)})
(defmethod handle-error :restriction (defmethod handle-error :restriction
[err _ _] [err _ _]
{::yrs/status 400 {::rres/status 400
::yrs/body (ex-data err)}) ::rres/body (ex-data err)})
(defmethod handle-error :rate-limit (defmethod handle-error :rate-limit
[err _ _] [err _ _]
(let [headers (-> err ex-data ::http/headers)] (let [headers (-> err ex-data ::http/headers)]
{::yrs/status 429 {::rres/status 429
::yrs/headers headers})) ::rres/headers headers}))
(defmethod handle-error :concurrency-limit (defmethod handle-error :concurrency-limit
[err _ _] [err _ _]
(let [headers (-> err ex-data ::http/headers)] (let [headers (-> err ex-data ::http/headers)]
{::yrs/status 429 {::rres/status 429
::yrs/headers headers})) ::rres/headers headers}))
(defmethod handle-error :validation (defmethod handle-error :validation
[err request parent-cause] [err request parent-cause]
(let [{:keys [code] :as data} (ex-data err)] (let [{:keys [code] :as data} (ex-data err)]
(cond (cond
(= code :spec-validation) (or (= code :spec-validation)
(= code :params-validation)
(= code :data-validation))
(let [explain (ex/explain data)] (let [explain (ex/explain data)]
{::yrs/status 400 {::rres/status 400
::yrs/body (-> data ::rres/body (-> data
(dissoc ::s/problems ::s/value ::s/spec) (dissoc ::s/problems ::s/value ::s/spec ::sm/explain)
(cond-> explain (assoc :explain explain)))}) (cond-> explain (assoc :explain explain)))})
(= code :params-validation)
(let [explain (::sm/explain data)
explain (sm/humanize-data explain)]
{::yrs/status 400
::yrs/body (-> data
(dissoc ::sm/explain)
(assoc :explain explain))})
(= code :data-validation)
(let [explain (::sm/explain data)
explain (sm/humanize-data explain)]
{::yrs/status 400
::yrs/body (-> data
(dissoc ::sm/explain)
(assoc :explain explain))})
(= code :request-body-too-large) (= code :request-body-too-large)
{::yrs/status 413 ::yrs/body data} {::rres/status 413 ::rres/body data}
(= code :invalid-image) (= code :invalid-image)
(binding [l/*context* (request->context request)] (binding [l/*context* (request->context request)]
(let [cause (or parent-cause err)] (let [cause (or parent-cause err)]
(l/error :hint "unexpected error on processing image" :cause cause) (l/error :hint "unexpected error on processing image" :cause cause)
{::yrs/status 400 ::yrs/body data})) {::rres/status 400 ::rres/body data}))
:else :else
{::yrs/status 400 ::yrs/body data}))) {::rres/status 400 ::rres/body data})))
(defmethod handle-error :assertion (defmethod handle-error :assertion
[error request parent-cause] [error request parent-cause]
@ -123,46 +109,46 @@
(= code :data-validation) (= code :data-validation)
(let [explain (ex/explain data)] (let [explain (ex/explain data)]
(l/error :hint "data assertion error" :cause cause) (l/error :hint "data assertion error" :cause cause)
{::yrs/status 500 {::rres/status 500
::yrs/body {:type :server-error ::rres/body {:type :server-error
:code :assertion :code :assertion
:data (-> data :data (-> data
(dissoc ::sm/explain) (dissoc ::sm/explain)
(cond-> explain (assoc :explain explain)))}}) (cond-> explain (assoc :explain explain)))}})
(= code :spec-validation) (= code :spec-validation)
(let [explain (ex/explain data)] (let [explain (ex/explain data)]
(l/error :hint "spec assertion error" :cause cause) (l/error :hint "spec assertion error" :cause cause)
{::yrs/status 500 {::rres/status 500
::yrs/body {:type :server-error ::rres/body {:type :server-error
:code :assertion :code :assertion
:data (-> data :data (-> data
(dissoc ::s/problems ::s/value ::s/spec) (dissoc ::s/problems ::s/value ::s/spec)
(cond-> explain (assoc :explain explain)))}}) (cond-> explain (assoc :explain explain)))}})
:else :else
(do (do
(l/error :hint "assertion error" :cause cause) (l/error :hint "assertion error" :cause cause)
{::yrs/status 500 {::rres/status 500
::yrs/body {:type :server-error ::rres/body {:type :server-error
:code :assertion :code :assertion
:data data}}))))) :data data}})))))
(defmethod handle-error :not-found (defmethod handle-error :not-found
[err _ _] [err _ _]
{::yrs/status 404 {::rres/status 404
::yrs/body (ex-data err)}) ::rres/body (ex-data err)})
(defmethod handle-error :internal (defmethod handle-error :internal
[error request parent-cause] [error request parent-cause]
(binding [l/*context* (request->context request)] (binding [l/*context* (request->context request)]
(let [cause (or parent-cause error)] (let [cause (or parent-cause error)]
(l/error :hint "internal error" :cause cause) (l/error :hint "internal error" :cause cause)
{::yrs/status 500 {::rres/status 500
::yrs/body {:type :server-error ::rres/body {:type :server-error
:code :unhandled :code :unhandled
:hint (ex-message error) :hint (ex-message error)
:data (ex-data error)}}))) :data (ex-data error)}})))
(defmethod handle-error :default (defmethod handle-error :default
[error request parent-cause] [error request parent-cause]
@ -186,23 +172,23 @@
:cause cause) :cause cause)
(cond (cond
(= state "57014") (= state "57014")
{::yrs/status 504 {::rres/status 504
::yrs/body {:type :server-error ::rres/body {:type :server-error
:code :statement-timeout :code :statement-timeout
:hint (ex-message error)}} :hint (ex-message error)}}
(= state "25P03") (= state "25P03")
{::yrs/status 504 {::rres/status 504
::yrs/body {:type :server-error ::rres/body {:type :server-error
:code :idle-in-transaction-timeout :code :idle-in-transaction-timeout
:hint (ex-message error)}} :hint (ex-message error)}}
:else :else
{::yrs/status 500 {::rres/status 500
::yrs/body {:type :server-error ::rres/body {:type :server-error
:code :unexpected :code :unexpected
:hint (ex-message error) :hint (ex-message error)
:state state}})))) :state state}}))))
(defmethod handle-exception :default (defmethod handle-exception :default
[error request parent-cause] [error request parent-cause]
@ -213,19 +199,19 @@
(nil? edata) (nil? edata)
(binding [l/*context* (request->context request)] (binding [l/*context* (request->context request)]
(l/error :hint "unexpected error" :cause cause) (l/error :hint "unexpected error" :cause cause)
{::yrs/status 500 {::rres/status 500
::yrs/body {:type :server-error ::rres/body {:type :server-error
:code :unexpected :code :unexpected
:hint (ex-message error)}}) :hint (ex-message error)}})
:else :else
(binding [l/*context* (request->context request)] (binding [l/*context* (request->context request)]
(l/error :hint "unhandled error" :cause cause) (l/error :hint "unhandled error" :cause cause)
{::yrs/status 500 {::rres/status 500
::yrs/body {:type :server-error ::rres/body {:type :server-error
:code :unhandled :code :unhandled
:hint (ex-message error) :hint (ex-message error)
:data edata}})))) :data edata}}))))
(defmethod handle-exception java.util.concurrent.CompletionException (defmethod handle-exception java.util.concurrent.CompletionException
[cause request _] [cause request _]

View file

@ -12,13 +12,10 @@
[app.config :as cf] [app.config :as cf]
[app.util.json :as json] [app.util.json :as json]
[cuerdas.core :as str] [cuerdas.core :as str]
[promesa.core :as p] [ring.request :as rreq]
[promesa.exec :as px] [ring.response :as rres]
[promesa.util :as pu]
[yetti.adapter :as yt] [yetti.adapter :as yt]
[yetti.middleware :as ymw] [yetti.middleware :as ymw])
[yetti.request :as yrq]
[yetti.response :as yrs])
(:import (:import
com.fasterxml.jackson.core.JsonParseException com.fasterxml.jackson.core.JsonParseException
com.fasterxml.jackson.core.io.JsonEOFException com.fasterxml.jackson.core.io.JsonEOFException
@ -46,17 +43,17 @@
(defn wrap-parse-request (defn wrap-parse-request
[handler] [handler]
(letfn [(process-request [request] (letfn [(process-request [request]
(let [header (yrq/get-header request "content-type")] (let [header (rreq/get-header request "content-type")]
(cond (cond
(str/starts-with? header "application/transit+json") (str/starts-with? header "application/transit+json")
(with-open [^InputStream is (yrq/body request)] (with-open [^InputStream is (rreq/body request)]
(let [params (t/read! (t/reader is))] (let [params (t/read! (t/reader is))]
(-> request (-> request
(assoc :body-params params) (assoc :body-params params)
(update :params merge params)))) (update :params merge params))))
(str/starts-with? header "application/json") (str/starts-with? header "application/json")
(with-open [^InputStream is (yrq/body request)] (with-open [^InputStream is (rreq/body request)]
(let [params (json/decode is json-mapper)] (let [params (json/decode is json-mapper)]
(-> request (-> request
(assoc :body-params params) (assoc :body-params params)
@ -65,37 +62,36 @@
:else :else
request))) request)))
(handle-error [raise cause] (handle-error [cause]
(cond (cond
(instance? RuntimeException cause) (instance? RuntimeException cause)
(if-let [cause (ex-cause cause)] (if-let [cause (ex-cause cause)]
(handle-error raise cause) (handle-error cause)
(raise cause)) (throw cause))
(instance? RequestTooBigException cause) (instance? RequestTooBigException cause)
(raise (ex/error :type :validation (ex/raise :type :validation
:code :request-body-too-large :code :request-body-too-large
:hint (ex-message cause))) :hint (ex-message cause))
(or (instance? JsonEOFException cause) (or (instance? JsonEOFException cause)
(instance? JsonParseException cause) (instance? JsonParseException cause)
(instance? MismatchedInputException cause)) (instance? MismatchedInputException cause))
(raise (ex/error :type :validation (ex/raise :type :validation
:code :malformed-json :code :malformed-json
:hint (ex-message cause) :hint (ex-message cause)
:cause cause)) :cause cause)
:else :else
(raise cause)))] (throw cause)))]
(fn [request respond raise] (fn [request]
(if (= (yrq/method request) :post) (if (= (rreq/method request) :post)
(let [request (ex/try! (process-request request))] (let [request (ex/try! (process-request request))]
(if (ex/exception? request) (if (ex/exception? request)
(handle-error raise request) (handle-error request)
(handler request respond raise))) (handler request)))
(handler request respond raise))))) (handler request)))))
(def parse-request (def parse-request
{:name ::parse-request {:name ::parse-request
@ -113,7 +109,7 @@
(defn wrap-format-response (defn wrap-format-response
[handler] [handler]
(letfn [(transit-streamable-body [data opts] (letfn [(transit-streamable-body [data opts]
(reify yrs/StreamableResponseBody (reify rres/StreamableResponseBody
(-write-body-to-stream [_ _ output-stream] (-write-body-to-stream [_ _ output-stream]
(try (try
(with-open [^OutputStream bos (buffered-output-stream output-stream buffer-size)] (with-open [^OutputStream bos (buffered-output-stream output-stream buffer-size)]
@ -128,7 +124,7 @@
(.close ^OutputStream output-stream)))))) (.close ^OutputStream output-stream))))))
(json-streamable-body [data] (json-streamable-body [data]
(reify yrs/StreamableResponseBody (reify rres/StreamableResponseBody
(-write-body-to-stream [_ _ output-stream] (-write-body-to-stream [_ _ output-stream]
(try (try
(with-open [^OutputStream bos (buffered-output-stream output-stream buffer-size)] (with-open [^OutputStream bos (buffered-output-stream output-stream buffer-size)]
@ -143,24 +139,24 @@
(.close ^OutputStream output-stream)))))) (.close ^OutputStream output-stream))))))
(format-response-with-json [response _] (format-response-with-json [response _]
(let [body (::yrs/body response)] (let [body (::rres/body response)]
(if (or (boolean? body) (coll? body)) (if (or (boolean? body) (coll? body))
(-> response (-> response
(update ::yrs/headers assoc "content-type" "application/json") (update ::rres/headers assoc "content-type" "application/json")
(assoc ::yrs/body (json-streamable-body body))) (assoc ::rres/body (json-streamable-body body)))
response))) response)))
(format-response-with-transit [response request] (format-response-with-transit [response request]
(let [body (::yrs/body response)] (let [body (::rres/body response)]
(if (or (boolean? body) (coll? body)) (if (or (boolean? body) (coll? body))
(let [qs (yrq/query request) (let [qs (rreq/query request)
opts (if (or (contains? cf/flags :transit-readable-response) opts (if (or (contains? cf/flags :transit-readable-response)
(str/includes? qs "transit_verbose")) (str/includes? qs "transit_verbose"))
{:type :json-verbose} {:type :json-verbose}
{:type :json})] {:type :json})]
(-> response (-> response
(update ::yrs/headers assoc "content-type" "application/transit+json") (update ::rres/headers assoc "content-type" "application/transit+json")
(assoc ::yrs/body (transit-streamable-body body opts)))) (assoc ::rres/body (transit-streamable-body body opts))))
response))) response)))
(format-from-params [{:keys [query-params] :as request}] (format-from-params [{:keys [query-params] :as request}]
@ -169,7 +165,7 @@
(format-response [response request] (format-response [response request]
(let [accept (or (format-from-params request) (let [accept (or (format-from-params request)
(yrq/get-header request "accept"))] (rreq/get-header request "accept"))]
(cond (cond
(or (= accept "application/transit+json") (or (= accept "application/transit+json")
(str/includes? accept "application/transit+json")) (str/includes? accept "application/transit+json"))
@ -186,11 +182,9 @@
(cond-> response (cond-> response
(map? response) (format-response request)))] (map? response) (format-response request)))]
(fn [request respond raise] (fn [request]
(handler request (let [response (handler request)]
(fn [response] (process-response response request)))))
(respond (process-response response request)))
raise))))
(def format-response (def format-response
{:name ::format-response {:name ::format-response
@ -198,12 +192,11 @@
(defn wrap-errors (defn wrap-errors
[handler on-error] [handler on-error]
(fn [request respond raise] (fn [request]
(handler request respond (fn [cause] (try
(try (handler request)
(respond (on-error cause request)) (catch Throwable cause
(catch Throwable cause (on-error cause request)))))
(raise cause)))))))
(def errors (def errors
{:name ::errors {:name ::errors
@ -221,11 +214,11 @@
(defn wrap-cors (defn wrap-cors
[handler] [handler]
(fn [request] (fn [request]
(let [response (if (= (yrq/method request) :options) (let [response (if (= (rreq/method request) :options)
{::yrs/status 200} {::rres/status 200}
(handler request)) (handler request))
origin (yrq/get-header request "origin")] origin (rreq/get-header request "origin")]
(update response ::yrs/headers with-cors-headers origin)))) (update response ::rres/headers with-cors-headers origin))))
(def cors (def cors
{:name ::cors {:name ::cors
@ -239,18 +232,8 @@
(fn [data _] (fn [data _]
(when-let [allowed (:allowed-methods data)] (when-let [allowed (:allowed-methods data)]
(fn [handler] (fn [handler]
(fn [request respond raise] (fn [request]
(let [method (yrq/method request)] (let [method (rreq/method request)]
(if (contains? allowed method) (if (contains? allowed method)
(handler request respond raise) (handler request)
(respond {::yrs/status 405})))))))}) {::rres/status 405}))))))})
(def with-dispatch
{:name ::with-dispatch
:compile
(fn [& _]
(fn [handler executor]
(let [executor (px/resolve-executor executor)]
(fn [request respond raise]
(->> (px/submit! executor (partial handler request))
(p/fnly (pu/handler respond raise)))))))})

View file

@ -20,6 +20,7 @@
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[integrant.core :as ig] [integrant.core :as ig]
[ring.request :as rreq]
[yetti.request :as yrq])) [yetti.request :as yrq]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -142,7 +143,7 @@
(us/assert! ::us/uuid profile-id) (us/assert! ::us/uuid profile-id)
(fn [request response] (fn [request response]
(let [uagent (yrq/get-header request "user-agent") (let [uagent (rreq/get-header request "user-agent")
params {:profile-id profile-id params {:profile-id profile-id
:user-agent uagent :user-agent uagent
:created-at (dt/now)} :created-at (dt/now)}
@ -209,9 +210,8 @@
(l/trace :hint "exception on decoding malformed token" :cause cause) (l/trace :hint "exception on decoding malformed token" :cause cause)
request)))] request)))]
(fn [request respond raise] (fn [request]
(let [request (handle-request request)] (handler (handle-request request)))))
(handler request respond raise)))))
(defn- wrap-authz (defn- wrap-authz
[handler {:keys [::manager]}] [handler {:keys [::manager]}]

View file

@ -10,7 +10,7 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.pprint :as pp] [app.common.pprint :as pp]
[app.common.spec :as us] [app.common.schema :as sm]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.db :as db] [app.db :as db]
[app.http.session :as session] [app.http.session :as session]
@ -21,6 +21,7 @@
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.exec.csp :as sp] [promesa.exec.csp :as sp]
[ring.websocket :as rws]
[yetti.websocket :as yws])) [yetti.websocket :as yws]))
(def recv-labels (def recv-labels
@ -277,19 +278,23 @@
:inc 1) :inc 1)
message) message)
(def ^:private schema:params
(s/def ::session-id ::us/uuid) (sm/define
(s/def ::handler-params [:map {:title "params"}
(s/keys :req-un [::session-id])) [:session-id ::sm/uuid]]))
(defn- http-handler (defn- http-handler
[cfg {:keys [params ::session/profile-id] :as request}] [cfg {:keys [params ::session/profile-id] :as request}]
(let [{:keys [session-id]} (us/conform ::handler-params params)] (let [{:keys [session-id]} (sm/conform! schema:params params)]
(cond (cond
(not profile-id) (not profile-id)
(ex/raise :type :authentication (ex/raise :type :authentication
:hint "Authentication required.") :hint "Authentication required.")
;; WORKAROUND: we use the adapter specific predicate for
;; performance reasons; for now, the ring default impl for
;; `upgrade-request?` parses all requests headers before perform
;; any checking.
(not (yws/upgrade-request? request)) (not (yws/upgrade-request? request))
(ex/raise :type :validation (ex/raise :type :validation
:code :websocket-request-expected :code :websocket-request-expected
@ -298,14 +303,13 @@
:else :else
(do (do
(l/trace :hint "websocket request" :profile-id profile-id :session-id session-id) (l/trace :hint "websocket request" :profile-id profile-id :session-id session-id)
(->> (ws/handler {::rws/listener (ws/listener request
::ws/on-rcv-message (partial on-rcv-message cfg) ::ws/on-rcv-message (partial on-rcv-message cfg)
::ws/on-snd-message (partial on-snd-message cfg) ::ws/on-snd-message (partial on-snd-message cfg)
::ws/on-connect (partial on-connect cfg) ::ws/on-connect (partial on-connect cfg)
::ws/handler (partial handle-message cfg) ::ws/handler (partial handle-message cfg)
::profile-id profile-id ::profile-id profile-id
::session-id session-id) ::session-id session-id)}))))
(yws/upgrade request))))))
(defmethod ig/pre-init-spec ::routes [_] (defmethod ig/pre-init-spec ::routes [_]
(s/keys :req [::mbus/msgbus (s/keys :req [::mbus/msgbus
@ -318,5 +322,4 @@
(defmethod ig/init-key ::routes (defmethod ig/init-key ::routes
[_ cfg] [_ cfg]
["/ws/notifications" {:middleware [[session/authz cfg]] ["/ws/notifications" {:middleware [[session/authz cfg]]
:handler (partial http-handler cfg) :handler (partial http-handler cfg)}])
:allowed-methods #{:get}}])

View file

@ -33,7 +33,7 @@
[integrant.core :as ig] [integrant.core :as ig]
[lambdaisland.uri :as u] [lambdaisland.uri :as u]
[promesa.exec :as px] [promesa.exec :as px]
[yetti.request :as yrq])) [ring.request :as rreq]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HELPERS ;; HELPERS
@ -41,9 +41,9 @@
(defn parse-client-ip (defn parse-client-ip
[request] [request]
(or (some-> (yrq/get-header request "x-forwarded-for") (str/split ",") first) (or (some-> (rreq/get-header request "x-forwarded-for") (str/split ",") first)
(yrq/get-header request "x-real-ip") (rreq/get-header request "x-real-ip")
(some-> (yrq/remote-addr request) str))) (some-> (rreq/remote-addr request) str)))
(defn extract-utm-params (defn extract-utm-params
"Extracts additional data from params and namespace them under "Extracts additional data from params and namespace them under

View file

@ -56,22 +56,22 @@
(dissoc :request/params :value :params :data))] (dissoc :request/params :value :params :data))]
(merge (merge
{:context (-> (into (sorted-map) ctx) {:context (-> (into (sorted-map) ctx)
(pp/pprint-str :width 200 :length 50 :level 10)) (pp/pprint-str :length 50))
:props (pp/pprint-str props :width 200 :length 50) :props (pp/pprint-str props :length 50)
:hint (or (ex-message cause) @message) :hint (or (ex-message cause) @message)
:trace (or (::trace record) :trace (or (::trace record)
(ex/format-throwable cause :data? false :explain? false :header? false :summary? false))} (ex/format-throwable cause :data? false :explain? false :header? false :summary? false))}
(when-let [params (or (:request/params context) (:params context))] (when-let [params (or (:request/params context) (:params context))]
{:params (pp/pprint-str params :width 200 :length 50 :level 10)}) {:params (pp/pprint-str params :length 30 :level 12)})
(when-let [value (:value context)] (when-let [value (:value context)]
{:value (pp/pprint-str value :width 200 :length 50 :level 10)}) {:value (pp/pprint-str value :length 30 :level 12)})
(when-let [data (some-> data (dissoc ::s/problems ::s/value ::s/spec ::sm/explain :hint))] (when-let [data (some-> data (dissoc ::s/problems ::s/value ::s/spec ::sm/explain :hint))]
{:data (pp/pprint-str data :width 200)}) {:data (pp/pprint-str data :length 30 :level 12)})
(when-let [explain (ex/explain data {:level 8 :length 20})] (when-let [explain (ex/explain data :length 30 :level 12)]
{:explain explain}))))) {:explain explain})))))
(defn error-record? (defn error-record?
@ -96,11 +96,11 @@
(defmethod ig/init-key ::reporter (defmethod ig/init-key ::reporter
[_ cfg] [_ cfg]
(let [input (sp/chan :buf (sp/sliding-buffer 32) (let [input (sp/chan :buf (sp/sliding-buffer 64)
:xf (filter error-record?))] :xf (filter error-record?))]
(add-watch l/log-record ::reporter #(sp/put! input %4)) (add-watch l/log-record ::reporter #(sp/put! input %4))
(px/thread {:name "penpot/database-reporter" :virtual true} (px/thread {:name "penpot/database-reporter"}
(l/info :hint "initializing database error persistence") (l/info :hint "initializing database error persistence")
(try (try
(loop [] (loop []

View file

@ -161,12 +161,7 @@
::mdef/help "Current number of threads with state RUNNING." ::mdef/help "Current number of threads with state RUNNING."
::mdef/labels ["name"] ::mdef/labels ["name"]
::mdef/type :gauge} ::mdef/type :gauge}
})
:executors-queued-submissions
{::mdef/name "penpot_executors_queued_submissions"
::mdef/help "Current number of queued submissions."
::mdef/labels ["name"]
::mdef/type :gauge}})
(def system-config (def system-config
{::db/pool {::db/pool
@ -180,13 +175,12 @@
;; Default thread pool for IO operations ;; Default thread pool for IO operations
::wrk/executor ::wrk/executor
{::wrk/parallelism (cf/get :default-executor-parallelism {}
(+ 3 (* (px/get-available-processors) 3)))}
::wrk/monitor ::wrk/monitor
{::mtx/metrics (ig/ref ::mtx/metrics) {::mtx/metrics (ig/ref ::mtx/metrics)
::wrk/name "default" ::wrk/executor (ig/ref ::wrk/executor)
::wrk/executor (ig/ref ::wrk/executor)} ::wrk/name "default"}
:app.migrations/migrations :app.migrations/migrations
{::db/pool (ig/ref ::db/pool)} {::db/pool (ig/ref ::db/pool)}
@ -217,7 +211,7 @@
{::db/pool (ig/ref ::db/pool)} {::db/pool (ig/ref ::db/pool)}
::http.client/client ::http.client/client
{::wrk/executor (ig/ref ::wrk/executor)} {}
::session/manager ::session/manager
{::db/pool (ig/ref ::db/pool)} {::db/pool (ig/ref ::db/pool)}
@ -228,14 +222,12 @@
::http.awsns/routes ::http.awsns/routes
{::props (ig/ref ::setup/props) {::props (ig/ref ::setup/props)
::db/pool (ig/ref ::db/pool) ::db/pool (ig/ref ::db/pool)
::http.client/client (ig/ref ::http.client/client) ::http.client/client (ig/ref ::http.client/client)}
::wrk/executor (ig/ref ::wrk/executor)}
::http/server ::http/server
{::http/port (cf/get :http-server-port) {::http/port (cf/get :http-server-port)
::http/host (cf/get :http-server-host) ::http/host (cf/get :http-server-host)
::http/router (ig/ref ::http/router) ::http/router (ig/ref ::http/router)
::wrk/executor (ig/ref ::wrk/executor)
::http/io-threads (cf/get :http-server-io-threads) ::http/io-threads (cf/get :http-server-io-threads)
::http/max-body-size (cf/get :http-server-max-body-size) ::http/max-body-size (cf/get :http-server-max-body-size)
::http/max-multipart-body-size (cf/get :http-server-max-multipart-body-size)} ::http/max-multipart-body-size (cf/get :http-server-max-multipart-body-size)}
@ -291,12 +283,10 @@
::http.debug/routes ::http.debug/routes
{::db/pool (ig/ref ::db/pool) {::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor)
::session/manager (ig/ref ::session/manager) ::session/manager (ig/ref ::session/manager)
::sto/storage (ig/ref ::sto/storage) ::sto/storage (ig/ref ::sto/storage)
::props (ig/ref ::setup/props)} ::props (ig/ref ::setup/props)}
::http.ws/routes ::http.ws/routes
{::db/pool (ig/ref ::db/pool) {::db/pool (ig/ref ::db/pool)
::mtx/metrics (ig/ref ::mtx/metrics) ::mtx/metrics (ig/ref ::mtx/metrics)
@ -307,12 +297,10 @@
{::http.assets/path (cf/get :assets-path) {::http.assets/path (cf/get :assets-path)
::http.assets/cache-max-age (dt/duration {:hours 24}) ::http.assets/cache-max-age (dt/duration {:hours 24})
::http.assets/cache-max-agesignature-max-age (dt/duration {:hours 24 :minutes 5}) ::http.assets/cache-max-agesignature-max-age (dt/duration {:hours 24 :minutes 5})
::sto/storage (ig/ref ::sto/storage) ::sto/storage (ig/ref ::sto/storage)}
::wrk/executor (ig/ref ::wrk/executor)}
:app.rpc/climit :app.rpc/climit
{::mtx/metrics (ig/ref ::mtx/metrics) {::mtx/metrics (ig/ref ::mtx/metrics)}
::wrk/executor (ig/ref ::wrk/executor)}
:app.rpc/rlimit :app.rpc/rlimit
{::wrk/executor (ig/ref ::wrk/executor)} {::wrk/executor (ig/ref ::wrk/executor)}
@ -343,7 +331,6 @@
:app.rpc/routes :app.rpc/routes
{::rpc/methods (ig/ref :app.rpc/methods) {::rpc/methods (ig/ref :app.rpc/methods)
::db/pool (ig/ref ::db/pool) ::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor)
::session/manager (ig/ref ::session/manager) ::session/manager (ig/ref ::session/manager)
::props (ig/ref ::setup/props)} ::props (ig/ref ::setup/props)}
@ -445,7 +432,6 @@
::sto/storage ::sto/storage
{::db/pool (ig/ref ::db/pool) {::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor)
::sto/backends ::sto/backends
{:assets-s3 (ig/ref [::assets :app.storage.s3/backend]) {:assets-s3 (ig/ref [::assets :app.storage.s3/backend])
:assets-fs (ig/ref [::assets :app.storage.fs/backend])}} :assets-fs (ig/ref [::assets :app.storage.fs/backend])}}

View file

@ -30,12 +30,11 @@
[app.storage :as-alias sto] [app.storage :as-alias sto]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.core :as p] [promesa.core :as p]
[yetti.request :as yrq] [ring.request :as rreq]
[yetti.response :as yrs])) [ring.response :as rres]))
(s/def ::profile-id ::us/uuid) (s/def ::profile-id ::us/uuid)
@ -61,9 +60,9 @@
(if (fn? result) (if (fn? result)
(result request) (result request)
(let [mdata (meta result)] (let [mdata (meta result)]
(-> {::yrs/status (::http/status mdata 200) (-> {::rres/status (::http/status mdata 200)
::yrs/headers (::http/headers mdata {}) ::rres/headers (::http/headers mdata {})
::yrs/body (rph/unwrap result)} ::rres/body (rph/unwrap result)}
(handle-response-transformation request mdata) (handle-response-transformation request mdata)
(handle-before-comple-hook mdata))))) (handle-before-comple-hook mdata)))))
@ -72,7 +71,7 @@
internal async flow into ring async flow." internal async flow into ring async flow."
[methods {:keys [params path-params] :as request}] [methods {:keys [params path-params] :as request}]
(let [type (keyword (:type path-params)) (let [type (keyword (:type path-params))
etag (yrq/get-header request "if-none-match") etag (rreq/get-header request "if-none-match")
profile-id (or (::session/profile-id request) profile-id (or (::session/profile-id request)
(::actoken/profile-id request)) (::actoken/profile-id request))
@ -138,17 +137,20 @@
(f cfg (us/conform spec params))) (f cfg (us/conform spec params)))
f))) f)))
;; TODO: integrate with sm/define
(defn- wrap-params-validation (defn- wrap-params-validation
[_ f mdata] [_ f mdata]
(if-let [schema (::sm/params mdata)] (if-let [schema (::sm/params mdata)]
(let [schema (sm/schema schema) (let [schema (if (sm/lazy-schema? schema)
valid? (sm/validator schema) schema
explain (sm/explainer schema) (sm/define schema))
decode (sm/decoder schema sm/default-transformer)] validate (sm/validator schema)
explain (sm/explainer schema)
decode (sm/decoder schema)]
(fn [cfg params] (fn [cfg params]
(let [params (decode params)] (let [params (decode params)]
(if (valid? params) (if (validate params)
(f cfg params) (f cfg params)
(ex/raise :type :validation (ex/raise :type :validation
:code :params-validation :code :params-validation
@ -159,13 +161,15 @@
[_ f mdata] [_ f mdata]
(if (contains? cf/flags :rpc-output-validation) (if (contains? cf/flags :rpc-output-validation)
(or (when-let [schema (::sm/result mdata)] (or (when-let [schema (::sm/result mdata)]
(let [schema (sm/schema schema) (let [schema (if (sm/lazy-schema? schema)
valid? (sm/validator schema) schema
explain (sm/explainer schema)] (sm/define schema))
validate (sm/validator schema)
explain (sm/explainer schema)]
(fn [cfg params] (fn [cfg params]
(let [response (f cfg params)] (let [response (f cfg params)]
(when (map? response) (when (map? response)
(when-not (valid? response) (when-not (validate response)
(ex/raise :type :validation (ex/raise :type :validation
:code :data-validation :code :data-validation
::sm/explain (explain response)))) ::sm/explain (explain response))))
@ -237,8 +241,7 @@
::ldap/provider ::ldap/provider
::sto/storage ::sto/storage
::mtx/metrics ::mtx/metrics
::main/props ::main/props]
::wrk/executor]
:opt [::climit :opt [::climit
::rlimit] ::rlimit]
:req-un [::db/pool])) :req-un [::db/pool]))
@ -257,7 +260,6 @@
(s/keys :req [::methods (s/keys :req [::methods
::db/pool ::db/pool
::main/props ::main/props
::wrk/executor
::session/manager])) ::session/manager]))
(defmethod ig/init-key ::routes (defmethod ig/init-key ::routes

View file

@ -31,19 +31,24 @@
(set! *warn-on-reflection* true) (set! *warn-on-reflection* true)
(defn- id->str
[id]
(-> (str id)
(subs 1)))
(defn- create-bulkhead-cache (defn- create-bulkhead-cache
[{:keys [::wrk/executor]} config] [config]
(letfn [(load-fn [key] (letfn [(load-fn [[id skey]]
(let [config (get config (nth key 0))] (when-let [config (get config id)]
(l/trc :hint "insert into cache" :key key) (l/trc :hint "insert into cache" :id (id->str id) :key skey)
(pbh/create :permits (or (:permits config) (:concurrency config)) (pbh/create :permits (or (:permits config) (:concurrency config))
:queue (or (:queue config) (:queue-size config)) :queue (or (:queue config) (:queue-size config))
:timeout (:timeout config) :timeout (:timeout config)
:executor executor :type :semaphore)))
:type (:type config :semaphore))))
(on-remove [_ _ cause] (on-remove [key _ cause]
(l/trc :hint "evict from cache" :key key :reason (str cause)))] (let [[id skey] key]
(l/trc :hint "evict from cache" :id (id->str id) :key skey :reason (str cause))))]
(cache/create :executor :same-thread (cache/create :executor :same-thread
:on-remove on-remove :on-remove on-remove
@ -65,22 +70,21 @@
(s/def ::path ::fs/path) (s/def ::path ::fs/path)
(defmethod ig/pre-init-spec ::rpc/climit [_] (defmethod ig/pre-init-spec ::rpc/climit [_]
(s/keys :req [::wrk/executor ::mtx/metrics ::path])) (s/keys :req [::mtx/metrics ::path]))
(defmethod ig/init-key ::rpc/climit (defmethod ig/init-key ::rpc/climit
[_ {:keys [::path ::mtx/metrics ::wrk/executor] :as cfg}] [_ {:keys [::path ::mtx/metrics] :as cfg}]
(when (contains? cf/flags :rpc-climit) (when (contains? cf/flags :rpc-climit)
(when-let [params (some->> path slurp edn/read-string)] (when-let [params (some->> path slurp edn/read-string)]
(l/inf :hint "initializing concurrency limit" :config (str path)) (l/inf :hint "initializing concurrency limit" :config (str path))
(us/verify! ::config params) (us/verify! ::config params)
{::cache (create-bulkhead-cache cfg params) {::cache (create-bulkhead-cache params)
::config params ::config params
::wrk/executor executor
::mtx/metrics metrics}))) ::mtx/metrics metrics})))
(s/def ::cache cache/cache?) (s/def ::cache cache/cache?)
(s/def ::instance (s/def ::instance
(s/keys :req [::cache ::config ::wrk/executor])) (s/keys :req [::cache ::config]))
(s/def ::rpc/climit (s/def ::rpc/climit
(s/nilable ::instance)) (s/nilable ::instance))
@ -91,107 +95,94 @@
(defn invoke! (defn invoke!
[cache metrics id key f] [cache metrics id key f]
(let [limiter (cache/get cache [id key]) (if-let [limiter (cache/get cache [id key])]
tpoint (dt/tpoint) (let [tpoint (dt/tpoint)
labels (into-array String [(name id)]) labels (into-array String [(id->str id)])
wrapped (fn []
(let [elapsed (tpoint)
stats (pbh/get-stats limiter)]
(l/trc :hint "acquired"
:id (id->str id)
:key key
:permits (:permits stats)
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats)
:elapsed (dt/format-duration elapsed))
wrapped (mtx/run! metrics
(fn [] :id :rpc-climit-timing
(let [elapsed (tpoint) :val (inst-ms elapsed)
stats (pbh/get-stats limiter)] :labels labels)
(l/trc :hint "executed" (try
:id (name id) (f)
:key key (finally
:fnh (hash f) (let [elapsed (tpoint)]
:permits (:permits stats) (l/trc :hint "finished"
:queue (:queue stats) :id (id->str id)
:max-permits (:max-permits stats) :key key
:max-queue (:max-queue stats) :permits (:permits stats)
:elapsed (dt/format-duration elapsed)) :queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats)
:elapsed (dt/format-duration elapsed)))))))
measure!
(fn [stats]
(mtx/run! metrics (mtx/run! metrics
:id :rpc-climit-timing :id :rpc-climit-queue
:val (inst-ms elapsed) :val (:queue stats)
:labels labels) :labels labels)
(try (mtx/run! metrics
(f) :id :rpc-climit-permits
(finally :val (:permits stats)
(let [elapsed (tpoint)] :labels labels))]
(l/trc :hint "finished"
:id (name id)
:key key
:fnh (hash f)
:permits (:permits stats)
:queue (:queue stats)
:max-permits (:max-permits stats)
:max-queue (:max-queue stats)
:elapsed (dt/format-duration elapsed)))))))
measure!
(fn [stats]
(mtx/run! metrics
:id :rpc-climit-queue
:val (:queue stats)
:labels labels)
(mtx/run! metrics
:id :rpc-climit-permits
:val (:permits stats)
:labels labels))]
(try (try
(let [stats (pbh/get-stats limiter)] (let [stats (pbh/get-stats limiter)]
(measure! stats) (measure! stats)
(l/trc :hint "enqueued" (l/trc :hint "enqueued"
:id (name id) :id (id->str id)
:key key :key key
:fnh (hash f) :permits (:permits stats)
:permits (:permits stats) :queue (:queue stats)
:queue (:queue stats) :max-permits (:max-permits stats)
:max-permits (:max-permits stats) :max-queue (:max-queue stats))
:max-queue (:max-queue stats)) (pbh/invoke! limiter wrapped))
(pbh/invoke! limiter wrapped)) (catch ExceptionInfo cause
(catch ExceptionInfo cause (let [{:keys [type code]} (ex-data cause)]
(let [{:keys [type code]} (ex-data cause)] (if (= :bulkhead-error type)
(if (= :bulkhead-error type) (ex/raise :type :concurrency-limit
(ex/raise :type :concurrency-limit :code code
:code code :hint "concurrency limit reached")
:hint "concurrency limit reached") (throw cause))))
(throw cause))))
(finally (finally
(measure! (pbh/get-stats limiter)))))) (measure! (pbh/get-stats limiter)))))
(do
(defn run! (l/wrn :hint "unable to load limiter" :id (id->str id))
[{:keys [::id ::cache ::mtx/metrics]} f] (f))))
(if (and cache id)
(invoke! cache metrics id nil f)
(f)))
(defn submit!
[{:keys [::id ::cache ::wrk/executor ::mtx/metrics]} f]
(let [f (partial px/submit! executor (px/wrap-bindings f))]
(if (and cache id)
(p/await! (invoke! cache metrics id nil f))
(p/await! (f)))))
(defn configure (defn configure
([{:keys [::rpc/climit]} id] [{:keys [::rpc/climit]} id]
(us/assert! ::rpc/climit climit) (us/assert! ::rpc/climit climit)
(assoc climit ::id id)) (assoc climit ::id id))
([{:keys [::rpc/climit]} id executor]
(us/assert! ::rpc/climit climit)
(-> climit
(assoc ::id id)
(assoc ::wrk/executor executor))))
(defmacro with-dispatch! (defn run!
"Dispatch blocking operation to a separated thread protected with the "Run a function in context of climit.
specified concurrency limiter. If climit is not active, the function Intended to be used in virtual threads."
will be scheduled to execute without concurrency monitoring." ([{:keys [::id ::cache ::mtx/metrics]} f]
[instance & body] (if (and cache id)
(if (vector? instance) (invoke! cache metrics id nil f)
`(-> (app.rpc.climit/configure ~@instance) (f)))
(app.rpc.climit/run! (^:once fn* [] ~@body)))
`(run! ~instance (^:once fn* [] ~@body)))) ([{:keys [::id ::cache ::mtx/metrics]} f executor]
(let [f (fn []
(let [f (px/wrap-bindings f)]
(p/await! (px/submit! executor f))))]
(if (and cache id)
(invoke! cache metrics id nil f)
(f)))))
(def noop-fn (constantly nil)) (def noop-fn (constantly nil))
@ -201,7 +192,7 @@
(if-let [config (get-in climit [::config id])] (if-let [config (get-in climit [::config id])]
(let [cache (::cache climit)] (let [cache (::cache climit)]
(l/dbg :hint "instrumenting method" (l/dbg :hint "instrumenting method"
:limit (name id) :limit (id->str id)
:service-name (::sv/name mdata) :service-name (::sv/name mdata)
:timeout (:timeout config) :timeout (:timeout config)
:permits (:permits config) :permits (:permits config)
@ -212,7 +203,7 @@
(invoke! cache metrics id (key-fn params) (partial f cfg params)))) (invoke! cache metrics id (key-fn params) (partial f cfg params))))
(do (do
(l/wrn :hint "no config found for specified queue" :id id) (l/wrn :hint "no config found for specified queue" :id (id->str id))
f)) f))
f)) f))

View file

@ -64,7 +64,7 @@
[:events [:vector schema:event]]]) [:events [:vector schema:event]]])
(sv/defmethod ::push-audit-events (sv/defmethod ::push-audit-events
{::climit/id :submit-audit-events-by-profile {::climit/id :submit-audit-events/by-profile
::climit/key-fn ::rpc/profile-id ::climit/key-fn ::rpc/profile-id
::sm/params schema:push-audit-events ::sm/params schema:push-audit-events
::audit/skip true ::audit/skip true

View file

@ -44,8 +44,8 @@
[cuerdas.core :as str] [cuerdas.core :as str]
[datoteka.io :as io] [datoteka.io :as io]
[promesa.util :as pu] [promesa.util :as pu]
[yetti.adapter :as yt] [ring.response :as rres]
[yetti.response :as yrs]) [yetti.adapter :as yt])
(:import (:import
com.github.luben.zstd.ZstdInputStream com.github.luben.zstd.ZstdInputStream
com.github.luben.zstd.ZstdOutputStream com.github.luben.zstd.ZstdOutputStream
@ -1069,7 +1069,7 @@
::webhooks/event? true} ::webhooks/event? true}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id include-libraries? embed-assets?] :as params}] [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id include-libraries? embed-assets?] :as params}]
(files/check-read-permissions! pool profile-id file-id) (files/check-read-permissions! pool profile-id file-id)
(let [body (reify yrs/StreamableResponseBody (let [body (reify rres/StreamableResponseBody
(-write-body-to-stream [_ _ output-stream] (-write-body-to-stream [_ _ output-stream]
(-> cfg (-> cfg
(assoc ::file-ids [file-id]) (assoc ::file-ids [file-id])
@ -1078,9 +1078,9 @@
(export! output-stream))))] (export! output-stream))))]
(fn [_] (fn [_]
{::yrs/status 200 {::rres/status 200
::yrs/body body ::rres/body body
::yrs/headers {"content-type" "application/octet-stream"}}))) ::rres/headers {"content-type" "application/octet-stream"}})))
(s/def ::file ::media/upload) (s/def ::file ::media/upload)
(s/def ::import-binfile (s/def ::import-binfile

View file

@ -34,6 +34,7 @@
[app.util.pointer-map :as pmap] [app.util.pointer-map :as pmap]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk]
[clojure.set :as set])) [clojure.set :as set]))
;; --- SCHEMA ;; --- SCHEMA
@ -133,8 +134,8 @@
;; database. ;; database.
(sv/defmethod ::update-file (sv/defmethod ::update-file
{::climit/id :update-file-by-id {::climit/id :update-file/by-profile
::climit/key-fn :id ::climit/key-fn ::rpc/profile-id
::webhooks/event? true ::webhooks/event? true
::webhooks/batch-timeout (dt/duration "2m") ::webhooks/batch-timeout (dt/duration "2m")
::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id) ::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id)
@ -231,13 +232,15 @@
:team-id (:team-id file)})))))) :team-id (:team-id file)}))))))
(defn- update-file* (defn- update-file*
[{:keys [::db/conn] :as cfg} [{:keys [::db/conn ::wrk/executor] :as cfg}
{:keys [profile-id file changes session-id ::created-at skip-validate] :as params}] {:keys [profile-id file changes session-id ::created-at skip-validate] :as params}]
(let [;; Process the file data in the CLIMIT context; scheduling it (let [;; Process the file data in the CLIMIT context; scheduling it
;; to be executed on a separated executor for avoid to do the ;; to be executed on a separated executor for avoid to do the
;; CPU intensive operation on vthread. ;; CPU intensive operation on vthread.
file (-> (climit/configure cfg :update-file)
(climit/submit! (partial update-file-data conn file changes skip-validate)))] update-fdata-fn (partial update-file-data conn file changes skip-validate)
file (-> (climit/configure cfg :update-file/global)
(climit/run! update-fdata-fn executor))]
(db/insert! conn :file-change (db/insert! conn :file-change
{:id (uuid/next) {:id (uuid/next)

View file

@ -25,6 +25,7 @@
[app.storage :as sto] [app.storage :as sto]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s])) [clojure.spec.alpha :as s]))
(def valid-weight #{100 200 300 400 500 600 700 800 900 950}) (def valid-weight #{100 200 300 400 500 600 700 800 900 950})
@ -159,8 +160,9 @@
:ttf-file-id (:id ttf)})) :ttf-file-id (:id ttf)}))
] ]
(let [data (-> (climit/configure cfg :process-font) (let [data (-> (climit/configure cfg :process-font/global)
(climit/submit! (partial generate-missing! data))) (climit/run! (partial generate-missing! data)
(::wrk/executor cfg)))
assets (persist-fonts-files! data) assets (persist-fonts-files! data)
result (insert-font-variant! assets)] result (insert-font-variant! assets)]
(vary-meta result assoc ::audit/replace-props (update params :data (comp vec keys)))))) (vary-meta result assoc ::audit/replace-props (update params :data (comp vec keys))))))

View file

@ -23,6 +23,7 @@
[app.storage :as sto] [app.storage :as sto]
[app.storage.tmp :as tmp] [app.storage.tmp :as tmp]
[app.util.services :as sv] [app.util.services :as sv]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[datoteka.io :as io])) [datoteka.io :as io]))
@ -142,11 +143,11 @@
(assoc ::image (process-main-image info))))) (assoc ::image (process-main-image info)))))
(defn create-file-media-object (defn create-file-media-object
[{:keys [::sto/storage ::db/conn] :as cfg} [{:keys [::sto/storage ::db/conn ::wrk/executor] :as cfg}
{:keys [id file-id is-local name content]}] {:keys [id file-id is-local name content]}]
(let [result (-> (climit/configure cfg :process-image) (let [result (-> (climit/configure cfg :process-image/global)
(climit/submit! (partial process-image content))) (climit/run! (partial process-image content) executor))
image (sto/put-object! storage (::image result)) image (sto/put-object! storage (::image result))
thumb (when-let [params (::thumb result)] thumb (when-let [params (::thumb result)]

View file

@ -26,6 +26,7 @@
[app.tokens :as tokens] [app.tokens :as tokens]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as-alias wrk]
[cuerdas.core :as str])) [cuerdas.core :as str]))
(declare check-profile-existence!) (declare check-profile-existence!)
@ -36,21 +37,23 @@
(declare strip-private-attrs) (declare strip-private-attrs)
(declare verify-password) (declare verify-password)
(def ^:private schema:profile (def ^:private
[:map {:title "Profile"} schema:profile
[:id ::sm/uuid] (sm/define
[:fullname [::sm/word-string {:max 250}]] [:map {:title "Profile"}
[:email ::sm/email] [:id ::sm/uuid]
[:is-active {:optional true} :boolean] [:fullname [::sm/word-string {:max 250}]]
[:is-blocked {:optional true} :boolean] [:email ::sm/email]
[:is-demo {:optional true} :boolean] [:is-active {:optional true} :boolean]
[:is-muted {:optional true} :boolean] [:is-blocked {:optional true} :boolean]
[:created-at {:optional true} ::sm/inst] [:is-demo {:optional true} :boolean]
[:modified-at {:optional true} ::sm/inst] [:is-muted {:optional true} :boolean]
[:default-project-id {:optional true} ::sm/uuid] [:created-at {:optional true} ::sm/inst]
[:default-team-id {:optional true} ::sm/uuid] [:modified-at {:optional true} ::sm/inst]
[:props {:optional true} [:default-project-id {:optional true} ::sm/uuid]
[:map-of {:title "ProfileProps"} :keyword :any]]]) [:default-team-id {:optional true} ::sm/uuid]
[:props {:optional true}
[:map-of {:title "ProfileProps"} :keyword :any]]]))
;; --- QUERY: Get profile (own) ;; --- QUERY: Get profile (own)
@ -78,11 +81,13 @@
;; --- MUTATION: Update Profile (own) ;; --- MUTATION: Update Profile (own)
(def schema:update-profile (def ^:private
[:map {:title "update-profile"} schema:update-profile
[:fullname [::sm/word-string {:max 250}]] (sm/define
[:lang {:optional true} [:string {:max 5}]] [:map {:title "update-profile"}
[:theme {:optional true} [:string {:max 250}]]]) [:fullname [::sm/word-string {:max 250}]]
[:lang {:optional true} [:string {:max 5}]]
[:theme {:optional true} [:string {:max 250}]]]))
(sv/defmethod ::update-profile (sv/defmethod ::update-profile
{::doc/added "1.0" {::doc/added "1.0"
@ -123,11 +128,13 @@
(declare update-profile-password!) (declare update-profile-password!)
(declare invalidate-profile-session!) (declare invalidate-profile-session!)
(def schema:update-profile-password (def ^:private
[:map {:title "update-profile-password"} schema:update-profile-password
[:password [::sm/word-string {:max 500}]] (sm/define
;; Social registered users don't have old-password [:map {:title "update-profile-password"}
[:old-password {:optional true} [:maybe [::sm/word-string {:max 500}]]]]) [:password [::sm/word-string {:max 500}]]
;; Social registered users don't have old-password
[:old-password {:optional true} [:maybe [::sm/word-string {:max 500}]]]]))
(sv/defmethod ::update-profile-password (sv/defmethod ::update-profile-password
{:doc/added "1.0" {:doc/added "1.0"
@ -177,9 +184,11 @@
(declare upload-photo) (declare upload-photo)
(declare update-profile-photo) (declare update-profile-photo)
(def schema:update-profile-photo (def ^:private
[:map {:title "update-profile-photo"} schema:update-profile-photo
[:file ::media/upload]]) (sm/define
[:map {:title "update-profile-photo"}
[:file ::media/upload]]))
(sv/defmethod ::update-profile-photo (sv/defmethod ::update-profile-photo
{:doc/added "1.1" {:doc/added "1.1"
@ -230,9 +239,9 @@
:content-type (:mtype thumb)})) :content-type (:mtype thumb)}))
(defn upload-photo (defn upload-photo
[{:keys [::sto/storage] :as cfg} {:keys [file]}] [{:keys [::sto/storage ::wrk/executor] :as cfg} {:keys [file]}]
(let [params (-> (climit/configure cfg :process-image) (let [params (-> (climit/configure cfg :process-image/global)
(climit/submit! (partial generate-thumbnail! file)))] (climit/run! (partial generate-thumbnail! file) executor))]
(sto/put-object! storage params))) (sto/put-object! storage params)))
@ -241,9 +250,11 @@
(declare ^:private request-email-change!) (declare ^:private request-email-change!)
(declare ^:private change-email-immediately!) (declare ^:private change-email-immediately!)
(def schema:request-email-change (def ^:private
[:map {:title "request-email-change"} schema:request-email-change
[:email ::sm/email]]) (sm/define
[:map {:title "request-email-change"}
[:email ::sm/email]]))
(sv/defmethod ::request-email-change (sv/defmethod ::request-email-change
{::doc/added "1.0" {::doc/added "1.0"
@ -308,9 +319,11 @@
;; --- MUTATION: Update Profile Props ;; --- MUTATION: Update Profile Props
(def schema:update-profile-props (def ^:private
[:map {:title "update-profile-props"} schema:update-profile-props
[:props [:map-of :keyword :any]]]) (sm/define
[:map {:title "update-profile-props"}
[:props [:map-of :keyword :any]]]))
(sv/defmethod ::update-profile-props (sv/defmethod ::update-profile-props
{::doc/added "1.0" {::doc/added "1.0"
@ -426,13 +439,15 @@
(defn derive-password (defn derive-password
[cfg password] [cfg password]
(when password (when password
(-> (climit/configure cfg :derive-password) (-> (climit/configure cfg :derive-password/global)
(climit/submit! (partial auth/derive-password password))))) (climit/run! (partial auth/derive-password password)
(::wrk/executor cfg)))))
(defn verify-password (defn verify-password
[cfg password password-data] [cfg password password-data]
(-> (climit/configure cfg :derive-password) (-> (climit/configure cfg :derive-password/global)
(climit/submit! (partial auth/verify-password password password-data)))) (climit/run! (partial auth/verify-password password password-data)
(::wrk/executor cfg))))
(defn decode-row (defn decode-row
[{:keys [props] :as row}] [{:keys [props] :as row}]

View file

@ -29,7 +29,7 @@
[app.util.services :as-alias sv] [app.util.services :as-alias sv]
[buddy.core.codecs :as bc] [buddy.core.codecs :as bc]
[buddy.core.hash :as bh] [buddy.core.hash :as bh]
[yetti.response :as yrs])) [ring.response :as-alias rres]))
(def (def
^{:dynamic true ^{:dynamic true
@ -57,7 +57,7 @@
(let [key' (when (or key reuse-key?) (let [key' (when (or key reuse-key?)
(some->> (get-object cfg params) (key-fn params) (fmt-key)))] (some->> (get-object cfg params) (key-fn params) (fmt-key)))]
(if (and (some? key) (= key key')) (if (and (some? key) (= key key'))
(fn [_] {::yrs/status 304}) (fn [_] {::rres/status 304})
(let [result (f cfg params) (let [result (f cfg params)
etag (or (and reuse-key? key') etag (or (and reuse-key? key')
(some-> result meta ::key fmt-key) (some-> result meta ::key fmt-key)

View file

@ -27,7 +27,7 @@
[integrant.core :as ig] [integrant.core :as ig]
[malli.transform :as mt] [malli.transform :as mt]
[pretty-spec.core :as ps] [pretty-spec.core :as ps]
[yetti.response :as yrs])) [ring.response :as-alias rres]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; DOC (human readable) ;; DOC (human readable)
@ -86,11 +86,11 @@
(let [params (:query-params request) (let [params (:query-params request)
pstyle (:type params "js") pstyle (:type params "js")
context (assoc context :param-style pstyle)] context (assoc context :param-style pstyle)]
{::yrs/status 200 {::rres/status 200
::yrs/body (-> (io/resource "app/templates/api-doc.tmpl") ::rres/body (-> (io/resource "app/templates/api-doc.tmpl")
(tmpl/render context))})) (tmpl/render context))}))
(fn [_] (fn [_]
{::yrs/status 404}))) {::rres/status 404})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; OPENAPI / SWAGGER (v3.1) ;; OPENAPI / SWAGGER (v3.1)
@ -173,12 +173,12 @@
[context] [context]
(if (contains? cf/flags :backend-openapi-doc) (if (contains? cf/flags :backend-openapi-doc)
(fn [_] (fn [_]
{::yrs/status 200 {::rres/status 200
::yrs/headers {"content-type" "application/json; charset=utf-8"} ::rres/headers {"content-type" "application/json; charset=utf-8"}
::yrs/body (json/encode context)}) ::rres/body (json/encode context)})
(fn [_] (fn [_]
{::yrs/status 404}))) {::rres/status 404})))
(defn openapi-handler (defn openapi-handler
[] []
@ -189,12 +189,12 @@
context {:public-uri (cf/get :public-uri) context {:public-uri (cf/get :public-uri)
:swagger-js swagger-js :swagger-js swagger-js
:swagger-css swagger-cs}] :swagger-css swagger-cs}]
{::yrs/status 200 {::rres/status 200
::yrs/headers {"content-type" "text/html"} ::rres/headers {"content-type" "text/html"}
::yrs/body (-> (io/resource "app/templates/openapi.tmpl") ::rres/body (-> (io/resource "app/templates/openapi.tmpl")
(tmpl/render context))})) (tmpl/render context))}))
(fn [_] (fn [_]
{::yrs/status 404}))) {::rres/status 404})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; MODULE INIT ;; MODULE INIT

View file

@ -11,7 +11,7 @@
[app.common.data.macros :as dm] [app.common.data.macros :as dm]
[app.http :as-alias http] [app.http :as-alias http]
[app.rpc :as-alias rpc] [app.rpc :as-alias rpc]
[yetti.response :as-alias yrs])) [ring.response :as-alias rres]))
;; A utilty wrapper object for wrap service responses that does not ;; A utilty wrapper object for wrap service responses that does not
;; implements the IObj interface that make possible attach metadata to ;; implements the IObj interface that make possible attach metadata to
@ -77,4 +77,4 @@
(fn [_ response] (fn [_ response]
(let [exp (if (integer? max-age) max-age (inst-ms max-age)) (let [exp (if (integer? max-age) max-age (inst-ms max-age))
val (dm/fmt "max-age=%" (int (/ exp 1000.0)))] val (dm/fmt "max-age=%" (int (/ exp 1000.0)))]
(update response ::yrs/headers assoc "cache-control" val))))) (update response ::rres/headers assoc "cache-control" val)))))

View file

@ -18,7 +18,6 @@
[app.storage.impl :as impl] [app.storage.impl :as impl]
[app.storage.s3 :as ss3] [app.storage.s3 :as ss3]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[datoteka.fs :as fs] [datoteka.fs :as fs]
[integrant.core :as ig] [integrant.core :as ig]
@ -40,7 +39,7 @@
:fs ::sfs/backend)))) :fs ::sfs/backend))))
(defmethod ig/pre-init-spec ::storage [_] (defmethod ig/pre-init-spec ::storage [_]
(s/keys :req [::db/pool ::wrk/executor ::backends])) (s/keys :req [::db/pool ::backends]))
(defmethod ig/init-key ::storage (defmethod ig/init-key ::storage
[_ {:keys [::backends ::db/pool] :as cfg}] [_ {:keys [::backends ::db/pool] :as cfg}]

View file

@ -11,7 +11,6 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.db :as-alias db] [app.db :as-alias db]
[app.storage :as-alias sto] [app.storage :as-alias sto]
[app.worker :as-alias wrk]
[buddy.core.codecs :as bc] [buddy.core.codecs :as bc]
[buddy.core.hash :as bh] [buddy.core.hash :as bh]
[clojure.java.io :as jio] [clojure.java.io :as jio]
@ -201,7 +200,7 @@
(str "blake2b:" result))) (str "blake2b:" result)))
(defn resolve-backend (defn resolve-backend
[{:keys [::db/pool ::wrk/executor] :as storage} backend-id] [{:keys [::db/pool] :as storage} backend-id]
(let [backend (get-in storage [::sto/backends backend-id])] (let [backend (get-in storage [::sto/backends backend-id])]
(when-not backend (when-not backend
(ex/raise :type :internal (ex/raise :type :internal
@ -209,7 +208,6 @@
:hint (dm/fmt "backend '%' not configured" backend-id))) :hint (dm/fmt "backend '%' not configured" backend-id)))
(-> backend (-> backend
(assoc ::sto/id backend-id) (assoc ::sto/id backend-id)
(assoc ::wrk/executor executor)
(assoc ::db/pool pool)))) (assoc ::db/pool pool))))
(defrecord StorageObject [id size created-at expired-at touched-at backend]) (defrecord StorageObject [id size created-at expired-at touched-at backend])

View file

@ -15,8 +15,8 @@
[app.util.time :as dt] [app.util.time :as dt]
[promesa.exec :as px] [promesa.exec :as px]
[promesa.exec.csp :as sp] [promesa.exec.csp :as sp]
[yetti.request :as yr] [ring.request :as rreq]
[yetti.util :as yu] [ring.websocket :as rws]
[yetti.websocket :as yws]) [yetti.websocket :as yws])
(:import (:import
java.nio.ByteBuffer)) java.nio.ByteBuffer))
@ -50,7 +50,7 @@
(declare start-io-loop!) (declare start-io-loop!)
(defn handler (defn listener
"A WebSocket upgrade handler factory. Returns a handler that can be "A WebSocket upgrade handler factory. Returns a handler that can be
used to upgrade to websocket connection. This handler implements the used to upgrade to websocket connection. This handler implements the
basic custom protocol on top of websocket connection with all the basic custom protocol on top of websocket connection with all the
@ -61,37 +61,34 @@
It also accepts some options that allows you parametrize the It also accepts some options that allows you parametrize the
protocol behavior. The options map will be used as-as for the protocol behavior. The options map will be used as-as for the
initial data of the `ws` data structure" initial data of the `ws` data structure"
[& {:keys [::on-rcv-message [request & {:keys [::on-rcv-message
::on-snd-message ::on-snd-message
::on-connect ::on-connect
::input-buff-size ::input-buff-size
::output-buff-size ::output-buff-size
::idle-timeout] ::idle-timeout]
:or {input-buff-size 64 :or {input-buff-size 64
output-buff-size 64 output-buff-size 64
idle-timeout 60000 idle-timeout 60000
on-connect identity on-connect identity
on-snd-message identity-3 on-snd-message identity-3
on-rcv-message identity-3} on-rcv-message identity-3}
:as options}] :as options}]
(assert (fn? on-rcv-message) "'on-rcv-message' should be a function") (assert (fn? on-rcv-message) "'on-rcv-message' should be a function")
(assert (fn? on-snd-message) "'on-snd-message' should be a function") (assert (fn? on-snd-message) "'on-snd-message' should be a function")
(assert (fn? on-connect) "'on-connect' should be a function") (assert (fn? on-connect) "'on-connect' should be a function")
(fn [{:keys [::yws/channel] :as request}] (let [input-ch (sp/chan :buf input-buff-size)
(let [input-ch (sp/chan :buf input-buff-size) output-ch (sp/chan :buf output-buff-size)
output-ch (sp/chan :buf output-buff-size) hbeat-ch (sp/chan :buf (sp/sliding-buffer 6))
hbeat-ch (sp/chan :buf (sp/sliding-buffer 6)) close-ch (sp/chan)
close-ch (sp/chan) ip-addr (parse-client-ip request)
uagent (rreq/get-header request "user-agent")
ip-addr (parse-client-ip request) id (uuid/next)
uagent (yr/get-header request "user-agent") state (atom {})
id (uuid/next) beats (atom #{})
state (atom {}) options (-> options
beats (atom #{})
options (-> options
(update ::handler wrap-handler) (update ::handler wrap-handler)
(assoc ::id id) (assoc ::id id)
(assoc ::state state) (assoc ::state state)
@ -101,126 +98,118 @@
(assoc ::heartbeat-ch hbeat-ch) (assoc ::heartbeat-ch hbeat-ch)
(assoc ::output-ch output-ch) (assoc ::output-ch output-ch)
(assoc ::close-ch close-ch) (assoc ::close-ch close-ch)
(assoc ::channel channel)
(assoc ::remote-addr ip-addr) (assoc ::remote-addr ip-addr)
(assoc ::user-agent uagent) (assoc ::user-agent uagent))]
{:on-open
(fn on-open [channel]
(l/trace :fn "on-open" :conn-id id :channel channel)
(let [options (-> options
(assoc ::channel channel)
(on-connect)) (on-connect))
timeout (dt/duration idle-timeout)]
on-ws-open (yws/set-idle-timeout! channel timeout)
(fn [channel] (px/submit! :vthread (partial start-io-loop! options))))
(l/trace :fn "on-ws-open" :conn-id id)
(let [timeout (dt/duration idle-timeout)
name (str "penpot/websocket/io-loop/" id)]
(yws/idle-timeout! channel timeout)
(px/fn->thread (partial start-io-loop! options)
{:name name :virtual true})))
on-ws-terminate :on-close
(fn [_ code reason] (fn on-close [_channel code reason]
(l/trace :fn "on-ws-terminate" (l/info :fn "on-ws-terminate"
:conn-id id :conn-id id
:code code :code code
:reason reason) :reason reason)
(sp/close! close-ch)) (sp/close! close-ch))
on-ws-error :on-error
(fn [_ cause] (fn on-error [_channel cause]
(sp/close! close-ch cause)) (sp/close! close-ch cause))
on-ws-message :on-message
(fn [_ message] (fn on-message [_channel message]
(sp/offer! input-ch message) (when (string? message)
(swap! state assoc ::last-activity-at (dt/now))) (sp/offer! input-ch message)
(swap! state assoc ::last-activity-at (dt/now))))
on-ws-pong :on-pong
(fn [_ buffers] (fn on-pong [_channel data]
;; (l/trace :fn "on-ws-pong" :buffers (pr-str buffers)) (l/trace :fn "on-pong" :data data)
(sp/put! hbeat-ch (yu/copy-many buffers)))] (sp/put! hbeat-ch data))}))
(yws/on-close! channel (fn [_]
(sp/close! close-ch)))
{:on-open on-ws-open
:on-error on-ws-error
:on-close on-ws-terminate
:on-text on-ws-message
:on-pong on-ws-pong})))
(defn- handle-ping! (defn- handle-ping!
[{:keys [::id ::beats ::channel] :as wsp} beat-id] [{:keys [::id ::beats ::channel] :as wsp} beat-id]
(l/trace :hint "ping" :beat beat-id :conn-id id) (l/trace :hint "send ping" :beat beat-id :conn-id id)
(yws/ping! channel (encode-beat beat-id)) (rws/ping channel (encode-beat beat-id))
(let [issued (swap! beats conj (long beat-id))] (let [issued (swap! beats conj (long beat-id))]
(not (>= (count issued) max-missed-heartbeats)))) (not (>= (count issued) max-missed-heartbeats))))
(defn- start-io-loop! (defn- start-io-loop!
[{:keys [::id ::close-ch ::input-ch ::output-ch ::heartbeat-ch ::channel ::handler ::beats ::on-rcv-message ::on-snd-message] :as wsp}] [{:keys [::id ::close-ch ::input-ch ::output-ch ::heartbeat-ch ::channel ::handler ::beats ::on-rcv-message ::on-snd-message] :as wsp}]
(px/thread (try
{:name (str "penpot/websocket/io-loop/" id) (handler wsp {:type :open})
:virtual true} (loop [i 0]
(try (let [ping-ch (sp/timeout-chan heartbeat-interval)
(handler wsp {:type :open}) [msg p] (sp/alts! [close-ch input-ch output-ch heartbeat-ch ping-ch])]
(loop [i 0] (when (rws/open? channel)
(let [ping-ch (sp/timeout-chan heartbeat-interval) (cond
[msg p] (sp/alts! [close-ch input-ch output-ch heartbeat-ch ping-ch])] (identical? p ping-ch)
(when (yws/connected? channel) (if (handle-ping! wsp i)
(cond (recur (inc i))
(identical? p ping-ch) (rws/close channel 8802 "missing to many pings"))
(if (handle-ping! wsp i)
(recur (inc i))
(yws/close! channel 8802 "missing to many pings"))
(or (identical? p close-ch) (nil? msg)) (or (identical? p close-ch) (nil? msg))
(do :nothing) (do :nothing)
(identical? p heartbeat-ch) (identical? p heartbeat-ch)
(let [beat (decode-beat msg)] (let [beat (decode-beat msg)]
;; (l/trace :hint "pong" :beat beat :conn-id id) ;; (l/trace :hint "pong" :beat beat :conn-id id)
(swap! beats disj beat) (swap! beats disj beat)
(recur i)) (recur i))
(identical? p input-ch) (identical? p input-ch)
(let [message (t/decode-str msg) (let [message (t/decode-str msg)
message (on-rcv-message message) message (on-rcv-message message)
{:keys [request-id] :as response} (handler wsp message)] {:keys [request-id] :as response} (handler wsp message)]
(when (map? response) (when (map? response)
(sp/put! output-ch (sp/put! output-ch
(cond-> response (cond-> response
(some? request-id) (some? request-id)
(assoc :request-id request-id)))) (assoc :request-id request-id))))
(recur i)) (recur i))
(identical? p output-ch) (identical? p output-ch)
(let [message (on-snd-message msg) (let [message (on-snd-message msg)
message (t/encode-str message {:type :json-verbose})] message (t/encode-str message {:type :json-verbose})]
;; (l/trace :hint "writing message to output" :message msg) ;; (l/trace :hint "writing message to output" :message msg)
(yws/send! channel message) (rws/send channel message)
(recur i)))))) (recur i))))))
(catch java.nio.channels.ClosedChannelException _) (catch java.nio.channels.ClosedChannelException _)
(catch java.net.SocketException _) (catch java.net.SocketException _)
(catch java.io.IOException _) (catch java.io.IOException _)
(catch InterruptedException _ (catch InterruptedException _cause
(l/debug :hint "websocket thread interrumpted" :conn-id id)) (l/debug :hint "websocket thread interrumpted" :conn-id id))
(catch Throwable cause (catch Throwable cause
(l/error :hint "unhandled exception on websocket thread" (l/error :hint "unhandled exception on websocket thread"
:conn-id id :conn-id id
:cause cause)) :cause cause))
(finally
(finally (try
(handler wsp {:type :close}) (handler wsp {:type :close})
(when (yws/connected? channel) (when (rws/open? channel)
;; NOTE: we need to ignore all exceptions here because ;; NOTE: we need to ignore all exceptions here because
;; there can be a race condition that first returns that ;; there can be a race condition that first returns that
;; channel is connected but on closing, will raise that ;; channel is connected but on closing, will raise that
;; channel is already closed. ;; channel is already closed.
(ex/ignoring (ex/ignoring
(yws/close! channel 8899 "terminated"))) (rws/close channel 8899 "terminated")))
(when-let [on-disconnect (::on-disconnect wsp)] (when-let [on-disconnect (::on-disconnect wsp)]
(on-disconnect)) (on-disconnect))
(l/trace :hint "websocket thread terminated" :conn-id id))))) (catch Throwable cause
(throw cause)))
(l/trace :hint "websocket thread terminated" :conn-id id))))

View file

@ -25,43 +25,45 @@
[promesa.core :as p] [promesa.core :as p]
[promesa.exec :as px]) [promesa.exec :as px])
(:import (:import
java.util.concurrent.ExecutorService java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.ForkJoinPool java.util.concurrent.Executor
java.util.concurrent.Future)) java.util.concurrent.Future))
(set! *warn-on-reflection* true) (set! *warn-on-reflection* true)
(s/def ::executor #(instance? ExecutorService %)) (s/def ::executor #(instance? Executor %))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Executor ;; Executor
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::parallelism ::us/integer)
(defmethod ig/pre-init-spec ::executor [_] (defmethod ig/pre-init-spec ::executor [_]
(s/keys :req [::parallelism])) (s/keys :req []))
(defmethod ig/init-key ::executor (defmethod ig/init-key ::executor
[skey {:keys [::parallelism]}] [_ _]
(let [prefix (if (vector? skey) (-> skey first name) "default") (let [factory (px/thread-factory :prefix "penpot/default/")
tname (str "penpot/" prefix "/%s") executor (px/cached-executor :factory factory :keepalive 30000)]
ttype (cf/get :worker-executor-type :fjoin)] (l/inf :hint "starting executor")
(case ttype (reify
:fjoin java.lang.AutoCloseable
(let [factory (px/forkjoin-thread-factory :name tname)] (close [_]
(px/forkjoin-executor {:factory factory (l/inf :hint "stoping executor")
:core-size (px/get-available-processors) (px/shutdown! executor))
:parallelism parallelism
:async true}))
:cached clojure.lang.IDeref
(let [factory (px/thread-factory :name tname)] (deref [_]
(px/cached-executor :factory factory))))) {:active (.getPoolSize ^ThreadPoolExecutor executor)
:running (.getActiveCount ^ThreadPoolExecutor executor)
:completed (.getCompletedTaskCount ^ThreadPoolExecutor executor)})
Executor
(execute [_ runnable]
(.execute ^Executor executor ^Runnable runnable)))))
(defmethod ig/halt-key! ::executor (defmethod ig/halt-key! ::executor
[_ instance] [_ instance]
(px/shutdown! instance)) (.close ^java.lang.AutoCloseable instance))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; TASKS REGISTRY ;; TASKS REGISTRY
@ -111,42 +113,38 @@
(defmethod ig/init-key ::monitor (defmethod ig/init-key ::monitor
[_ {:keys [::executor ::mtx/metrics ::interval ::name]}] [_ {:keys [::executor ::mtx/metrics ::interval ::name]}]
(letfn [(monitor! [^ForkJoinPool executor prev-steals] (letfn [(monitor! [executor prev-completed]
(let [running (.getRunningThreadCount executor) (let [labels (into-array String [(d/name name)])
queued (.getQueuedSubmissionCount executor) stats (deref executor)
active (.getPoolSize executor)
steals (.getStealCount executor)
labels (into-array String [(d/name name)])
steals-inc (- steals prev-steals) completed (:completed stats)
steals-inc (if (neg? steals-inc) 0 steals-inc)] completed-inc (- completed prev-completed)
completed-inc (if (neg? completed-inc) 0 completed-inc)]
(mtx/run! metrics (mtx/run! metrics
:id :executor-active-threads :id :executor-active-threads
:labels labels :labels labels
:val active) :val (:active stats))
(mtx/run! metrics (mtx/run! metrics
:id :executor-running-threads :id :executor-running-threads
:labels labels :val running)
(mtx/run! metrics
:id :executors-queued-submissions
:labels labels :labels labels
:val queued) :val (:running stats))
(mtx/run! metrics (mtx/run! metrics
:id :executors-completed-tasks :id :executors-completed-tasks
:labels labels :labels labels
:inc steals-inc) :inc completed-inc)
steals))] completed-inc))]
(px/thread (px/thread
{:name "penpot/executors-monitor" :virtual true} {:name "penpot/executors-monitor" :virtual true}
(l/inf :hint "monitor: started" :name name) (l/inf :hint "monitor: started" :name name)
(try (try
(loop [steals 0] (loop [completed 0]
(when-not (px/shutdown? executor) (px/sleep interval)
(px/sleep interval) (recur (long (monitor! executor completed))))
(recur (long (monitor! executor steals)))))
(catch InterruptedException _cause (catch InterruptedException _cause
(l/trc :hint "monitor: interrupted" :name name)) (l/trc :hint "monitor: interrupted" :name name))
(catch Throwable cause (catch Throwable cause

View file

@ -429,11 +429,11 @@
(= :params-validation (:code data)) (= :params-validation (:code data))
(app.common.pprint/pprint (app.common.pprint/pprint
(sm/humanize-data (::sm/explain data))) (sm/humanize-explain (::sm/explain data)))
(= :data-validation (:code data)) (= :data-validation (:code data))
(app.common.pprint/pprint (app.common.pprint/pprint
(sm/humanize-data (::sm/explain data))) (sm/humanize-explain (::sm/explain data)))
(= :service-error (:type data)) (= :service-error (:type data))
(print-error! (.getCause ^Throwable error)) (print-error! (.getCause ^Throwable error))

View file

@ -31,17 +31,17 @@
request (volatile! nil) request (volatile! nil)
handler (#'app.http.access-token/wrap-soft-auth handler (#'app.http.access-token/wrap-soft-auth
(fn [req & _] (vreset! request req)) (fn [req] (vreset! request req))
system)] system)]
(with-mocks [m1 {:target 'app.http.access-token/get-token (with-mocks [m1 {:target 'app.http.access-token/get-token
:return nil}] :return nil}]
(handler {} nil nil) (handler {})
(t/is (= {} @request))) (t/is (= {} @request)))
(with-mocks [m1 {:target 'app.http.access-token/get-token (with-mocks [m1 {:target 'app.http.access-token/get-token
:return (:token token)}] :return (:token token)}]
(handler {} nil nil) (handler {})
(let [token-id (get @request :app.http.access-token/id)] (let [token-id (get @request :app.http.access-token/id)]
(t/is (= token-id (:id token)))))))) (t/is (= token-id (:id token))))))))

View file

@ -25,7 +25,7 @@
(def http-request (def http-request
(reify (reify
yetti.request/Request ring.request/Request
(get-header [_ name] (get-header [_ name]
(case name (case name
"x-forwarded-for" "127.0.0.44")))) "x-forwarded-for" "127.0.0.44"))))

View file

@ -46,6 +46,6 @@
{:keys [error result]} (th/command! (assoc params ::cond/key etag))] {:keys [error result]} (th/command! (assoc params ::cond/key etag))]
(t/is (nil? error)) (t/is (nil? error))
(t/is (fn? result)) (t/is (fn? result))
(t/is (= 304 (-> (result nil) :yetti.response/status)))) (t/is (= 304 (-> (result nil) :ring.response/status))))
)))) ))))

View file

@ -48,6 +48,10 @@
;; exception printing ;; exception printing
fipp/fipp {:mvn/version "0.6.26"} fipp/fipp {:mvn/version "0.6.26"}
io.github.eerohele/pp
{:git/tag "2023-11-25.47"
:git/sha "15d572c"}
io.aviso/pretty {:mvn/version "1.4.4"} io.aviso/pretty {:mvn/version "1.4.4"}
environ/environ {:mvn/version "1.2.0"}} environ/environ {:mvn/version "1.2.0"}}
:paths ["src" "vendor" "target/classes"] :paths ["src" "vendor" "target/classes"]

View file

@ -65,23 +65,22 @@
(instance? RuntimeException v))) (instance? RuntimeException v)))
(defn explain (defn explain
([data] (explain data nil)) [data & {:as opts}]
([data opts] (cond
(cond ;; NOTE: a special case for spec validation errors on integrant
;; NOTE: a special case for spec validation errors on integrant (and (= (:reason data) :integrant.core/build-failed-spec)
(and (= (:reason data) :integrant.core/build-failed-spec) (contains? data :explain))
(contains? data :explain)) (explain (:explain data) opts)
(explain (:explain data) opts)
(and (contains? data ::s/problems) (and (contains? data ::s/problems)
(contains? data ::s/value) (contains? data ::s/value)
(contains? data ::s/spec)) (contains? data ::s/spec))
(binding [s/*explain-out* expound/printer] (binding [s/*explain-out* expound/printer]
(with-out-str (with-out-str
(s/explain-out (update data ::s/problems #(take (:length opts 10) %))))) (s/explain-out (update data ::s/problems #(take (:length opts 10) %)))))
(contains? data ::sm/explain) (contains? data ::sm/explain)
(sm/humanize-data (::sm/explain data) opts)))) (sm/humanize-explain (::sm/explain data) opts)))
#?(:clj #?(:clj
(defn format-throwable (defn format-throwable
@ -92,8 +91,8 @@
data? true data? true
explain? true explain? true
chain? true chain? true
data-length 10 data-length 8
data-level 4}}] data-level 5}}]
(letfn [(print-trace-element [^StackTraceElement e] (letfn [(print-trace-element [^StackTraceElement e]
(let [class (.getClassName e) (let [class (.getClassName e)
@ -115,7 +114,7 @@
(print-data [data] (print-data [data]
(when (seq data) (when (seq data)
(print " dt: ") (print " dt: ")
(let [[line & lines] (str/lines (pp/pprint-str data :level data-level :length data-length ))] (let [[line & lines] (str/lines (pp/pprint-str data :level data-level :length data-length))]
(print line) (print line)
(newline) (newline)
(doseq [line lines] (doseq [line lines]

View file

@ -50,7 +50,7 @@
(when-not valid? (when-not valid?
(let [explain (sm/explain ::ch/change change)] (let [explain (sm/explain ::ch/change change)]
(pp/pprint (sm/humanize-data explain)) (pp/pprint (sm/humanize-explain explain))
(when fail-on-spec? (when fail-on-spec?
(ex/raise :type :assertion (ex/raise :type :assertion
:code :data-validation :code :data-validation

View file

@ -7,16 +7,16 @@
(ns app.common.pprint (ns app.common.pprint
(:refer-clojure :exclude [prn]) (:refer-clojure :exclude [prn])
(:require (:require
[fipp.edn :as fpp])) [me.flowthing.pp :as pp]))
(defn pprint-str
[expr & {:keys [width level length]
:or {width 110 level 8 length 25}}]
(binding [*print-level* level
*print-length* length]
(with-out-str
(fpp/pprint expr {:width width}))))
(defn pprint (defn pprint
[expr & {:keys [width level length]
:or {width 120 level 8 length 25}}]
(binding [*print-level* level
*print-length* length]
(pp/pprint expr {:max-width width})))
(defn pprint-str
[expr & {:as opts}] [expr & {:as opts}]
(println (pprint-str expr opts))) (with-out-str
(pprint expr opts)))

View file

@ -16,7 +16,7 @@
[app.common.schema.registry :as sr] [app.common.schema.registry :as sr]
[app.common.uri :as u] [app.common.uri :as u]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[clojure.test.check.generators :as tgen] [clojure.core :as c]
[cuerdas.core :as str] [cuerdas.core :as str]
[malli.core :as m] [malli.core :as m]
[malli.dev.pretty :as mdp] [malli.dev.pretty :as mdp]
@ -26,7 +26,12 @@
[malli.transform :as mt] [malli.transform :as mt]
[malli.util :as mu])) [malli.util :as mu]))
(defprotocol ISchemaOps (defprotocol ILazySchema
(-get-schema [_])
(-get-validator [_])
(-get-explainer [_])
(-get-decoder [_])
(-get-encoder [_])
(-validate [_ o]) (-validate [_ o])
(-explain [_ o]) (-explain [_ o])
(-decode [_ o])) (-decode [_ o]))
@ -34,21 +39,31 @@
(def default-options (def default-options
{:registry sr/default-registry}) {:registry sr/default-registry})
(defn explain
[s value]
(m/explain s value default-options))
(defn schema? (defn schema?
[o] [o]
(m/schema? o)) (m/schema? o))
(defn lazy-schema?
[s]
(satisfies? ILazySchema s))
(defn schema (defn schema
[s] [s]
(m/schema s default-options)) (if (lazy-schema? s)
(-get-schema s)
(m/schema s default-options)))
(defn validate (defn validate
[s value] [s value]
(m/validate s value default-options)) (if (lazy-schema? s)
(-validate s value)
(m/validate s value default-options)))
(defn explain
[s value]
(if (lazy-schema? s)
(-explain s value)
(m/explain s value default-options)))
(defn humanize (defn humanize
[exp] [exp]
@ -113,11 +128,15 @@
(defn validator (defn validator
[s] [s]
(-> s schema m/validator)) (if (lazy-schema? s)
(-get-validator s)
(-> s schema m/validator)))
(defn explainer (defn explainer
[s] [s]
(-> s schema m/explainer)) (if (lazy-schema? s)
(-get-explainer s)
(-> s schema m/explainer)))
(defn encode (defn encode
([s val transformer] ([s val transformer]
@ -131,9 +150,23 @@
([s val options transformer] ([s val options transformer]
(m/decode s val options transformer))) (m/decode s val options transformer)))
(defn decoder (defn encoder
([s]
(if (lazy-schema? s)
(-get-decoder s)
(encoder s default-options default-transformer)))
([s transformer] ([s transformer]
(m/decoder s default-options transformer)) (m/encoder s default-options transformer))
([s options transformer]
(m/encoder s options transformer)))
(defn decoder
([s]
(if (lazy-schema? s)
(-get-decoder s)
(decoder s default-options default-transformer)))
([s transformer]
(m/decoder s default-options transformer))
([s options transformer] ([s options transformer]
(m/decoder s options transformer))) (m/decoder s options transformer)))
@ -152,19 +185,18 @@
(let [vfn (delay (decoder (if (delay? s) (deref s) s) transformer))] (let [vfn (delay (decoder (if (delay? s) (deref s) s) transformer))]
(fn [v] (@vfn v)))) (fn [v] (@vfn v))))
(defn humanize-data (defn humanize-explain
[{:keys [schema errors value]} & {:keys [length level]}] [{:keys [schema errors value]} & {:keys [length level]}]
(let [errors (mapv #(update % :schema form) errors)] (let [errors (mapv #(update % :schema form) errors)]
(with-out-str (with-out-str
(println "Schema: ") (println "Schema: ")
(println (pp/pprint-str (form schema) {:level (d/nilv level 10) (println (pp/pprint-str (form schema) {:width 100 :level 15 :length 20}))
:length (d/nilv length 10)}))
(println "Errors:") (println "Errors:")
(println (pp/pprint-str errors {:level (d/nilv level 10) (println (pp/pprint-str errors {:width 100 :level 15 :length 20}))
:length (d/nilv length 10)}))
(println "Value:") (println "Value:")
(println (pp/pprint-str value {:level (d/nilv level 5) (println (pp/pprint-str value {:width 160
:length (d/nilv length 10)}))))) :level (d/nilv level 8)
:length (d/nilv length 12)})))))
(defn pretty-explain (defn pretty-explain
[s d] [s d]
@ -202,10 +234,8 @@
([s] (lookup sr/default-registry s)) ([s] (lookup sr/default-registry s))
([registry s] (schema (mr/schema registry s)))) ([registry s] (schema (mr/schema registry s))))
(declare define)
(defn fast-check! (defn fast-check!
"A fast path for checking process, assumes the ISchemaOps protocol "A fast path for checking process, assumes the ILazySchema protocol
implemented on the provided `s` schema. Sould not be used directly." implemented on the provided `s` schema. Sould not be used directly."
[s value] [s value]
(when-not ^boolean (-validate s value) (when-not ^boolean (-validate s value)
@ -217,10 +247,12 @@
::explain explain})))) ::explain explain}))))
true) true)
(declare define)
(defn check-fn (defn check-fn
"Create a predefined check function" "Create a predefined check function"
[s] [s]
(let [schema (if (satisfies? ISchemaOps s) s (define s))] (let [schema (if (lazy-schema? s) s (define s))]
(partial fast-check! schema))) (partial fast-check! schema)))
(defn check! (defn check!
@ -228,7 +260,7 @@
schema over provided data. Raises an assertion exception, should be schema over provided data. Raises an assertion exception, should be
used together with `dm/assert!` or `dm/verify!`." used together with `dm/assert!` or `dm/verify!`."
[s value] [s value]
(if (satisfies? ISchemaOps s) (if (lazy-schema? s)
(fast-check! s value) (fast-check! s value)
(do (do
(when-not ^boolean (m/validate s value default-options) (when-not ^boolean (m/validate s value default-options)
@ -242,7 +274,7 @@
(defn fast-validate! (defn fast-validate!
"A fast path for validation process, assumes the ISchemaOps protocol "A fast path for validation process, assumes the ILazySchema protocol
implemented on the provided `s` schema. Sould not be used directly." implemented on the provided `s` schema. Sould not be used directly."
([s value] (fast-validate! s value nil)) ([s value] (fast-validate! s value nil))
([s value options] ([s value options]
@ -258,14 +290,14 @@
(defn validate-fn (defn validate-fn
"Create a predefined validate function" "Create a predefined validate function"
[s] [s]
(let [schema (if (satisfies? ISchemaOps s) s (define s))] (let [schema (if (lazy-schema? s) s (define s))]
(partial fast-validate! schema))) (partial fast-validate! schema)))
(defn validate! (defn validate!
"A generic validation function for predefined schemas." "A generic validation function for predefined schemas."
([s value] (validate! s value nil)) ([s value] (validate! s value nil))
([s value options] ([s value options]
(if (satisfies? ISchemaOps s) (if (lazy-schema? s)
(fast-validate! s value options) (fast-validate! s value options)
(when-not ^boolean (m/validate s value default-options) (when-not ^boolean (m/validate s value default-options)
(let [explain (explain s value) (let [explain (explain s value)
@ -278,7 +310,7 @@
(defn conform! (defn conform!
[schema value] [schema value]
(assert (satisfies? ISchemaOps schema) "expected `schema` to satisfy ISchemaOps protocol") (assert (lazy-schema? schema) "expected `schema` to satisfy ILazySchema protocol")
(let [params (-decode schema value)] (let [params (-decode schema value)]
(fast-validate! schema params nil) (fast-validate! schema params nil)
params)) params))
@ -296,11 +328,16 @@
nil) nil)
(defn define (defn define
[s] "Create ans instance of ILazySchema"
(let [schema (delay (schema s)) [s & {:keys [transformer] :as options}]
validator (delay (validator @schema)) (let [schema (delay (schema s))
explainer (delay (explainer @schema)) validator (delay (m/validator @schema))
decoder (delay (decoder @schema default-transformer))] explainer (delay (m/explainer @schema))
options (c/merge default-options (dissoc options :transformer))
transformer (or transformer default-transformer)
decoder (delay (m/decoder @schema options transformer))
encoder (delay (m/encoder @schema options transformer))]
(reify (reify
m/AST m/AST
@ -342,7 +379,17 @@
(-form [_] (-form [_]
(m/-form @schema)) (m/-form @schema))
ISchemaOps ILazySchema
(-get-schema [_]
@schema)
(-get-validator [_]
@validator)
(-get-explainer [_]
@explainer)
(-get-encoder [_]
@encoder)
(-get-decoder [_]
@decoder)
(-validate [_ o] (-validate [_ o]
(@validator o)) (@validator o))
(-explain [_ o] (-explain [_ o]
@ -350,18 +397,6 @@
(-decode [_ o] (-decode [_ o]
(@decoder o))))) (@decoder o)))))
;; --- GENERATORS
;; FIXME: replace with sg/subseq
(defn gen-set-from-choices
[choices]
(->> tgen/nat
(tgen/fmap (fn [i]
(into #{}
(map (fn [_] (rand-nth choices)))
(range i))))))
;; --- BUILTIN SCHEMAS ;; --- BUILTIN SCHEMAS
(define! :merge (mu/-merge)) (define! :merge (mu/-merge))

View file

@ -24,16 +24,18 @@
:tempdir "/tmp/penpot-exporter" :tempdir "/tmp/penpot-exporter"
:redis-uri "redis://redis/0"}) :redis-uri "redis://redis/0"})
(def ^:private schema:config (def ^:private
[:map {:title "config"} schema:config
[:public-uri {:optional true} ::sm/uri] (sm/define
[:host {:optional true} :string] [:map {:title "config"}
[:tenant {:optional true} :string] [:public-uri {:optional true} ::sm/uri]
[:flags {:optional true} ::sm/set-of-keywords] [:host {:optional true} :string]
[:redis-uri {:optional true} :string] [:tenant {:optional true} :string]
[:tempdir {:optional true} :string] [:flags {:optional true} ::sm/set-of-keywords]
[:browser-pool-max {:optional true} :int] [:redis-uri {:optional true} :string]
[:browser-pool-min {:optional true} :int]]) [:tempdir {:optional true} :string]
[:browser-pool-max {:optional true} :int]
[:browser-pool-min {:optional true} :int]]))
(defn- parse-flags (defn- parse-flags
[config] [config]
@ -58,14 +60,15 @@
[] []
(let [env (read-env "penpot") (let [env (read-env "penpot")
env (d/without-nils env) env (d/without-nils env)
data (merge defaults env) data (merge defaults env)]
data (sm/decode schema:config data sm/default-transformer)]
(when-not (sm/validate schema:config data) (try
(println (sm/humanize-data schema:config data)) (sm/conform! schema:config data)
(process/exit -1)) (catch :default cause
(if-let [explain (some->> cause ex-data ::sm/explain)]
data)) (println (sm/humanize-explain explain))
(js/console.error cause))
(process/exit -1)))))
(def config (def config
(prepare-config)) (prepare-config))

View file

@ -9,7 +9,7 @@
(:require (:require
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.pprint :as pp] [app.common.pprint :as pp]
[app.common.schema :as sm] [app.common.schema :as-alias sm]
[app.main.data.messages :as msg] [app.main.data.messages :as msg]
[app.main.data.modal :as modal] [app.main.data.modal :as modal]
[app.main.data.users :as du] [app.main.data.users :as du]
@ -33,9 +33,8 @@
(defn- print-explain! (defn- print-explain!
[data] [data]
(when-let [explain (::sm/explain data)] (when-let [explain (or (ex/explain data)
(js/console.log (sm/humanize-data explain))) (:explain data))]
(when-let [explain (:explain data)]
(js/console.log explain))) (js/console.log explain)))
(defn- print-trace! (defn- print-trace!