From 6f42f4ec45033f3e24839c11dd24e4c2f6ef3f65 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 19 Sep 2022 12:25:44 +0200 Subject: [PATCH] :recycle: Refactor semaphore and executors --- backend/src/app/config.clj | 19 ++-- backend/src/app/main.clj | 31 +++-- backend/src/app/metrics.clj | 20 ++-- backend/src/app/rpc.clj | 44 +++----- backend/src/app/rpc/commands/auth.clj | 6 +- backend/src/app/rpc/mutations/files.clj | 2 +- backend/src/app/rpc/mutations/fonts.clj | 19 ++-- backend/src/app/rpc/mutations/media.clj | 35 +++--- backend/src/app/rpc/mutations/profile.clj | 36 ++++-- backend/src/app/rpc/mutations/teams.clj | 27 ++--- backend/src/app/rpc/semaphore.clj | 131 +++++++++++++++++----- backend/src/app/worker.clj | 130 ++++++++++++--------- 12 files changed, 293 insertions(+), 207 deletions(-) diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 25bbe3bf0d..47bd1d86ce 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -170,12 +170,11 @@ (s/def ::redis-uri ::us/string) (s/def ::registration-domain-whitelist ::us/set-of-strings) +(s/def ::semaphore-font-process ::us/integer) +(s/def ::semaphore-file-update ::us/integer) +(s/def ::semaphore-image-process ::us/integer) +(s/def ::semaphore-authentication ::us/integer) - -(s/def ::rpc-semaphore-permits-font ::us/integer) -(s/def ::rpc-semaphore-permits-file-update ::us/integer) -(s/def ::rpc-semaphore-permits-image ::us/integer) -(s/def ::rpc-semaphore-permits-password ::us/integer) (s/def ::smtp-default-from ::us/string) (s/def ::smtp-default-reply-to ::us/string) (s/def ::smtp-host ::us/string) @@ -278,10 +277,12 @@ ::public-uri ::redis-uri ::registration-domain-whitelist - ::rpc-semaphore-permits-font - ::rpc-semaphore-permits-file-update - ::rpc-semaphore-permits-image - ::rpc-semaphore-permits-password + + ::semaphore-process-font + ::semaphore-process-image + ::semaphore-update-file + ::semaphore-auth + ::rpc-rlimit-config ::sentry-dsn ::sentry-debug diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 816c32a367..cdff9252bb 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -27,32 +27,22 @@ ;; Default thread pool for IO operations [::default :app.worker/executor] - {:parallelism (cf/get :default-executor-parallelism 60) - :prefix :default} - - ;; Constrained thread pool. Should only be used from high resources - ;; demanding operations. - [::blocking :app.worker/executor] - {:parallelism (cf/get :blocking-executor-parallelism 10) - :prefix :blocking} + {:parallelism (cf/get :default-executor-parallelism 70)} ;; Dedicated thread pool for backround tasks execution. [::worker :app.worker/executor] - {:parallelism (cf/get :worker-executor-parallelism 10) - :prefix :worker} + {:parallelism (cf/get :worker-executor-parallelism 20)} :app.worker/scheduler {:parallelism 1 :prefix :scheduler} :app.worker/executors - {:default (ig/ref [::default :app.worker/executor]) - :worker (ig/ref [::worker :app.worker/executor]) - :blocking (ig/ref [::blocking :app.worker/executor])} + {:default (ig/ref [::default :app.worker/executor]) + :worker (ig/ref [::worker :app.worker/executor])} - :app.worker/executors-monitor + :app.worker/executor-monitor {:metrics (ig/ref :app.metrics/metrics) - :scheduler (ig/ref :app.worker/scheduler) :executors (ig/ref :app.worker/executors)} :app.migrations/migrations @@ -216,6 +206,10 @@ {:pool (ig/ref :app.db/pool) :executor (ig/ref [::default :app.worker/executor])} + :app.rpc/semaphores + {:metrics (ig/ref :app.metrics/metrics) + :executor (ig/ref [::default :app.worker/executor])} + :app.rpc/rlimit {:executor (ig/ref [::worker :app.worker/executor]) :scheduler (ig/ref :app.worker/scheduler)} @@ -234,7 +228,10 @@ :http-client (ig/ref :app.http/client) :rlimit (ig/ref :app.rpc/rlimit) :executors (ig/ref :app.worker/executors) - :templates (ig/ref :app.setup/builtin-templates)} + :executor (ig/ref [::default :app.worker/executor]) + :templates (ig/ref :app.setup/builtin-templates) + :semaphores (ig/ref :app.rpc/semaphores) + } :app.rpc.doc/routes {:methods (ig/ref :app.rpc/methods)} @@ -359,7 +356,7 @@ (def worker-config - { :app.worker/cron + {:app.worker/cron {:executor (ig/ref [::worker :app.worker/executor]) :scheduler (ig/ref :app.worker/scheduler) :tasks (ig/ref :app.worker/registry) diff --git a/backend/src/app/metrics.clj b/backend/src/app/metrics.clj index 589fc5f063..ee4a67045d 100644 --- a/backend/src/app/metrics.clj +++ b/backend/src/app/metrics.clj @@ -100,23 +100,23 @@ ::mdef/labels ["name"] ::mdef/type :summary} - :rpc-semaphore-queued-submissions - {::mdef/name "penpot_rpc_semaphore_queued_submissions" - ::mdef/help "Current number of queued submissions on RPC-SEMAPHORE." + :semaphore-queued-submissions + {::mdef/name "penpot_semaphore_queued_submissions" + ::mdef/help "Current number of queued submissions on SEMAPHORE." ::mdef/labels ["name"] ::mdef/type :gauge} - :rpc-semaphore-used-permits - {::mdef/name "penpot_rpc_semaphore_used_permits" - ::mdef/help "Current number of used permits on RPC-SEMAPHORE." + :semaphore-used-permits + {::mdef/name "penpot_semaphore_used_permits" + ::mdef/help "Current number of used permits on SEMAPHORE." ::mdef/labels ["name"] ::mdef/type :gauge} - :rpc-semaphore-acquires-total - {::mdef/name "penpot_rpc_semaphore_acquires_total" - ::mdef/help "Total number of acquire operations on RPC-SEMAPHORE." + :semaphore-timing + {::mdef/name "penpot_semaphore_timing" + ::mdef/help "Total timing of SEMAPHORE." ::mdef/labels ["name"] - ::mdef/type :counter} + ::mdef/type :summary} :executors-active-threads {::mdef/name "penpot_executors_active_threads" diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 91b84211ba..485d311655 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -16,10 +16,9 @@ [app.msgbus :as-alias mbus] [app.rpc.retry :as retry] [app.rpc.rlimit :as rlimit] - [app.rpc.semaphore :as rsem] - [app.util.async :as async] + [app.rpc.semaphore :as-alias rsem] [app.util.services :as sv] - [app.worker :as wrk] + [app.util.time :as ts] [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.core :as p] @@ -107,38 +106,25 @@ "Wrap service method with metrics measurement." [{:keys [metrics ::metrics-id]} f mdata] (let [labels (into-array String [(::sv/name mdata)])] - (fn [cfg params] - (let [start (System/nanoTime)] + (let [tp (ts/tpoint)] (p/finally (f cfg params) (fn [_ _] (mtx/run! metrics - {:id metrics-id - :val (/ (- (System/nanoTime) start) 1000000) - :labels labels}))))))) + :id metrics-id + :val (inst-ms (tp)) + :labels labels))))))) (defn- wrap-dispatch "Wraps service method into async flow, with the ability to dispatching it to a preconfigured executor service." - [{:keys [executors] :as cfg} f mdata] - (let [dname (::async/dispatch mdata :default)] - (if (= :none dname) - (with-meta - (fn [cfg params] - (p/do (f cfg params))) - mdata) - - (let [executor (get executors dname)] - (when-not executor - (ex/raise :type :internal - :code :executor-not-configured - :hint (format "executor %s not configured" dname))) - (with-meta - (fn [cfg params] - (-> (px/submit! executor #(f cfg params)) - (p/bind p/wrap))) - mdata))))) + [{:keys [executor] :as cfg} f mdata] + (with-meta + (fn [cfg params] + (-> (px/submit! executor #(f cfg params)) + (p/bind p/wrap))) + mdata)) (defn- wrap-audit [{:keys [audit] :as cfg} f mdata] @@ -171,8 +157,8 @@ [cfg f mdata] (let [f (as-> f $ (wrap-dispatch cfg $ mdata) - (rsem/wrap cfg $ mdata) (rlimit/wrap cfg $ mdata) + (rsem/wrap cfg $ mdata) (retry/wrap-retry cfg $ mdata) (wrap-audit cfg $ mdata) (wrap-metrics cfg $ mdata) @@ -245,8 +231,6 @@ (into {})))) (s/def ::audit (s/nilable fn?)) -(s/def ::executors (s/map-of keyword? ::wrk/executor)) -(s/def ::executors map?) (s/def ::http-client fn?) (s/def ::ldap (s/nilable map?)) (s/def ::msgbus ::mbus/msgbus) @@ -260,10 +244,10 @@ ::session ::sprops ::audit - ::executors ::public-uri ::msgbus ::http-client + ::rsem/semaphores ::rlimit/rlimit ::mtx/metrics ::db/pool diff --git a/backend/src/app/rpc/commands/auth.clj b/backend/src/app/rpc/commands/auth.clj index d052c6b203..35326d8d6c 100644 --- a/backend/src/app/rpc/commands/auth.clj +++ b/backend/src/app/rpc/commands/auth.clj @@ -136,7 +136,7 @@ (sv/defmethod ::login-with-password "Performs authentication using penpot password." {:auth false - ::rsem/permits (cf/get :rpc-semaphore-permits-password) + ::rsem/queue :auth ::doc/added "1.15"} [cfg params] (login-with-password cfg params)) @@ -177,7 +177,7 @@ (sv/defmethod ::recover-profile {:auth false - ::rsem/permits (cf/get :rpc-semaphore-permits-password) + ::rsem/queue :auth ::doc/added "1.15"} [cfg params] (recover-profile cfg params)) @@ -368,7 +368,7 @@ (sv/defmethod ::register-profile {:auth false - ::rsem/permits (cf/get :rpc-semaphore-permits-password) + ::rsem/queue :auth ::doc/added "1.15"} [{:keys [pool] :as cfg} params] (db/with-atomic [conn pool] diff --git a/backend/src/app/rpc/mutations/files.clj b/backend/src/app/rpc/mutations/files.clj index 6c6cc1b049..2bb5ba8f24 100644 --- a/backend/src/app/rpc/mutations/files.clj +++ b/backend/src/app/rpc/mutations/files.clj @@ -315,7 +315,7 @@ (contains? o :changes-with-metadata))))) (sv/defmethod ::update-file - {::rsem/permits (cf/get :rpc-semaphore-permits-file-update)} + {::rsem/queue :update-file} [{:keys [pool] :as cfg} {:keys [id profile-id] :as params}] (db/with-atomic [conn pool] (db/xact-lock! conn id) diff --git a/backend/src/app/rpc/mutations/fonts.clj b/backend/src/app/rpc/mutations/fonts.clj index 2fa930b6e4..ec680e84da 100644 --- a/backend/src/app/rpc/mutations/fonts.clj +++ b/backend/src/app/rpc/mutations/fonts.clj @@ -10,7 +10,6 @@ [app.common.exceptions :as ex] [app.common.spec :as us] [app.common.uuid :as uuid] - [app.config :as cf] [app.db :as db] [app.media :as media] [app.rpc.doc :as-alias doc] @@ -20,8 +19,7 @@ [app.util.services :as sv] [app.util.time :as dt] [clojure.spec.alpha :as s] - [promesa.core :as p] - [promesa.exec :as px])) + [promesa.core :as p])) (declare create-font-variant) @@ -42,24 +40,21 @@ ::font-id ::font-family ::font-weight ::font-style])) (sv/defmethod ::create-font-variant - {::rsem/permits (cf/get :rpc-semaphore-permits-font)} [{:keys [pool] :as cfg} {:keys [team-id profile-id] :as params}] (let [cfg (update cfg :storage media/configure-assets-storage)] (teams/check-edition-permissions! pool profile-id team-id) (create-font-variant cfg params))) (defn create-font-variant - [{:keys [storage pool executors] :as cfg} {:keys [data] :as params}] + [{:keys [storage pool executor semaphores] :as cfg} {:keys [data] :as params}] (letfn [(generate-fonts [data] - (px/with-dispatch (:blocking executors) + (rsem/with-dispatch (:process-font semaphores) (media/run {:cmd :generate-fonts :input data}))) ;; Function responsible of calculating cryptographyc hash of - ;; the provided data. Even though it uses the hight - ;; performance BLAKE2b algorithm, we prefer to schedule it - ;; to be executed on the blocking executor. + ;; the provided data. (calculate-hash [data] - (px/with-dispatch (:blocking executors) + (rsem/with-dispatch (:process-font semaphores) (sto/calculate-hash data))) (validate-data [data] @@ -110,8 +105,8 @@ (-> (generate-fonts data) (p/then validate-data) - (p/then persist-fonts (:default executors)) - (p/then insert-into-db (:default executors))))) + (p/then persist-fonts executor) + (p/then insert-into-db executor)))) ;; --- UPDATE FONT FAMILY diff --git a/backend/src/app/rpc/mutations/media.clj b/backend/src/app/rpc/mutations/media.clj index 30df19922c..6f4bbb1314 100644 --- a/backend/src/app/rpc/mutations/media.clj +++ b/backend/src/app/rpc/mutations/media.clj @@ -23,8 +23,7 @@ [clojure.spec.alpha :as s] [cuerdas.core :as str] [datoteka.io :as io] - [promesa.core :as p] - [promesa.exec :as px])) + [promesa.core :as p])) (def default-max-file-size (* 1024 1024 10)) ; 10 MiB @@ -53,7 +52,6 @@ :opt-un [::id])) (sv/defmethod ::upload-file-media-object - {::rsem/permits (cf/get :rpc-semaphore-permits-image)} [{:keys [pool] :as cfg} {:keys [profile-id file-id content] :as params}] (let [file (select-file pool file-id) cfg (update cfg :storage media/configure-assets-storage)] @@ -106,26 +104,25 @@ ;; inverse, soft referential integrity). (defn create-file-media-object - [{:keys [storage pool executors] :as cfg} {:keys [id file-id is-local name content] :as params}] + [{:keys [storage pool semaphores] :as cfg} + {:keys [id file-id is-local name content] :as params}] (letfn [;; Function responsible to retrieve the file information, as ;; it is synchronous operation it should be wrapped into ;; with-dispatch macro. (get-info [content] - (px/with-dispatch (:blocking executors) + (rsem/with-dispatch (:process-image semaphores) (media/run {:cmd :info :input content}))) ;; Function responsible of calculating cryptographyc hash of - ;; the provided data. Even though it uses the hight - ;; performance BLAKE2b algorithm, we prefer to schedule it - ;; to be executed on the blocking executor. + ;; the provided data. (calculate-hash [data] - (px/with-dispatch (:blocking executors) + (rsem/with-dispatch (:process-image semaphores) (sto/calculate-hash data))) ;; Function responsible of generating thumnail. As it is synchronous ;; opetation, it should be wrapped into with-dispatch macro (generate-thumbnail [info] - (px/with-dispatch (:blocking executors) + (rsem/with-dispatch (:process-image semaphores) (media/run (assoc thumbnail-options :cmd :generic-thumbnail :input info)))) @@ -157,15 +154,14 @@ :bucket "file-media-object"}))) (insert-into-database [info image thumb] - (px/with-dispatch (:default executors) - (db/exec-one! pool [sql:create-file-media-object - (or id (uuid/next)) - file-id is-local name - (:id image) - (:id thumb) - (:width info) - (:height info) - (:mtype info)])))] + (db/exec-one! pool [sql:create-file-media-object + (or id (uuid/next)) + file-id is-local name + (:id image) + (:id thumb) + (:width info) + (:height info) + (:mtype info)]))] (p/let [info (get-info content) thumb (create-thumbnail info) @@ -181,7 +177,6 @@ :opt-un [::id ::name])) (sv/defmethod ::create-file-media-object-from-url - {::rsem/permits (cf/get :rpc-semaphore-permits-image)} [{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}] (let [file (select-file pool file-id) cfg (update cfg :storage media/configure-assets-storage)] diff --git a/backend/src/app/rpc/mutations/profile.clj b/backend/src/app/rpc/mutations/profile.clj index a72952be92..95b607471e 100644 --- a/backend/src/app/rpc/mutations/profile.clj +++ b/backend/src/app/rpc/mutations/profile.clj @@ -15,6 +15,7 @@ [app.loggers.audit :as audit] [app.media :as media] [app.rpc.commands.auth :as cmd.auth] + [app.rpc.doc :as-alias doc] [app.rpc.mutations.teams :as teams] [app.rpc.queries.profile :as profile] [app.rpc.semaphore :as rsem] @@ -87,7 +88,7 @@ (s/keys :req-un [::profile-id ::password ::old-password])) (sv/defmethod ::update-profile-password - {::rsem/permits (cf/get :rpc-semaphore-permits-password)} + {::rsem/queue :auth} [{:keys [pool] :as cfg} {:keys [password] :as params}] (db/with-atomic [conn pool] (let [profile (validate-password! conn params) @@ -130,7 +131,6 @@ (s/keys :req-un [::profile-id ::file])) (sv/defmethod ::update-profile-photo - {::rsem/permits (cf/get :rpc-semaphore-permits-image)} [cfg {:keys [file] :as params}] ;; Validate incoming mime type (media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"}) @@ -138,8 +138,8 @@ (update-profile-photo cfg params))) (defn update-profile-photo - [{:keys [pool storage executors] :as cfg} {:keys [profile-id] :as params}] - (p/let [profile (px/with-dispatch (:default executors) + [{:keys [pool storage executor] :as cfg} {:keys [profile-id] :as params}] + (p/let [profile (px/with-dispatch executor (db/get-by-id pool :profile profile-id)) photo (teams/upload-photo cfg params)] @@ -305,7 +305,10 @@ (s/def ::login ::cmd.auth/login-with-password) (sv/defmethod ::login - {:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)} + {:auth false + ::rsem/queue :auth + ::doc/added "1.0" + ::doc/deprecated "1.15"} [cfg params] (cmd.auth/login-with-password cfg params)) @@ -313,7 +316,10 @@ (s/def ::logout ::cmd.auth/logout) -(sv/defmethod ::logout {:auth false} +(sv/defmethod ::logout + {:auth false + ::doc/added "1.0" + ::doc/deprecated "1.15"} [{:keys [session] :as cfg} _] (with-meta {} {:transform-response (:delete session)})) @@ -323,7 +329,8 @@ (s/def ::recover-profile ::cmd.auth/recover-profile) (sv/defmethod ::recover-profile - {:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)} + {::doc/added "1.0" + ::doc/deprecated "1.15"} [cfg params] (cmd.auth/recover-profile cfg params)) @@ -331,7 +338,10 @@ (s/def ::prepare-register-profile ::cmd.auth/prepare-register-profile) -(sv/defmethod ::prepare-register-profile {:auth false} +(sv/defmethod ::prepare-register-profile + {:auth false + ::doc/added "1.0" + ::doc/deprecated "1.15"} [cfg params] (cmd.auth/prepare-register cfg params)) @@ -340,7 +350,10 @@ (s/def ::register-profile ::cmd.auth/register-profile) (sv/defmethod ::register-profile - {:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)} + {:auth false + ::rsem/queue :auth + ::doc/added "1.0" + ::doc/deprecated "1.15"} [{:keys [pool] :as cfg} params] (db/with-atomic [conn pool] (-> (assoc cfg :conn conn) @@ -350,6 +363,9 @@ (s/def ::request-profile-recovery ::cmd.auth/request-profile-recovery) -(sv/defmethod ::request-profile-recovery {:auth false} +(sv/defmethod ::request-profile-recovery + {:auth false + ::doc/added "1.0" + ::doc/deprecated "1.15"} [cfg params] (cmd.auth/request-profile-recovery cfg params)) diff --git a/backend/src/app/rpc/mutations/teams.clj b/backend/src/app/rpc/mutations/teams.clj index 8e9c1d2c53..edcd93f0e2 100644 --- a/backend/src/app/rpc/mutations/teams.clj +++ b/backend/src/app/rpc/mutations/teams.clj @@ -290,7 +290,6 @@ (s/keys :req-un [::profile-id ::team-id ::file])) (sv/defmethod ::update-team-photo - {::rsem/permits (cf/get :rpc-semaphore-permits-image)} [cfg {:keys [file] :as params}] ;; Validate incoming mime type (media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"}) @@ -298,8 +297,8 @@ (update-team-photo cfg params))) (defn update-team-photo - [{:keys [pool storage executors] :as cfg} {:keys [profile-id team-id] :as params}] - (p/let [team (px/with-dispatch (:default executors) + [{:keys [pool storage executor] :as cfg} {:keys [profile-id team-id] :as params}] + (p/let [team (px/with-dispatch executor (teams/retrieve-team pool profile-id team-id)) photo (upload-photo cfg params)] @@ -316,13 +315,13 @@ (assoc team :photo-id (:id photo)))) (defn upload-photo - [{:keys [storage executors] :as cfg} {:keys [file]}] + [{:keys [storage semaphores] :as cfg} {:keys [file]}] (letfn [(get-info [content] - (px/with-dispatch (:blocking executors) + (rsem/with-dispatch (:process-image semaphores) (media/run {:cmd :info :input content}))) (generate-thumbnail [info] - (px/with-dispatch (:blocking executors) + (rsem/with-dispatch (:process-image semaphores) (media/run {:cmd :profile-thumbnail :format :jpeg :quality 85 @@ -331,11 +330,9 @@ :input info}))) ;; Function responsible of calculating cryptographyc hash of - ;; the provided data. Even though it uses the hight - ;; performance BLAKE2b algorithm, we prefer to schedule it - ;; to be executed on the blocking executor. + ;; the provided data. (calculate-hash [data] - (px/with-dispatch (:blocking executors) + (rsem/with-dispatch (:process-image semaphores) (sto/calculate-hash data)))] (p/let [info (get-info file) @@ -343,11 +340,11 @@ hash (calculate-hash (:data thumb)) content (-> (sto/content (:data thumb) (:size thumb)) (sto/wrap-with-hash hash))] - (sto/put-object! storage {::sto/content content - ::sto/deduplicate? true - :bucket "profile" - :content-type (:mtype thumb)})))) - + (rsem/with-dispatch (:process-image semaphores) + (sto/put-object! storage {::sto/content content + ::sto/deduplicate? true + :bucket "profile" + :content-type (:mtype thumb)}))))) ;; --- Mutation: Invite Member diff --git a/backend/src/app/rpc/semaphore.clj b/backend/src/app/rpc/semaphore.clj index 45f90839d5..49760321e5 100644 --- a/backend/src/app/rpc/semaphore.clj +++ b/backend/src/app/rpc/semaphore.clj @@ -9,23 +9,38 @@ (:require [app.common.data :as d] [app.common.logging :as l] + [app.common.spec :as us] + [app.config :as cf] [app.metrics :as mtx] + [app.rpc :as-alias rpc] [app.util.locks :as locks] - [app.util.services :as-alias sv] + [app.util.time :as ts] + [app.worker :as-alias wrk] + [clojure.spec.alpha :as s] + [integrant.core :as ig] [promesa.core :as p])) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; ASYNC SEMAPHORE IMPL +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + (defprotocol IAsyncSemaphore (acquire! [_]) - (release! [_])) + (release! [_ tp])) (defn create - [& {:keys [permits metrics name]}] - (let [name (d/name name) - used (volatile! 0) - queue (volatile! (d/queue)) - labels (into-array String [name]) - lock (locks/create)] + [& {:keys [permits metrics name executor]}] + (let [used (volatile! 0) + queue (volatile! (d/queue)) + labels (into-array String [(d/name name)]) + lock (locks/create) + permits (or permits Long/MAX_VALUE)] + (when (>= permits Long/MAX_VALUE) + (l/warn :hint "permits value too hight" :permits permits :semaphore name)) + + ^{::wrk/executor executor + ::name name} (reify IAsyncSemaphore (acquire! [_] (let [d (p/deferred)] @@ -36,12 +51,17 @@ (p/resolve! d)) (vswap! queue conj d))) - (mtx/run! metrics {:id :rpc-semaphore-used-permits :val @used :labels labels }) - (mtx/run! metrics {:id :rpc-semaphore-queued-submissions :val (count @queue) :labels labels}) - (mtx/run! metrics {:id :rpc-semaphore-acquires-total :inc 1 :labels labels}) + (mtx/run! metrics + :id :semaphore-used-permits + :val @used + :labels labels) + (mtx/run! metrics + :id :semaphore-queued-submissions + :val (count @queue) + :labels labels) d)) - (release! [_] + (release! [_ tp] (locks/locking lock (if-let [item (peek @queue)] (do @@ -50,19 +70,80 @@ (when (pos? @used) (vswap! used dec)))) - (mtx/run! metrics {:id :rpc-semaphore-used-permits :val @used :labels labels}) - (mtx/run! metrics {:id :rpc-semaphore-queued-submissions :val (count @queue) :labels labels}))))) + (mtx/run! metrics + :id :semaphore-timing + :val (inst-ms (tp)) + :labels labels) + (mtx/run! metrics + :id :semaphore-used-permits + :val @used + :labels labels) + (mtx/run! metrics + :id :semaphore-queued-submissions + :val (count @queue) + :labels labels))))) + +(defn semaphore? + [v] + (satisfies? IAsyncSemaphore v)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; PREDEFINED SEMAPHORES +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(s/def ::semaphore semaphore?) +(s/def ::semaphores + (s/map-of ::us/keyword ::semaphore)) + +(defmethod ig/pre-init-spec ::rpc/semaphores [_] + (s/keys :req-un [::mtx/metrics])) + +(defn- create-default-semaphores + [metrics executor] + [(create :permits (cf/get :semaphore-process-font) + :metrics metrics + :name :process-font + :executor executor) + (create :permits (cf/get :semaphore-update-file) + :metrics metrics + :name :update-file + :executor executor) + (create :permits (cf/get :semaphore-process-image) + :metrics metrics + :name :process-image + :executor executor) + (create :permits (cf/get :semaphore-auth) + :metrics metrics + :name :auth + :executor executor)]) + +(defmethod ig/init-key ::rpc/semaphores + [_ {:keys [metrics executor]}] + (->> (create-default-semaphores metrics executor) + (d/index-by (comp ::name meta)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; PUBLIC API +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defmacro with-dispatch + [queue & body] + `(let [tpoint# (ts/tpoint) + queue# ~queue + executor# (-> queue# meta ::wrk/executor)] + (-> (acquire! queue#) + (p/then (fn [_#] ~@body) executor#) + (p/finally (fn [_# _#] + (release! queue# tpoint#)))))) (defn wrap - [{:keys [metrics executors] :as cfg} f mdata] - (if-let [permits (::permits mdata)] - (let [sem (create {:permits permits - :metrics metrics - :name (::sv/name mdata)})] - (l/debug :hint "wrapping semaphore" :handler (::sv/name mdata) :permits permits) + [{:keys [semaphores]} f {:keys [::queue]}] + (let [queue' (get semaphores queue)] + (if (semaphore? queue') (fn [cfg params] - (-> (acquire! sem) - (p/then (fn [_] (f cfg params)) (:default executors)) - (p/finally (fn [_ _] (release! sem)))))) - f)) - + (with-dispatch queue' + (f cfg params))) + (do + (when (some? queue) + (l/warn :hint "undefined semaphore" :name queue)) + f)))) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index c7259b24a2..52dd41ab78 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -44,20 +44,17 @@ (declare ^:private get-fj-thread-factory) (declare ^:private get-thread-factory) -(s/def ::prefix keyword?) (s/def ::parallelism ::us/integer) -(s/def ::idle-timeout ::us/integer) (defmethod ig/pre-init-spec ::executor [_] - (s/keys :req-un [::prefix] - :opt-un [::parallelism])) + (s/keys :opt-un [::parallelism])) (defmethod ig/init-key ::executor - [_ {:keys [parallelism prefix]}] - (let [counter (AtomicLong. 0)] + [skey {:keys [parallelism]}] + (let [prefix (if (vector? skey) (-> skey first name keyword) :default)] (if parallelism - (ForkJoinPool. (int parallelism) (get-fj-thread-factory prefix counter) nil false) - (Executors/newCachedThreadPool (get-thread-factory prefix counter))))) + (ForkJoinPool. (int parallelism) (get-fj-thread-factory prefix) nil false) + (Executors/newCachedThreadPool (get-thread-factory prefix))))) (defmethod ig/halt-key! ::executor [_ instance] @@ -69,8 +66,7 @@ (defmethod ig/init-key ::scheduler [_ {:keys [parallelism prefix] :or {parallelism 1}}] - (let [counter (AtomicLong. 0)] - (px/scheduled-pool parallelism (get-thread-factory prefix counter)))) + (px/scheduled-pool parallelism (get-thread-factory prefix))) (defmethod ig/halt-key! ::scheduler [_ instance] @@ -78,66 +74,90 @@ (defn- get-fj-thread-factory ^ForkJoinPool$ForkJoinWorkerThreadFactory - [prefix counter] - (reify ForkJoinPool$ForkJoinWorkerThreadFactory - (newThread [_ pool] - (let [^ForkJoinWorkerThread thread (.newThread ForkJoinPool/defaultForkJoinWorkerThreadFactory pool) - ^String thread-name (str "penpot/" (name prefix) "-" (.getAndIncrement ^AtomicLong counter))] - (.setName thread thread-name) - thread)))) + [prefix] + (let [^AtomicLong counter (AtomicLong. 0)] + (reify ForkJoinPool$ForkJoinWorkerThreadFactory + (newThread [_ pool] + (let [thread (.newThread ForkJoinPool/defaultForkJoinWorkerThreadFactory pool) + tname (str "penpot/" (name prefix) "-" (.getAndIncrement counter))] + (.setName ^ForkJoinWorkerThread thread ^String tname) + thread))))) (defn- get-thread-factory ^ThreadFactory - [prefix counter] - (reify ThreadFactory - (newThread [_ runnable] - (doto (Thread. runnable) - (.setDaemon true) - (.setName (str "penpot/" (name prefix) "-" (.getAndIncrement ^AtomicLong counter))))))) + [prefix] + (let [^AtomicLong counter (AtomicLong. 0)] + (reify ThreadFactory + (newThread [_ runnable] + (doto (Thread. runnable) + (.setDaemon true) + (.setName (str "penpot/" (name prefix) "-" (.getAndIncrement counter)))))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Executor Monitor ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::executors (s/map-of keyword? ::executor)) +(s/def ::executors + (s/map-of keyword? ::executor)) -(defmethod ig/pre-init-spec ::executors-monitor [_] - (s/keys :req-un [::executors ::scheduler ::mtx/metrics])) +(defmethod ig/pre-init-spec ::executor-monitor [_] + (s/keys :req-un [::executors ::mtx/metrics])) -(defmethod ig/init-key ::executors-monitor - [_ {:keys [executors metrics interval scheduler] :or {interval 3000}}] - (letfn [(log-stats [state] - (doseq [[key ^ForkJoinPool executor] executors] - (let [labels (into-array String [(name key)]) - running (.getRunningThreadCount executor) - queued (.getQueuedSubmissionCount executor) - active (.getPoolSize executor) - steals (.getStealCount executor) - steals-increment (- steals (or (get-in @state [key :steals]) 0)) - steals-increment (if (neg? steals-increment) 0 steals-increment)] +(defmethod ig/init-key ::executor-monitor + [_ {:keys [executors metrics interval] :or {interval 3000}}] + (letfn [(monitor! [state skey ^ForkJoinPool executor] + (let [prev-steals (get state skey 0) + running (.getRunningThreadCount executor) + queued (.getQueuedSubmissionCount executor) + active (.getPoolSize executor) + steals (.getStealCount executor) + labels (into-array String [(name skey)]) - (mtx/run! metrics {:id :executors-active-threads :labels labels :val active}) - (mtx/run! metrics {:id :executors-running-threads :labels labels :val running}) - (mtx/run! metrics {:id :executors-queued-submissions :labels labels :val queued}) - (mtx/run! metrics {:id :executors-completed-tasks :labels labels :inc steals-increment}) + steals-increment (- steals prev-steals) + steals-increment (if (neg? steals-increment) 0 steals-increment)] - (swap! state update key assoc - :running running - :active active - :queued queued - :steals steals))) + (mtx/run! metrics + :id :executor-active-threads + :labels labels + :val active) + (mtx/run! metrics + :id :executor-running-threads + :labels labels :val running) + (mtx/run! metrics + :id :executors-queued-submissions + :labels labels + :val queued) + (mtx/run! metrics + :id :executors-completed-tasks + :labels labels + :inc steals-increment) - (when (and (not (.isShutdown scheduler)) - (not (:shutdown @state))) - (px/schedule! scheduler interval (partial log-stats state))))] + (aa/thread-sleep interval) + (if (.isShutdown executor) + (l/debug :hint "stoping monitor; cause: executor is shutdown") + (assoc state skey steals)))) - (let [state (atom {})] - (px/schedule! scheduler interval (partial log-stats state)) - {:state state}))) + (monitor-fn [] + (try + (loop [items (into (d/queue) executors) + state {}] + (when-let [[skey executor :as item] (peek items)] + (if-let [state (monitor! state skey executor)] + (recur (conj items item) state) + (recur items state)))) + (catch InterruptedException _cause + (l/debug :hint "stoping monitor; interrupted"))))] -(defmethod ig/halt-key! ::executors-monitor - [_ {:keys [state]}] - (swap! state assoc :shutdown true)) + (let [thread (Thread. monitor-fn)] + (.setDaemon thread true) + (.setName thread "penpot/executor-monitor") + (.start thread) + + thread))) + +(defmethod ig/halt-key! ::executor-monitor + [_ thread] + (.interrupt ^Thread thread)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Worker