From 0926fbcbc6b0cda3e0d5e468558e71b8006e716c Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 30 Mar 2021 14:55:19 +0200 Subject: [PATCH] :recycle: Minor code reorganization. Improves modularity and reusability and allows usage of backend code as a library. --- backend/dev/user.clj | 2 +- backend/src/app/cli/fixtures.clj | 3 +- backend/src/app/cli/manage.clj | 5 +- backend/src/app/cli/migrate_media.clj | 10 +- backend/src/app/config.clj | 56 +- backend/src/app/db.clj | 11 +- backend/src/app/emails.clj | 96 +++- backend/src/app/http.clj | 85 +-- backend/src/app/http/feedback.clj | 17 +- backend/src/app/http/oauth/github.clj | 5 +- backend/src/app/loggers/mattermost.clj | 2 +- backend/src/app/main.clj | 568 ++++++++++----------- backend/src/app/rpc/mutations/demo.clj | 13 +- backend/src/app/rpc/mutations/files.clj | 12 +- backend/src/app/rpc/mutations/profile.clj | 62 ++- backend/src/app/rpc/mutations/projects.clj | 10 +- backend/src/app/rpc/mutations/teams.clj | 34 +- backend/src/app/tasks.clj | 110 ---- backend/src/app/tasks/sendmail.clj | 2 +- backend/src/app/tasks/telemetry.clj | 25 +- backend/src/app/telemetry.clj | 121 ----- backend/src/app/util/blob.clj | 24 +- backend/src/app/util/time.clj | 1 - backend/src/app/worker.clj | 196 +++++-- backend/tests/app/tests/helpers.clj | 20 +- common/app/common/data.cljc | 3 + common/app/common/exceptions.cljc | 2 +- 27 files changed, 704 insertions(+), 791 deletions(-) delete mode 100644 backend/src/app/tasks.clj delete mode 100644 backend/src/app/telemetry.clj diff --git a/backend/dev/user.clj b/backend/dev/user.clj index d7ee62abf..b6816c3ad 100644 --- a/backend/dev/user.clj +++ b/backend/dev/user.clj @@ -70,7 +70,7 @@ [] (alter-var-root #'system (fn [sys] (when sys (ig/halt! sys)) - (-> (main/build-system-config cfg/config) + (-> main/system-config (ig/prep) (ig/init)))) :started) diff --git a/backend/src/app/cli/fixtures.clj b/backend/src/app/cli/fixtures.clj index 414ea90ae..710d3f1fe 100644 --- a/backend/src/app/cli/fixtures.clj +++ b/backend/src/app/cli/fixtures.clj @@ -12,7 +12,6 @@ (:require [app.common.pages :as cp] [app.common.uuid :as uuid] - [app.config :as cfg] [app.db :as db] [app.main :as main] [app.rpc.mutations.profile :as profile] @@ -233,7 +232,7 @@ (defn run [{:keys [preset] :or {preset :small}}] - (let [config (select-keys (main/build-system-config cfg/config) + (let [config (select-keys main/system-config [:app.db/pool :app.telemetry/migrations :app.migrations/migrations diff --git a/backend/src/app/cli/manage.clj b/backend/src/app/cli/manage.clj index 36093375e..04216fc22 100644 --- a/backend/src/app/cli/manage.clj +++ b/backend/src/app/cli/manage.clj @@ -5,12 +5,11 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2021 UXBOX Labs SL +;; Copyright (c) UXBOX Labs SL (ns app.cli.manage "A manage cli api." (:require - [app.config :as cfg] [app.db :as db] [app.main :as main] [app.rpc.mutations.profile :as profile] @@ -26,7 +25,7 @@ (defn init-system [] - (let [data (-> (main/build-system-config cfg/config) + (let [data (-> main/system-config (select-keys [:app.db/pool :app.metrics/metrics]) (assoc :app.migrations/all {}))] (-> data ig/prep ig/init))) diff --git a/backend/src/app/cli/migrate_media.clj b/backend/src/app/cli/migrate_media.clj index 4afaf79b2..6a0531126 100644 --- a/backend/src/app/cli/migrate_media.clj +++ b/backend/src/app/cli/migrate_media.clj @@ -10,7 +10,7 @@ (ns app.cli.migrate-media (:require [app.common.media :as cm] - [app.config :as cfg] + [app.config :as cf] [app.db :as db] [app.main :as main] [app.storage :as sto] @@ -34,7 +34,7 @@ (defn run [] - (let [config (select-keys (main/build-system-config cfg/config) + (let [config (select-keys main/system-config [:app.db/pool :app.migrations/migrations :app.metrics/metrics @@ -60,7 +60,7 @@ (->> (db/exec! conn ["select * from profile"]) (filter #(not (str/empty? (:photo %)))) (seq)))] - (let [base (fs/path (:storage-fs-old-directory cfg/config)) + (let [base (fs/path (cf/get :storage-fs-old-directory)) storage (-> (:app.storage/storage system) (assoc :conn conn))] (doseq [profile (retrieve-profiles conn)] @@ -81,7 +81,7 @@ (->> (db/exec! conn ["select * from team"]) (filter #(not (str/empty? (:photo %)))) (seq)))] - (let [base (fs/path (:storage-fs-old-directory cfg/config)) + (let [base (fs/path (cf/get :storage-fs-old-directory)) storage (-> (:app.storage/storage system) (assoc :conn conn))] (doseq [team (retrieve-teams conn)] @@ -105,7 +105,7 @@ from file_media_object as fmo join file_media_thumbnail as fth on (fth.media_object_id = fmo.id)"]) (seq)))] - (let [base (fs/path (:storage-fs-old-directory cfg/config)) + (let [base (fs/path (cf/get :storage-fs-old-directory)) storage (-> (:app.storage/storage system) (assoc :conn conn))] (doseq [mobj (retrieve-media-objects conn)] diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index aff4989a0..a48878a81 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -5,7 +5,7 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020-2021 UXBOX Labs SL +;; Copyright (c) UXBOX Labs SL (ns app.config "A configuration management." @@ -15,10 +15,19 @@ [app.common.version :as v] [app.util.time :as dt] [clojure.core :as c] + [clojure.pprint :as pprint] [clojure.spec.alpha :as s] [cuerdas.core :as str] [environ.core :refer [env]])) +(prefer-method print-method + clojure.lang.IRecord + clojure.lang.IDeref) + +(prefer-method pprint/simple-dispatch + clojure.lang.IPersistentMap + clojure.lang.IDeref) + (def defaults {:http-server-port 6060 :host "devenv" @@ -221,39 +230,31 @@ ::telemetry-server-enabled ::telemetry-server-port ::telemetry-uri + ::telemetry-referer ::telemetry-with-taiga ::tenant])) -(defn- env->config - [env] - (reduce-kv - (fn [acc k v] - (cond-> acc - (str/starts-with? (name k) "penpot-") - (assoc (keyword (subs (name k) 7)) v) +(defn read-env + [prefix] + (let [prefix (str prefix "-") + len (count prefix)] + (reduce-kv + (fn [acc k v] + (cond-> acc + (str/starts-with? (name k) prefix) + (assoc (keyword (subs (name k) len)) v))) + {} + env))) - (str/starts-with? (name k) "app-") - (assoc (keyword (subs (name k) 4)) v))) - {} - env)) (defn- read-config - [env] - (->> (env->config env) + [] + (->> (read-env "penpot") (merge defaults) (us/conform ::config))) -(defn- read-test-config - [env] - (merge {:redis-uri "redis://redis/1" - :database-uri "postgresql://postgres/penpot_test" - :storage-fs-directory "/tmp/app/storage" - :migrations-verbose false} - (read-config env))) - (def version (v/parse "%version%")) -(def config (read-config env)) -(def test-config (read-test-config env)) +(def config (atom (read-config))) (def deletion-delay (dt/duration {:days 7})) @@ -261,6 +262,9 @@ (defn get "A configuration getter. Helps code be more testable." ([key] - (c/get config key)) + (c/get @config key)) ([key default] - (c/get config key default))) + (c/get @config key default))) + +;; Set value for all new threads bindings. +(alter-var-root #'*assert* (constantly (get :asserts-enabled))) diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index 7791190c2..29a5627f8 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -9,6 +9,7 @@ (ns app.db (:require + [app.common.data :as d] [app.common.exceptions :as ex] [app.common.geom.point :as gpt] [app.common.spec :as us] @@ -48,8 +49,8 @@ (declare instrument-jdbc!) +(s/def ::name keyword?) (s/def ::uri ::us/not-empty-string) -(s/def ::name ::us/not-empty-string) (s/def ::min-pool-size ::us/integer) (s/def ::max-pool-size ::us/integer) (s/def ::migrations map?) @@ -59,14 +60,14 @@ (defmethod ig/init-key ::pool [_ {:keys [migrations metrics] :as cfg}] - (log/infof "initialize connection pool '%s' with uri '%s'" (:name cfg) (:uri cfg)) + (log/infof "initialize connection pool '%s' with uri '%s'" (name (:name cfg)) (:uri cfg)) (instrument-jdbc! (:registry metrics)) (let [pool (create-pool cfg)] (when (seq migrations) (with-open [conn ^AutoCloseable (open pool)] (mg/setup! conn) - (doseq [[mname steps] migrations] - (mg/migrate! conn {:name (name mname) :steps steps})))) + (doseq [[name steps] migrations] + (mg/migrate! conn {:name (d/name name) :steps steps})))) pool)) (defmethod ig/halt-key! ::pool @@ -100,7 +101,7 @@ mtf (PrometheusMetricsTrackerFactory. (:registry metrics))] (doto config (.setJdbcUrl (str "jdbc:" dburi)) - (.setPoolName (:name cfg "default")) + (.setPoolName (d/name (:name cfg))) (.setAutoCommit true) (.setReadOnly false) (.setConnectionTimeout 8000) ;; 8seg diff --git a/backend/src/app/emails.clj b/backend/src/app/emails.clj index 74fbaf84b..dd0fcf4e1 100644 --- a/backend/src/app/emails.clj +++ b/backend/src/app/emails.clj @@ -5,7 +5,7 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020-2021 UXBOX Labs SL +;; Copyright (c) UXBOX Labs SL (ns app.emails "Main api for send emails." @@ -14,36 +14,34 @@ [app.config :as cfg] [app.db :as db] [app.db.sql :as sql] - [app.tasks :as tasks] [app.util.emails :as emails] - [clojure.spec.alpha :as s])) + [app.worker :as wrk] + [clojure.spec.alpha :as s] + [clojure.tools.logging :as log] + [integrant.core :as ig])) -;; --- Defaults - -(defn default-context - [] - {:assets-uri (:assets-uri cfg/config) - :public-uri (:public-uri cfg/config)}) - -;; --- Public API +;; --- PUBLIC API (defn render [email-factory context] (email-factory context)) (defn send! "Schedule the email for sending." - [conn email-factory context] - (us/verify fn? email-factory) - (us/verify map? context) - (let [email (email-factory context)] - (tasks/submit! conn {:name "sendmail" - :delay 0 - :max-retries 1 - :priority 200 - :props email}))) + [{:keys [::conn ::factory] :as context}] + (us/verify fn? factory) + (us/verify some? conn) + (let [email (factory context)] + (wrk/submit! (assoc email + ::wrk/task :sendmail + ::wrk/delay 0 + ::wrk/max-retries 1 + ::wrk/priority 200 + ::wrk/conn conn)))) +;; --- BOUNCE/COMPLAINS HANDLING + (def sql:profile-complaint-report "select (select count(*) from profile_complaint_report @@ -91,7 +89,7 @@ (>= (count reports) threshold)))) -;; --- Emails +;; --- EMAIL FACTORIES (s/def ::subject ::us/string) (s/def ::content ::us/string) @@ -101,7 +99,7 @@ (def feedback "A profile feedback email." - (emails/template-factory ::feedback default-context)) + (emails/template-factory ::feedback)) (s/def ::name ::us/string) (s/def ::register @@ -109,7 +107,7 @@ (def register "A new profile registration welcome email." - (emails/template-factory ::register default-context)) + (emails/template-factory ::register)) (s/def ::token ::us/string) (s/def ::password-recovery @@ -117,7 +115,7 @@ (def password-recovery "A password recovery notification email." - (emails/template-factory ::password-recovery default-context)) + (emails/template-factory ::password-recovery)) (s/def ::pending-email ::us/email) (s/def ::change-email @@ -125,7 +123,7 @@ (def change-email "Password change confirmation email" - (emails/template-factory ::change-email default-context)) + (emails/template-factory ::change-email)) (s/def :internal.emails.invite-to-team/invited-by ::us/string) (s/def :internal.emails.invite-to-team/team ::us/string) @@ -138,4 +136,50 @@ (def invite-to-team "Teams member invitation email." - (emails/template-factory ::invite-to-team default-context)) + (emails/template-factory ::invite-to-team)) + + +;; --- SENDMAIL TASK + +(declare send-console!) + +(s/def ::username ::cfg/smtp-username) +(s/def ::password ::cfg/smtp-password) +(s/def ::tls ::cfg/smtp-tls) +(s/def ::ssl ::cfg/smtp-ssl) +(s/def ::host ::cfg/smtp-host) +(s/def ::port ::cfg/smtp-port) +(s/def ::default-reply-to ::cfg/smtp-default-reply-to) +(s/def ::default-from ::cfg/smtp-default-from) +(s/def ::enabled ::cfg/smtp-enabled) + +(defmethod ig/pre-init-spec ::sendmail-handler [_] + (s/keys :req-un [::enabled] + :opt-un [::username + ::password + ::tls + ::ssl + ::host + ::port + ::default-from + ::default-reply-to])) + +(defmethod ig/init-key ::sendmail-handler + [_ cfg] + (fn [{:keys [props] :as task}] + (if (:enabled cfg) + (emails/send! cfg props) + (send-console! cfg props)))) + +(defn- send-console! + [cfg email] + (let [baos (java.io.ByteArrayOutputStream.) + mesg (emails/smtp-message cfg email)] + (.writeTo mesg baos) + (let [out (with-out-str + (println "email console dump:") + (println "******** start email" (:id email) "**********") + (println (.toString baos)) + (println "******** end email "(:id email) "**********"))] + (log/info out)))) + diff --git a/backend/src/app/http.clj b/backend/src/app/http.clj index 65b641ea6..ae7a43df7 100644 --- a/backend/src/app/http.clj +++ b/backend/src/app/http.clj @@ -5,13 +5,13 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020-2021 UXBOX Labs SL +;; Copyright (c) UXBOX Labs SL (ns app.http (:require [app.common.data :as d] + [app.common.exceptions :as ex] [app.common.spec :as us] - [app.config :as cfg] [app.http.errors :as errors] [app.http.middleware :as middleware] [app.metrics :as mtx] @@ -26,22 +26,24 @@ org.eclipse.jetty.server.handler.ErrorHandler org.eclipse.jetty.server.handler.StatisticsHandler)) +(declare router-handler) + (s/def ::handler fn?) +(s/def ::router some?) (s/def ::ws (s/map-of ::us/string fn?)) -(s/def ::port ::cfg/http-server-port) +(s/def ::port ::us/integer) (s/def ::name ::us/string) (defmethod ig/pre-init-spec ::server [_] - (s/keys :req-un [::handler ::port] - :opt-un [::ws ::name ::mtx/metrics])) + (s/keys :req-un [::port] + :opt-un [::ws ::name ::mtx/metrics ::router ::handler])) (defmethod ig/prep-key ::server [_ cfg] - (merge {:name "http"} - (d/without-nils cfg))) + (merge {:name "http"} (d/without-nils cfg))) (defmethod ig/init-key ::server - [_ {:keys [handler ws port name metrics] :as opts}] + [_ {:keys [handler router ws port name metrics] :as opts}] (log/infof "starting '%s' server on port %s." name port) (let [pre-start (fn [^Server server] (let [handler (doto (ErrorHandler.) @@ -49,7 +51,7 @@ (.setServer server))] (.setErrorHandler server ^ErrorHandler handler) (when metrics - (let [stats (new StatisticsHandler)] + (let [stats (StatisticsHandler.)] (.setHandler ^StatisticsHandler stats (.getHandler server)) (.setHandler server stats) (mtx/instrument-jetty! (:registry metrics) stats))))) @@ -63,6 +65,13 @@ (when (seq ws) {:websockets ws})) + handler (cond + (fn? handler) handler + (some? router) (router-handler router) + :else (ex/raise :type :internal + :code :invalid-argument + :hint "Missing `handler` or `router` option.")) + server (jetty/run-jetty handler options)] (assoc opts :server server))) @@ -71,31 +80,13 @@ (log/infof "stoping '%s' server on port %s." name port) (jetty/stop-server server)) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Http Main Handler (Router) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(declare create-router) - -(s/def ::rpc map?) -(s/def ::session map?) -(s/def ::metrics map?) -(s/def ::oauth map?) -(s/def ::storage map?) -(s/def ::assets map?) -(s/def ::feedback fn?) - -(defmethod ig/pre-init-spec ::router [_] - (s/keys :req-un [::rpc ::session ::metrics ::oauth ::storage ::assets ::feedback])) - -(defmethod ig/init-key ::router - [_ cfg] - (let [handler (rr/ring-handler - (create-router cfg) - (rr/routes - (rr/create-resource-handler {:path "/"}) - (rr/create-default-handler)) - {:middleware [middleware/server-timing]})] +(defn- router-handler + [router] + (let [handler (rr/ring-handler router + (rr/routes + (rr/create-resource-handler {:path "/"}) + (rr/create-default-handler)) + {:middleware [middleware/server-timing]})] (fn [request] (try (handler request) @@ -104,18 +95,30 @@ (let [cdata (errors/get-error-context request e)] (update-thread-context! cdata) (log/errorf e "unhandled exception: %s (id: %s)" (ex-message e) (str (:id cdata))) - {:status 500 - :body "internal server error"}) + {:status 500 :body "internal server error"}) (catch Throwable e (log/errorf e "unhandled exception: %s" (ex-message e)) - {:status 500 - :body "internal server error"}))))))) + {:status 500 :body "internal server error"}))))))) -(defn- create-router - [{:keys [session rpc oauth metrics svgparse assets feedback] :as cfg}] + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Http Main Handler (Router) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(s/def ::rpc map?) +(s/def ::session map?) +(s/def ::oauth map?) +(s/def ::storage map?) +(s/def ::assets map?) +(s/def ::feedback fn?) + +(defmethod ig/pre-init-spec ::router [_] + (s/keys :req-un [::rpc ::session ::mtx/metrics ::oauth ::storage ::assets ::feedback])) + +(defmethod ig/init-key ::router + [_ {:keys [session rpc oauth metrics svgparse assets feedback] :as cfg}] (rr/router [["/metrics" {:get (:handler metrics)}] - ["/assets" {:middleware [[middleware/format-response-body] [middleware/errors errors/handle]]} ["/by-id/:id" {:get (:objects-handler assets)}] diff --git a/backend/src/app/http/feedback.clj b/backend/src/app/http/feedback.clj index 0d3fa4959..1ab206435 100644 --- a/backend/src/app/http/feedback.clj +++ b/backend/src/app/http/feedback.clj @@ -15,7 +15,7 @@ [app.common.spec :as us] [app.config :as cfg] [app.db :as db] - [app.emails :as emails] + [app.emails :as eml] [app.rpc.queries.profile :as profile] [clojure.spec.alpha :as s] [integrant.core :as ig])) @@ -62,11 +62,12 @@ [pool profile params] (let [params (us/conform ::feedback params) destination (cfg/get :feedback-destination)] - (emails/send! pool emails/feedback - {:to destination - :profile profile - :reply-to (:from params) - :email (:from params) - :subject (:subject params) - :content (:content params)}) + (eml/send! {::eml/conn pool + ::eml/factory eml/feedback + :to destination + :profile profile + :reply-to (:from params) + :email (:from params) + :subject (:subject params) + :content (:content params)}) nil)) diff --git a/backend/src/app/http/oauth/github.clj b/backend/src/app/http/oauth/github.clj index bfa4c3c14..379b9143a 100644 --- a/backend/src/app/http/oauth/github.clj +++ b/backend/src/app/http/oauth/github.clj @@ -5,13 +5,12 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020-2021 UXBOX Labs SL +;; Copyright (c) UXBOX Labs SL (ns app.http.oauth.github (:require [app.common.exceptions :as ex] [app.common.spec :as us] - [app.config :as cfg] [app.http.oauth.google :as gg] [app.util.http :as http] [app.util.time :as dt] @@ -105,7 +104,7 @@ state (tokens :generate {:iss :github-oauth :invitation-token invitation :exp (dt/in-future "15m")}) - params {:client_id (:client-id cfg/config) + params {:client_id (:client-id cfg) :redirect_uri (build-redirect-url cfg) :state state :scope scope} diff --git a/backend/src/app/loggers/mattermost.clj b/backend/src/app/loggers/mattermost.clj index c2247e71e..f7eef7a1c 100644 --- a/backend/src/app/loggers/mattermost.clj +++ b/backend/src/app/loggers/mattermost.clj @@ -65,7 +65,7 @@ (try (let [uri (:uri cfg) text (str "Unhandled exception (@channel):\n" - "- detail: " (:public-uri cfg/config) "/dbg/error-by-id/" id "\n" + "- detail: " (cfg/get :public-uri) "/dbg/error-by-id/" id "\n" "- host: `" host "`\n" "- version: `" version "`\n" (when error diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 9d72865f1..e1a98a56a 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -5,353 +5,332 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020-2021 UXBOX Labs SL +;; Copyright (c) UXBOX Labs SL (ns app.main (:require [app.common.data :as d] - [app.config :as cfg] + [app.config :as cf] [app.util.time :as dt] - [clojure.pprint :as pprint] [clojure.tools.logging :as log] [integrant.core :as ig])) -;; Set value for all new threads bindings. -(alter-var-root #'*assert* (constantly (:asserts-enabled cfg/config))) +(def system-config + {:app.db/pool + {:uri (cf/get :database-uri) + :username (cf/get :database-username) + :password (cf/get :database-password) + :metrics (ig/ref :app.metrics/metrics) + :migrations (ig/ref :app.migrations/all) + :name :main + :min-pool-size 0 + :max-pool-size 20} -(derive :app.telemetry/server :app.http/server) + :app.metrics/metrics + {:definitions + {:profile-register + {:name "actions_profile_register_count" + :help "A global counter of user registrations." + :type :counter} + :profile-activation + {:name "actions_profile_activation_count" + :help "A global counter of profile activations" + :type :counter}}} -;; --- Entry point + :app.migrations/all + {:main (ig/ref :app.migrations/migrations)} -(defn build-system-config - [config] - (d/deep-merge - {:app.db/pool - {:uri (:database-uri config) - :username (:database-username config) - :password (:database-password config) - :metrics (ig/ref :app.metrics/metrics) - :migrations (ig/ref :app.migrations/all) - :name "main" - :min-pool-size 0 - :max-pool-size 20} + :app.migrations/migrations + {} - :app.metrics/metrics - {:definitions - {:profile-register - {:name "actions_profile_register_count" - :help "A global counter of user registrations." - :type :counter} - :profile-activation - {:name "actions_profile_activation_count" - :help "A global counter of profile activations" - :type :counter}}} + :app.msgbus/msgbus + {:backend (cf/get :msgbus-backend :redis) + :redis-uri (cf/get :redis-uri)} - :app.migrations/all - {:main (ig/ref :app.migrations/migrations) - :telemetry (ig/ref :app.telemetry/migrations)} + :app.tokens/tokens + {:sprops (ig/ref :app.setup/props)} - :app.migrations/migrations - {} + :app.storage/gc-deleted-task + {:pool (ig/ref :app.db/pool) + :storage (ig/ref :app.storage/storage) + :min-age (dt/duration {:hours 2})} - :app.telemetry/migrations - {} + :app.storage/gc-touched-task + {:pool (ig/ref :app.db/pool)} - :app.msgbus/msgbus - {:backend (:msgbus-backend config :redis) - :redis-uri (:redis-uri config)} + :app.storage/recheck-task + {:pool (ig/ref :app.db/pool) + :storage (ig/ref :app.storage/storage)} - :app.tokens/tokens - {:sprops (ig/ref :app.setup/props)} + :app.http.session/session + {:pool (ig/ref :app.db/pool) + :cookie-name (cf/get :http-session-cookie-name)} - :app.storage/gc-deleted-task - {:pool (ig/ref :app.db/pool) - :storage (ig/ref :app.storage/storage) - :min-age (dt/duration {:hours 2})} + :app.http.session/gc-task + {:pool (ig/ref :app.db/pool) + :max-age (cf/get :http-session-idle-max-age)} - :app.storage/gc-touched-task - {:pool (ig/ref :app.db/pool)} + :app.http.session/updater + {:pool (ig/ref :app.db/pool) + :metrics (ig/ref :app.metrics/metrics) + :executor (ig/ref :app.worker/executor) + :session (ig/ref :app.http.session/session) + :max-batch-age (cf/get :http-session-updater-batch-max-age) + :max-batch-size (cf/get :http-session-updater-batch-max-size)} - :app.storage/recheck-task - {:pool (ig/ref :app.db/pool) - :storage (ig/ref :app.storage/storage)} + :app.http.awsns/handler + {:tokens (ig/ref :app.tokens/tokens) + :pool (ig/ref :app.db/pool)} - :app.http.session/session - {:pool (ig/ref :app.db/pool) - :cookie-name (:http-session-cookie-name config)} + :app.http/server + {:port (cf/get :http-server-port) + :router (ig/ref :app.http/router) + :metrics (ig/ref :app.metrics/metrics) + :ws {"/ws/notifications" (ig/ref :app.notifications/handler)}} - :app.http.session/gc-task - {:pool (ig/ref :app.db/pool) - :max-age (:http-session-idle-max-age config)} + :app.http/router + { + :rpc (ig/ref :app.rpc/rpc) + :session (ig/ref :app.http.session/session) + :tokens (ig/ref :app.tokens/tokens) + :public-uri (cf/get :public-uri) + :metrics (ig/ref :app.metrics/metrics) + :oauth (ig/ref :app.http.oauth/all) + :assets (ig/ref :app.http.assets/handlers) + :svgparse (ig/ref :app.svgparse/handler) + :storage (ig/ref :app.storage/storage) + :sns-webhook (ig/ref :app.http.awsns/handler) + :feedback (ig/ref :app.http.feedback/handler) + :error-report-handler (ig/ref :app.loggers.mattermost/handler)} - :app.http.session/updater - {:pool (ig/ref :app.db/pool) - :metrics (ig/ref :app.metrics/metrics) - :executor (ig/ref :app.worker/executor) - :session (ig/ref :app.http.session/session) - :max-batch-age (:http-session-updater-batch-max-age config) - :max-batch-size (:http-session-updater-batch-max-size config)} + :app.http.assets/handlers + {:metrics (ig/ref :app.metrics/metrics) + :assets-path (cf/get :assets-path) + :storage (ig/ref :app.storage/storage) + :cache-max-age (dt/duration {:hours 24}) + :signature-max-age (dt/duration {:hours 24 :minutes 5})} - :app.http.awsns/handler - {:tokens (ig/ref :app.tokens/tokens) - :pool (ig/ref :app.db/pool)} + :app.http.feedback/handler + {:pool (ig/ref :app.db/pool)} - :app.http/server - {:port (:http-server-port config) - :handler (ig/ref :app.http/router) - :metrics (ig/ref :app.metrics/metrics) - :ws {"/ws/notifications" (ig/ref :app.notifications/handler)}} + :app.http.oauth/all + {:google (ig/ref :app.http.oauth/google) + :gitlab (ig/ref :app.http.oauth/gitlab) + :github (ig/ref :app.http.oauth/github)} - :app.http/router - {:rpc (ig/ref :app.rpc/rpc) - :session (ig/ref :app.http.session/session) - :tokens (ig/ref :app.tokens/tokens) - :public-uri (:public-uri config) - :metrics (ig/ref :app.metrics/metrics) - :oauth (ig/ref :app.http.oauth/all) - :assets (ig/ref :app.http.assets/handlers) - :svgparse (ig/ref :app.svgparse/handler) - :storage (ig/ref :app.storage/storage) - :sns-webhook (ig/ref :app.http.awsns/handler) - :feedback (ig/ref :app.http.feedback/handler) - :error-report-handler (ig/ref :app.loggers.mattermost/handler)} + :app.http.oauth/google + {:rpc (ig/ref :app.rpc/rpc) + :session (ig/ref :app.http.session/session) + :tokens (ig/ref :app.tokens/tokens) + :public-uri (cf/get :public-uri) + :client-id (cf/get :google-client-id) + :client-secret (cf/get :google-client-secret)} - :app.http.assets/handlers - {:metrics (ig/ref :app.metrics/metrics) - :assets-path (:assets-path config) - :storage (ig/ref :app.storage/storage) - :cache-max-age (dt/duration {:hours 24}) - :signature-max-age (dt/duration {:hours 24 :minutes 5})} + :app.http.oauth/github + {:rpc (ig/ref :app.rpc/rpc) + :session (ig/ref :app.http.session/session) + :tokens (ig/ref :app.tokens/tokens) + :public-uri (cf/get :public-uri) + :client-id (cf/get :github-client-id) + :client-secret (cf/get :github-client-secret)} - :app.http.feedback/handler - {:pool (ig/ref :app.db/pool)} + :app.http.oauth/gitlab + {:rpc (ig/ref :app.rpc/rpc) + :session (ig/ref :app.http.session/session) + :tokens (ig/ref :app.tokens/tokens) + :public-uri (cf/get :public-uri) + :base-uri (cf/get :gitlab-base-uri) + :client-id (cf/get :gitlab-client-id) + :client-secret (cf/get :gitlab-client-secret)} - :app.http.oauth/all - {:google (ig/ref :app.http.oauth/google) - :gitlab (ig/ref :app.http.oauth/gitlab) - :github (ig/ref :app.http.oauth/github)} + :app.svgparse/svgc + {:metrics (ig/ref :app.metrics/metrics)} - :app.http.oauth/google - {:rpc (ig/ref :app.rpc/rpc) - :session (ig/ref :app.http.session/session) - :tokens (ig/ref :app.tokens/tokens) - :public-uri (:public-uri config) - :client-id (:google-client-id config) - :client-secret (:google-client-secret config)} + ;; HTTP Handler for SVG parsing + :app.svgparse/handler + {:metrics (ig/ref :app.metrics/metrics) + :svgc (ig/ref :app.svgparse/svgc)} - :app.http.oauth/github - {:rpc (ig/ref :app.rpc/rpc) - :session (ig/ref :app.http.session/session) - :tokens (ig/ref :app.tokens/tokens) - :public-uri (:public-uri config) - :client-id (:github-client-id config) - :client-secret (:github-client-secret config)} + ;; RLimit definition for password hashing + :app.rlimits/password + (cf/get :rlimits-password) - :app.http.oauth/gitlab - {:rpc (ig/ref :app.rpc/rpc) - :session (ig/ref :app.http.session/session) - :tokens (ig/ref :app.tokens/tokens) - :public-uri (:public-uri config) - :base-uri (:gitlab-base-uri config) - :client-id (:gitlab-client-id config) - :client-secret (:gitlab-client-secret config)} + ;; RLimit definition for image processing + :app.rlimits/image + (cf/get :rlimits-image) - ;; HTTP Handler for SVG parsing - :app.svgparse/handler - {:metrics (ig/ref :app.metrics/metrics)} + ;; A collection of rlimits as hash-map. + :app.rlimits/all + {:password (ig/ref :app.rlimits/password) + :image (ig/ref :app.rlimits/image)} - ;; RLimit definition for password hashing - :app.rlimits/password - (:rlimits-password config) + :app.rpc/rpc + {:pool (ig/ref :app.db/pool) + :session (ig/ref :app.http.session/session) + :tokens (ig/ref :app.tokens/tokens) + :metrics (ig/ref :app.metrics/metrics) + :storage (ig/ref :app.storage/storage) + :msgbus (ig/ref :app.msgbus/msgbus) + :rlimits (ig/ref :app.rlimits/all) + :svgc (ig/ref :app.svgparse/svgc) + :public-uri (cf/get :public-uri)} - ;; RLimit definition for image processing - :app.rlimits/image - (:rlimits-image config) + :app.notifications/handler + {:msgbus (ig/ref :app.msgbus/msgbus) + :pool (ig/ref :app.db/pool) + :session (ig/ref :app.http.session/session) + :metrics (ig/ref :app.metrics/metrics) + :executor (ig/ref :app.worker/executor)} - ;; A collection of rlimits as hash-map. - :app.rlimits/all - {:password (ig/ref :app.rlimits/password) - :image (ig/ref :app.rlimits/image)} + :app.worker/executor + {:min-threads 0 + :max-threads 256 + :idle-timeout 60000 + :name :worker} - :app.rpc/rpc - {:pool (ig/ref :app.db/pool) - :session (ig/ref :app.http.session/session) - :tokens (ig/ref :app.tokens/tokens) - :metrics (ig/ref :app.metrics/metrics) - :storage (ig/ref :app.storage/storage) - :msgbus (ig/ref :app.msgbus/msgbus) - :rlimits (ig/ref :app.rlimits/all)} + :app.worker/worker + {:executor (ig/ref :app.worker/executor) + :tasks (ig/ref :app.worker/registry) + :metrics (ig/ref :app.metrics/metrics) + :pool (ig/ref :app.db/pool)} - :app.notifications/handler - {:msgbus (ig/ref :app.msgbus/msgbus) - :pool (ig/ref :app.db/pool) - :session (ig/ref :app.http.session/session) - :metrics (ig/ref :app.metrics/metrics) - :executor (ig/ref :app.worker/executor)} + :app.worker/scheduler + {:executor (ig/ref :app.worker/executor) + :tasks (ig/ref :app.worker/registry) + :pool (ig/ref :app.db/pool) + :schedule + [{:cron #app/cron "0 0 0 */1 * ? *" ;; daily + :task :file-media-gc} - :app.worker/executor - {:name "worker"} + {:cron #app/cron "0 0 */1 * * ?" ;; hourly + :task :file-xlog-gc} - :app.worker/worker - {:executor (ig/ref :app.worker/executor) - :pool (ig/ref :app.db/pool) - :tasks (ig/ref :app.tasks/registry)} + {:cron #app/cron "0 0 1 */1 * ?" ;; daily (1 hour shift) + :task :storage-deleted-gc} - :app.worker/scheduler - {:executor (ig/ref :app.worker/executor) - :pool (ig/ref :app.db/pool) - :tasks (ig/ref :app.tasks/registry) - :schedule - [{:id "file-media-gc" - :cron #app/cron "0 0 0 */1 * ? *" ;; daily - :task :file-media-gc} + {:cron #app/cron "0 0 2 */1 * ?" ;; daily (2 hour shift) + :task :storage-touched-gc} - {:id "file-xlog-gc" - :cron #app/cron "0 0 */1 * * ?" ;; hourly - :task :file-xlog-gc} + {:cron #app/cron "0 0 3 */1 * ?" ;; daily (3 hour shift) + :task :session-gc} - {:id "storage-deleted-gc" - :cron #app/cron "0 0 1 */1 * ?" ;; daily (1 hour shift) - :task :storage-deleted-gc} + {:cron #app/cron "0 0 */1 * * ?" ;; hourly + :task :storage-recheck} - {:id "storage-touched-gc" - :cron #app/cron "0 0 2 */1 * ?" ;; daily (2 hour shift) - :task :storage-touched-gc} + {:cron #app/cron "0 0 0 */1 * ?" ;; daily + :task :tasks-gc} - {:id "session-gc" - :cron #app/cron "0 0 3 */1 * ?" ;; daily (3 hour shift) - :task :session-gc} + (when (cf/get :telemetry-enabled) + {:cron #app/cron "0 0 */6 * * ?" ;; every 6h + :uri (cf/get :telemetry-uri) + :task :telemetry})]} - {:id "storage-recheck" - :cron #app/cron "0 0 */1 * * ?" ;; hourly - :task :storage-recheck} + :app.worker/registry + {:metrics (ig/ref :app.metrics/metrics) + :tasks + {:sendmail (ig/ref :app.emails/sendmail-handler) + :delete-object (ig/ref :app.tasks.delete-object/handler) + :delete-profile (ig/ref :app.tasks.delete-profile/handler) + :file-media-gc (ig/ref :app.tasks.file-media-gc/handler) + :file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler) + :storage-deleted-gc (ig/ref :app.storage/gc-deleted-task) + :storage-touched-gc (ig/ref :app.storage/gc-touched-task) + :storage-recheck (ig/ref :app.storage/recheck-task) + :tasks-gc (ig/ref :app.tasks.tasks-gc/handler) + :telemetry (ig/ref :app.tasks.telemetry/handler) + :session-gc (ig/ref :app.http.session/gc-task)}} - {:id "tasks-gc" - :cron #app/cron "0 0 0 */1 * ?" ;; daily - :task :tasks-gc} + :app.emails/sendmail-handler + {:host (cf/get :smtp-host) + :port (cf/get :smtp-port) + :ssl (cf/get :smtp-ssl) + :tls (cf/get :smtp-tls) + :enabled (cf/get :smtp-enabled) + :username (cf/get :smtp-username) + :password (cf/get :smtp-password) + :metrics (ig/ref :app.metrics/metrics) + :default-reply-to (cf/get :smtp-default-reply-to) + :default-from (cf/get :smtp-default-from)} - (when (:telemetry-enabled config) - {:id "telemetry" - :cron #app/cron "0 0 */6 * * ?" ;; every 6h - :uri (:telemetry-uri config) - :task :telemetry})]} + :app.tasks.tasks-gc/handler + {:pool (ig/ref :app.db/pool) + :max-age (dt/duration {:hours 24}) + :metrics (ig/ref :app.metrics/metrics)} - :app.tasks/registry - {:metrics (ig/ref :app.metrics/metrics) - :tasks - {:sendmail (ig/ref :app.tasks.sendmail/handler) - :delete-object (ig/ref :app.tasks.delete-object/handler) - :delete-profile (ig/ref :app.tasks.delete-profile/handler) - :file-media-gc (ig/ref :app.tasks.file-media-gc/handler) - :file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler) - :storage-deleted-gc (ig/ref :app.storage/gc-deleted-task) - :storage-touched-gc (ig/ref :app.storage/gc-touched-task) - :storage-recheck (ig/ref :app.storage/recheck-task) - :tasks-gc (ig/ref :app.tasks.tasks-gc/handler) - :telemetry (ig/ref :app.tasks.telemetry/handler) - :session-gc (ig/ref :app.http.session/gc-task)}} + :app.tasks.delete-object/handler + {:pool (ig/ref :app.db/pool) + :metrics (ig/ref :app.metrics/metrics)} - :app.tasks.sendmail/handler - {:host (:smtp-host config) - :port (:smtp-port config) - :ssl (:smtp-ssl config) - :tls (:smtp-tls config) - :enabled (:smtp-enabled config) - :username (:smtp-username config) - :password (:smtp-password config) - :metrics (ig/ref :app.metrics/metrics) - :default-reply-to (:smtp-default-reply-to config) - :default-from (:smtp-default-from config)} + :app.tasks.delete-storage-object/handler + {:pool (ig/ref :app.db/pool) + :storage (ig/ref :app.storage/storage) + :metrics (ig/ref :app.metrics/metrics)} - :app.tasks.tasks-gc/handler - {:pool (ig/ref :app.db/pool) - :max-age (dt/duration {:hours 24}) - :metrics (ig/ref :app.metrics/metrics)} + :app.tasks.delete-profile/handler + {:pool (ig/ref :app.db/pool) + :metrics (ig/ref :app.metrics/metrics)} - :app.tasks.delete-object/handler - {:pool (ig/ref :app.db/pool) - :metrics (ig/ref :app.metrics/metrics)} + :app.tasks.file-media-gc/handler + {:pool (ig/ref :app.db/pool) + :metrics (ig/ref :app.metrics/metrics) + :max-age (dt/duration {:hours 48})} - :app.tasks.delete-storage-object/handler - {:pool (ig/ref :app.db/pool) - :storage (ig/ref :app.storage/storage) - :metrics (ig/ref :app.metrics/metrics)} + :app.tasks.file-xlog-gc/handler + {:pool (ig/ref :app.db/pool) + :metrics (ig/ref :app.metrics/metrics) + :max-age (dt/duration {:hours 48})} - :app.tasks.delete-profile/handler - {:pool (ig/ref :app.db/pool) - :metrics (ig/ref :app.metrics/metrics)} + :app.tasks.telemetry/handler + {:pool (ig/ref :app.db/pool) + :version (:full cf/version) + :uri (cf/get :telemetry-uri) + :sprops (ig/ref :app.setup/props)} - :app.tasks.file-media-gc/handler - {:pool (ig/ref :app.db/pool) - :metrics (ig/ref :app.metrics/metrics) - :max-age (dt/duration {:hours 48})} + :app.srepl/server + {:port (cf/get :srepl-port) + :host (cf/get :srepl-host)} - :app.tasks.file-xlog-gc/handler - {:pool (ig/ref :app.db/pool) - :metrics (ig/ref :app.metrics/metrics) - :max-age (dt/duration {:hours 48})} + :app.setup/props + {:pool (ig/ref :app.db/pool)} - :app.tasks.telemetry/handler - {:pool (ig/ref :app.db/pool) - :version (:full cfg/version) - :uri (:telemetry-uri config) - :sprops (ig/ref :app.setup/props)} + :app.loggers.zmq/receiver + {:endpoint (cf/get :loggers-zmq-uri)} - :app.srepl/server - {:port (:srepl-port config) - :host (:srepl-host config)} + :app.loggers.loki/reporter + {:uri (cf/get :loggers-loki-uri) + :receiver (ig/ref :app.loggers.zmq/receiver) + :executor (ig/ref :app.worker/executor)} - :app.setup/props - {:pool (ig/ref :app.db/pool)} + :app.loggers.mattermost/reporter + {:uri (cf/get :error-report-webhook) + :receiver (ig/ref :app.loggers.zmq/receiver) + :pool (ig/ref :app.db/pool) + :executor (ig/ref :app.worker/executor)} - :app.loggers.zmq/receiver - {:endpoint (:loggers-zmq-uri config)} + :app.loggers.mattermost/handler + {:pool (ig/ref :app.db/pool)} - :app.loggers.loki/reporter - {:uri (:loggers-loki-uri config) - :receiver (ig/ref :app.loggers.zmq/receiver) - :executor (ig/ref :app.worker/executor)} + :app.storage/storage + {:pool (ig/ref :app.db/pool) + :executor (ig/ref :app.worker/executor) + :backend (cf/get :storage-backend :fs) + :backends {:s3 (ig/ref [::main :app.storage.s3/backend]) + :db (ig/ref [::main :app.storage.db/backend]) + :fs (ig/ref [::main :app.storage.fs/backend]) + :tmp (ig/ref [::tmp :app.storage.fs/backend])}} - :app.loggers.mattermost/reporter - {:uri (:error-report-webhook config) - :receiver (ig/ref :app.loggers.zmq/receiver) - :pool (ig/ref :app.db/pool) - :executor (ig/ref :app.worker/executor)} + [::main :app.storage.s3/backend] + {:region (cf/get :storage-s3-region) + :bucket (cf/get :storage-s3-bucket)} - :app.loggers.mattermost/handler - {:pool (ig/ref :app.db/pool)} + [::main :app.storage.fs/backend] + {:directory (cf/get :storage-fs-directory)} - :app.storage/storage - {:pool (ig/ref :app.db/pool) - :executor (ig/ref :app.worker/executor) - :backend (:storage-backend config :fs) - :backends {:s3 (ig/ref [::main :app.storage.s3/backend]) - :db (ig/ref [::main :app.storage.db/backend]) - :fs (ig/ref [::main :app.storage.fs/backend]) - :tmp (ig/ref [::tmp :app.storage.fs/backend])}} + [::tmp :app.storage.fs/backend] + {:directory "/tmp/penpot"} - [::main :app.storage.s3/backend] - {:region (:storage-s3-region config) - :bucket (:storage-s3-bucket config)} - - [::main :app.storage.fs/backend] - {:directory (:storage-fs-directory config)} - - [::tmp :app.storage.fs/backend] - {:directory "/tmp/penpot"} - - [::main :app.storage.db/backend] - {:pool (ig/ref :app.db/pool)}} - - (when (:telemetry-server-enabled config) - {:app.telemetry/handler - {:pool (ig/ref :app.db/pool) - :executor (ig/ref :app.worker/executor)} - - :app.telemetry/server - {:port (:telemetry-server-port config 6063) - :handler (ig/ref :app.telemetry/handler) - :name "telemetry"}}))) + [::main :app.storage.db/backend] + {:pool (ig/ref :app.db/pool)}}) (defmethod ig/init-key :default [_ data] data) (defmethod ig/prep-key :default @@ -364,15 +343,14 @@ (defn start [] - (let [system-config (build-system-config cfg/config)] - (ig/load-namespaces system-config) - (alter-var-root #'system (fn [sys] - (when sys (ig/halt! sys)) - (-> system-config - (ig/prep) - (ig/init)))) - (log/infof "welcome to penpot (version: '%s')" - (:full cfg/version)))) + (ig/load-namespaces system-config) + (alter-var-root #'system (fn [sys] + (when sys (ig/halt! sys)) + (-> system-config + (ig/prep) + (ig/init)))) + (log/infof "welcome to penpot (version: '%s')" + (:full cf/version))) (defn stop [] @@ -380,14 +358,6 @@ (when sys (ig/halt! sys)) nil))) -(prefer-method print-method - clojure.lang.IRecord - clojure.lang.IDeref) - -(prefer-method pprint/simple-dispatch - clojure.lang.IPersistentMap - clojure.lang.IDeref) - (defn -main [& _args] (start)) diff --git a/backend/src/app/rpc/mutations/demo.clj b/backend/src/app/rpc/mutations/demo.clj index 80658a5da..4f78184bf 100644 --- a/backend/src/app/rpc/mutations/demo.clj +++ b/backend/src/app/rpc/mutations/demo.clj @@ -5,7 +5,7 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020-2021 UXBOX Labs SL +;; Copyright (c) UXBOX Labs SL (ns app.rpc.mutations.demo "A demo specific mutations." @@ -16,8 +16,8 @@ [app.db :as db] [app.rpc.mutations.profile :as profile] [app.setup.initial-data :as sid] - [app.tasks :as tasks] [app.util.services :as sv] + [app.worker :as wrk] [buddy.core.codecs :as bc] [buddy.core.nonce :as bn] [clojure.spec.alpha :as s])) @@ -40,7 +40,7 @@ :password password :props {:onboarding-viewed true}}] - (when-not (:allow-demo-users cfg/config) + (when-not (cfg/get :allow-demo-users) (ex/raise :type :validation :code :demo-users-not-allowed :hint "Demo users are disabled by config.")) @@ -51,9 +51,10 @@ (sid/load-initial-project! conn)) ;; Schedule deletion of the demo profile - (tasks/submit! conn {:name "delete-profile" - :delay cfg/deletion-delay - :props {:profile-id id}}) + (wrk/submit! {::wrk/task :delete-profile + ::wrk/delay cfg/deletion-delay + ::wrk/conn conn + :profile-id id}) {:email email :password password}))) diff --git a/backend/src/app/rpc/mutations/files.clj b/backend/src/app/rpc/mutations/files.clj index 0631429d5..fb547bfa9 100644 --- a/backend/src/app/rpc/mutations/files.clj +++ b/backend/src/app/rpc/mutations/files.clj @@ -5,7 +5,7 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020 UXBOX Labs SL +;; Copyright (c) UXBOX Labs SL (ns app.rpc.mutations.files (:require @@ -19,10 +19,10 @@ [app.rpc.permissions :as perms] [app.rpc.queries.files :as files] [app.rpc.queries.projects :as proj] - [app.tasks :as tasks] [app.util.blob :as blob] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s])) ;; --- Helpers & Specs @@ -126,9 +126,11 @@ (files/check-edition-permissions! conn profile-id id) ;; Schedule object deletion - (tasks/submit! conn {:name "delete-object" - :delay cfg/deletion-delay - :props {:id id :type :file}}) + (wrk/submit! {::wrk/task :delete-object + ::wrk/delay cfg/deletion-delay + ::wrk/conn conn + :id id + :type :file}) (mark-file-deleted conn params))) diff --git a/backend/src/app/rpc/mutations/profile.clj b/backend/src/app/rpc/mutations/profile.clj index dc0ec95fe..57e79e402 100644 --- a/backend/src/app/rpc/mutations/profile.clj +++ b/backend/src/app/rpc/mutations/profile.clj @@ -14,16 +14,16 @@ [app.common.uuid :as uuid] [app.config :as cfg] [app.db :as db] - [app.emails :as emails] + [app.emails :as eml] [app.media :as media] [app.rpc.mutations.projects :as projects] [app.rpc.mutations.teams :as teams] [app.rpc.queries.profile :as profile] [app.setup.initial-data :as sid] [app.storage :as sto] - [app.tasks :as tasks] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [buddy.hashers :as hashers] [clojure.spec.alpha :as s] [cuerdas.core :as str])) @@ -117,16 +117,19 @@ ;; Don't allow proceed in register page if the email is ;; already reported as permanent bounced - (when (emails/has-bounce-reports? conn (:email profile)) + (when (eml/has-bounce-reports? conn (:email profile)) (ex/raise :type :validation :code :email-has-permanent-bounces :hint "looks like the email has one or many bounces reported")) - (emails/send! conn emails/register - {:to (:email profile) - :name (:fullname profile) - :token vtoken - :extra-data ptoken}) + (eml/send! {::eml/conn conn + ::eml/factory eml/register + :public-uri (:public-uri cfg) + :to (:email profile) + :name (:fullname profile) + :token vtoken + :extra-data ptoken}) + (with-meta profile {:before-complete (annotate-profile-register metrics profile)}))))) @@ -439,7 +442,7 @@ {:changed true}) (defn- request-email-change - [{:keys [conn tokens]} {:keys [profile email] :as params}] + [{:keys [conn tokens] :as cfg} {:keys [profile email] :as params}] (let [token (tokens :generate {:iss :change-email :exp (dt/in-future "15m") @@ -452,22 +455,24 @@ (when (not= email (:email profile)) (check-profile-existence! conn params)) - (when-not (emails/allow-send-emails? conn profile) + (when-not (eml/allow-send-emails? conn profile) (ex/raise :type :validation :code :profile-is-muted :hint "looks like the profile has reported repeatedly as spam or has permanent bounces.")) - (when (emails/has-bounce-reports? conn email) + (when (eml/has-bounce-reports? conn email) (ex/raise :type :validation :code :email-has-permanent-bounces :hint "looks like the email you invite has been repeatedly reported as spam or permanent bounce")) - (emails/send! conn emails/change-email - {:to (:email profile) - :name (:fullname profile) - :pending-email email - :token token - :extra-data ptoken}) + (eml/send! {::eml/conn conn + ::eml/factory eml/change-email + :public-uri (:public-uri cfg) + :to (:email profile) + :name (:fullname profile) + :pending-email email + :token token + :extra-data ptoken}) nil)) @@ -493,16 +498,18 @@ (let [ptoken (tokens :generate-predefined {:iss :profile-identity :profile-id (:id profile)})] - (emails/send! conn emails/password-recovery - {:to (:email profile) - :token (:token profile) - :name (:fullname profile) - :extra-data ptoken}) + (eml/send! {::eml/conn conn + ::eml/factory eml/password-recovery + :public-uri (:public-uri cfg) + :to (:email profile) + :token (:token profile) + :name (:fullname profile) + :extra-data ptoken}) nil))] (db/with-atomic [conn pool] (when-let [profile (profile/retrieve-profile-data-by-email conn email)] - (when-not (emails/allow-send-emails? conn profile) + (when-not (eml/allow-send-emails? conn profile) (ex/raise :type :validation :code :profile-is-muted :hint "looks like the profile has reported repeatedly as spam or has permanent bounces.")) @@ -512,7 +519,7 @@ :code :profile-not-verified :hint "the user need to validate profile before recover password")) - (when (emails/has-bounce-reports? conn (:email profile)) + (when (eml/has-bounce-reports? conn (:email profile)) (ex/raise :type :validation :code :email-has-permanent-bounces :hint "looks like the email you invite has been repeatedly reported as spam or permanent bounce")) @@ -579,9 +586,10 @@ (check-can-delete-profile! conn profile-id) ;; Schedule a complete deletion of profile - (tasks/submit! conn {:name "delete-profile" - :delay cfg/deletion-delay - :props {:profile-id profile-id}}) + (wrk/submit! {::wrk/task :delete-profile + ::wrk/dalay cfg/deletion-delay + ::wrk/conn conn + :profile-id profile-id}) (db/update! conn :profile {:deleted-at (dt/now)} diff --git a/backend/src/app/rpc/mutations/projects.clj b/backend/src/app/rpc/mutations/projects.clj index 3dbdef8f0..1a03fd34d 100644 --- a/backend/src/app/rpc/mutations/projects.clj +++ b/backend/src/app/rpc/mutations/projects.clj @@ -16,9 +16,9 @@ [app.rpc.permissions :as perms] [app.rpc.queries.projects :as proj] [app.rpc.queries.teams :as teams] - [app.tasks :as tasks] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s])) ;; --- Helpers & Specs @@ -128,9 +128,11 @@ (proj/check-edition-permissions! conn profile-id id) ;; Schedule object deletion - (tasks/submit! conn {:name "delete-object" - :delay cfg/deletion-delay - :props {:id id :type :project}}) + (wrk/submit! {::wrk/task :delete-object + ::wrk/delay cfg/deletion-delay + ::wrk/conn conn + :id id + :type :project}) (db/update! conn :project {:deleted-at (dt/now)} diff --git a/backend/src/app/rpc/mutations/teams.clj b/backend/src/app/rpc/mutations/teams.clj index c1934de9d..46de58083 100644 --- a/backend/src/app/rpc/mutations/teams.clj +++ b/backend/src/app/rpc/mutations/teams.clj @@ -5,7 +5,7 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020-2021 UXBOX Labs SL +;; Copyright (c) UXBOX Labs SL (ns app.rpc.mutations.teams (:require @@ -15,16 +15,16 @@ [app.common.uuid :as uuid] [app.config :as cfg] [app.db :as db] - [app.emails :as emails] + [app.emails :as eml] [app.media :as media] [app.rpc.mutations.projects :as projects] [app.rpc.permissions :as perms] [app.rpc.queries.profile :as profile] [app.rpc.queries.teams :as teams] [app.storage :as sto] - [app.tasks :as tasks] [app.util.services :as sv] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s] [datoteka.core :as fs])) @@ -139,9 +139,11 @@ :code :only-owner-can-delete-team)) ;; Schedule object deletion - (tasks/submit! conn {:name "delete-object" - :delay cfg/deletion-delay - :props {:id id :type :team}}) + (wrk/submit! {::wrk/task :delete-object + ::wrk/delay cfg/deletion-delay + ::wrk/conn conn + :id id + :type :team}) (db/update! conn :team {:deleted-at (dt/now)} @@ -323,27 +325,29 @@ :code :insufficient-permissions)) ;; First check if the current profile is allowed to send emails. - (when-not (emails/allow-send-emails? conn profile) + (when-not (eml/allow-send-emails? conn profile) (ex/raise :type :validation :code :profile-is-muted :hint "looks like the profile has reported repeatedly as spam or has permanent bounces")) - (when (and member (not (emails/allow-send-emails? conn member))) + (when (and member (not (eml/allow-send-emails? conn member))) (ex/raise :type :validation :code :member-is-muted :hint "looks like the profile has reported repeatedly as spam or has permanent bounces")) ;; Secondly check if the invited member email is part of the ;; global spam/bounce report. - (when (emails/has-bounce-reports? conn email) + (when (eml/has-bounce-reports? conn email) (ex/raise :type :validation :code :email-has-permanent-bounces :hint "looks like the email you invite has been repeatedly reported as spam or permanent bounce")) - (emails/send! conn emails/invite-to-team - {:to email - :invited-by (:fullname profile) - :team (:name team) - :token itoken - :extra-data ptoken}) + (eml/send! {::eml/conn conn + ::eml/factory eml/invite-to-team + :public-uri (:public-uri cfg) + :to email + :invited-by (:fullname profile) + :team (:name team) + :token itoken + :extra-data ptoken}) nil))) diff --git a/backend/src/app/tasks.clj b/backend/src/app/tasks.clj deleted file mode 100644 index 5ef8f0d4f..000000000 --- a/backend/src/app/tasks.clj +++ /dev/null @@ -1,110 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; This Source Code Form is "Incompatible With Secondary Licenses", as -;; defined by the Mozilla Public License, v. 2.0. -;; -;; Copyright (c) 2020-2021 UXBOX Labs SL - -(ns app.tasks - (:require - [app.common.spec :as us] - [app.common.uuid :as uuid] - [app.db :as db] - [app.metrics :as mtx] - [app.util.time :as dt] - [app.worker] - [clojure.spec.alpha :as s] - [clojure.tools.logging :as log] - [integrant.core :as ig])) - -(s/def ::name ::us/string) -(s/def ::delay - (s/or :int ::us/integer - :duration dt/duration?)) -(s/def ::queue ::us/string) - -(s/def ::task-options - (s/keys :req-un [::name] - :opt-un [::delay ::props ::queue])) - -(def ^:private sql:insert-new-task - "insert into task (id, name, props, queue, priority, max_retries, scheduled_at) - values (?, ?, ?, ?, ?, ?, clock_timestamp() + ?) - returning id") - -(defn submit! - [conn {:keys [name delay props queue priority max-retries] - :or {delay 0 props {} queue "default" priority 100 max-retries 3} - :as options}] - (us/verify ::task-options options) - (let [duration (dt/duration delay) - interval (db/interval duration) - props (db/tjson props) - id (uuid/next)] - (log/debugf "submit task '%s' to be executed in '%s'" name (str duration)) - (db/exec-one! conn [sql:insert-new-task id name props queue priority max-retries interval]) - id)) - -(defn- instrument! - [registry] - (mtx/instrument-vars! - [#'submit!] - {:registry registry - :type :counter - :labels ["name"] - :name "tasks_submit_total" - :help "A counter of task submissions." - :wrap (fn [rootf mobj] - (let [mdata (meta rootf) - origf (::original mdata rootf)] - (with-meta - (fn [conn params] - (let [tname (:name params)] - (mobj :inc [tname]) - (origf conn params))) - {::original origf})))}) - - (mtx/instrument-vars! - [#'app.worker/run-task] - {:registry registry - :type :summary - :quantiles [] - :name "tasks_checkout_timing" - :help "Latency measured between scheduld_at and execution time." - :wrap (fn [rootf mobj] - (let [mdata (meta rootf) - origf (::original mdata rootf)] - (with-meta - (fn [tasks item] - (let [now (inst-ms (dt/now)) - sat (inst-ms (:scheduled-at item))] - (mobj :observe (- now sat)) - (origf tasks item))) - {::original origf})))})) - -;; --- STATE INIT: REGISTRY - -(s/def ::tasks - (s/map-of keyword? fn?)) - -(defmethod ig/pre-init-spec ::registry [_] - (s/keys :req-un [::mtx/metrics ::tasks])) - -(defmethod ig/init-key ::registry - [_ {:keys [metrics tasks]}] - (instrument! (:registry metrics)) - (let [mobj (mtx/create - {:registry (:registry metrics) - :type :summary - :labels ["name"] - :quantiles [] - :name "tasks_timing" - :help "Background task execution timing."})] - (reduce-kv (fn [res k v] - (let [tname (name k)] - (log/debugf "registring task '%s'" tname) - (assoc res tname (mtx/wrap-summary v mobj [tname])))) - {} - tasks))) diff --git a/backend/src/app/tasks/sendmail.clj b/backend/src/app/tasks/sendmail.clj index 0619b75a2..333f7efd1 100644 --- a/backend/src/app/tasks/sendmail.clj +++ b/backend/src/app/tasks/sendmail.clj @@ -5,7 +5,7 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020-2021 UXBOX Labs SL +;; Copyright (c) UXBOX Labs SL (ns app.tasks.sendmail (:require diff --git a/backend/src/app/tasks/telemetry.clj b/backend/src/app/tasks/telemetry.clj index 64ca03c8e..4de76ce0f 100644 --- a/backend/src/app/tasks/telemetry.clj +++ b/backend/src/app/tasks/telemetry.clj @@ -5,13 +5,14 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020 UXBOX Labs SL +;; Copyright (c) UXBOX Labs SL (ns app.tasks.telemetry "A task that is reponsible to collect anonymous statistical information about the current instance and send it to the telemetry server." (:require + [app.common.data :as d] [app.common.exceptions :as ex] [app.common.spec :as us] [app.config :as cfg] @@ -32,7 +33,6 @@ (s/def ::sprops (s/keys :req-un [::instance-id])) - (defmethod ig/pre-init-spec ::handler [_] (s/keys :req-un [::db/pool ::version ::uri ::sprops])) @@ -128,11 +128,16 @@ (defn- retrieve-stats [{:keys [conn version]}] - (merge - {:version version - :with-taiga (:telemetry-with-taiga cfg/config false) - :total-teams (retrieve-num-teams conn) - :total-projects (retrieve-num-projects conn) - :total-files (retrieve-num-files conn)} - (retrieve-team-averages conn) - (retrieve-jvm-stats))) + (let [referer (if (cfg/get :telemetry-with-taiga) + "taiga" + (cfg/get :telemetry-referer))] + (-> {:version version + :referer referer + :total-teams (retrieve-num-teams conn) + :total-projects (retrieve-num-projects conn) + :total-files (retrieve-num-files conn)} + (d/merge + (retrieve-team-averages conn) + (retrieve-jvm-stats)) + (d/without-nils)))) + diff --git a/backend/src/app/telemetry.clj b/backend/src/app/telemetry.clj deleted file mode 100644 index a5268ef23..000000000 --- a/backend/src/app/telemetry.clj +++ /dev/null @@ -1,121 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; This Source Code Form is "Incompatible With Secondary Licenses", as -;; defined by the Mozilla Public License, v. 2.0. -;; -;; Copyright (c) 2020-2021 UXBOX Labs SL - -(ns app.telemetry - (:require - [app.common.spec :as us] - [app.db :as db] - [app.http.middleware :refer [wrap-parse-request-body]] - [clojure.pprint :refer [pprint]] - [clojure.spec.alpha :as s] - [clojure.tools.logging :as log] - [integrant.core :as ig] - [promesa.exec :as px] - [ring.middleware.keyword-params :refer [wrap-keyword-params]] - [ring.middleware.params :refer [wrap-params]])) - - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Migrations -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(def sql:create-instance-table - "CREATE TABLE IF NOT EXISTS telemetry.instance ( - id uuid PRIMARY KEY, - created_at timestamptz NOT NULL DEFAULT now() - );") - -(def sql:create-info-table - "CREATE TABLE telemetry.info ( - instance_id uuid, - created_at timestamptz NOT NULL DEFAULT clock_timestamp(), - data jsonb NOT NULL, - - PRIMARY KEY (instance_id, created_at) - ) PARTITION BY RANGE(created_at); - - CREATE TABLE telemetry.info_default (LIKE telemetry.info INCLUDING ALL); - - ALTER TABLE telemetry.info - ATTACH PARTITION telemetry.info_default DEFAULT;") - -(def migrations - [{:name "0001-add-telemetry-schema" - :fn #(db/exec! % ["CREATE SCHEMA IF NOT EXISTS telemetry;"])} - - {:name "0002-add-instance-table" - :fn #(db/exec! % [sql:create-instance-table])} - - {:name "0003-add-info-table" - :fn #(db/exec! % [sql:create-info-table])} - - {:name "0004-del-instance-table" - :fn #(db/exec! % ["DROP TABLE telemetry.instance;"])}]) - -(defmethod ig/init-key ::migrations [_ _] migrations) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Router Handler -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(declare handler) -(declare process-request) - -(defmethod ig/init-key ::handler - [_ cfg] - (-> (partial handler cfg) - (wrap-keyword-params) - (wrap-params) - (wrap-parse-request-body))) - -(s/def ::instance-id ::us/uuid) -(s/def ::params (s/keys :req-un [::instance-id])) - -(defn handler - [{:keys [executor] :as cfg} {:keys [params] :as request}] - (try - (let [params (us/conform ::params params) - cfg (assoc cfg - :instance-id (:instance-id params) - :data (dissoc params :instance-id))] - (px/run! executor (partial process-request cfg))) - (catch Exception e - ;; We don't want notify user of a error, just log it for posible - ;; future investigation. - (log/warn e (str "unexpected error on telemetry:\n" - (when-let [edata (ex-data e)] - (str "ex-data: \n" - (with-out-str (pprint edata)))) - (str "params: \n" - (with-out-str (pprint params))))))) - {:status 200 - :body "OK\n"}) - - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Request Processing -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(def sql:insert-instance-info - "insert into telemetry.info (instance_id, data, created_at) - values (?, ?, date_trunc('day', now())) - on conflict (instance_id, created_at) - do update set data = ?") - -(defn- process-request - [{:keys [pool instance-id data]}] - (try - (db/with-atomic [conn pool] - (let [data (db/json data)] - (db/exec! conn [sql:insert-instance-info - instance-id - data - data]))) - (catch Exception e - (log/errorf e "error on procesing request")))) diff --git a/backend/src/app/util/blob.clj b/backend/src/app/util/blob.clj index b478f9de2..b3ecf3249 100644 --- a/backend/src/app/util/blob.clj +++ b/backend/src/app/util/blob.clj @@ -5,13 +5,13 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2016-2020 Andrey Antukh +;; Copyright (c) UXBOX Labs SL (ns app.util.blob - "A generic blob storage encoding. Mainly used for - page data, page options and txlog payload storage." + "A generic blob storage encoding. Mainly used for page data, page + options and txlog payload storage." (:require - [app.config :as cfg] + [app.config :as cf] [app.util.transit :as t] [taoensso.nippy :as n]) (:import @@ -33,17 +33,15 @@ (declare encode-v2) (declare encode-v3) -(def default-version - (:default-blob-version cfg/config 1)) - (defn encode ([data] (encode data nil)) - ([data {:keys [version] :or {version default-version}}] - (case (long version) - 1 (encode-v1 data) - 2 (encode-v2 data) - 3 (encode-v3 data) - (throw (ex-info "unsupported version" {:version version}))))) + ([data {:keys [version]}] + (let [version (or version (cf/get :default-blob-version 1))] + (case (long version) + 1 (encode-v1 data) + 2 (encode-v2 data) + 3 (encode-v3 data) + (throw (ex-info "unsupported version" {:version version})))))) (defn decode "A function used for decode persisted blobs in the database." diff --git a/backend/src/app/util/time.clj b/backend/src/app/util/time.clj index 51d0f89dd..febc0f8de 100644 --- a/backend/src/app/util/time.clj +++ b/backend/src/app/util/time.clj @@ -60,7 +60,6 @@ [t1 t2] (Duration/between t1 t2)) - (letfn [(conformer [v] (cond (duration? v) v diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index fe078ba02..76a2ab8c3 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -5,15 +5,17 @@ ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; -;; Copyright (c) 2020 Andrey Antukh +;; Copyright (c) UXBOX Labs SL (ns app.worker "Async tasks abstraction (impl)." (:require + [app.common.data :as d] [app.common.exceptions :as ex] [app.common.spec :as us] [app.common.uuid :as uuid] [app.db :as db] + [app.metrics :as mtx] [app.util.async :as aa] [app.util.log4j :refer [update-thread-context!]] [app.util.time :as dt] @@ -35,21 +37,13 @@ ;; Executor ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::name ::us/string) +(s/def ::name keyword?) (s/def ::min-threads ::us/integer) (s/def ::max-threads ::us/integer) (s/def ::idle-timeout ::us/integer) (defmethod ig/pre-init-spec ::executor [_] - (s/keys :opt-un [::min-threads ::max-threads ::idle-timeout ::name])) - -(defmethod ig/prep-key ::executor - [_ cfg] - (merge {:min-threads 0 - :max-threads 256 - :idle-timeout 60000 - :name "worker"} - cfg)) + (s/keys :req-un [::min-threads ::max-threads ::idle-timeout ::name])) (defmethod ig/init-key ::executor [_ {:keys [min-threads max-threads idle-timeout name]}] @@ -57,28 +51,29 @@ (int min-threads) (int idle-timeout)) (.setStopTimeout 500) - (.setName name) + (.setName (d/name name)) (.start))) (defmethod ig/halt-key! ::executor [_ instance] (.stop ^QueuedThreadPool instance)) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Worker ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (declare event-loop-fn) +(declare instrument-tasks) -(s/def ::queue ::us/string) +(s/def ::queue keyword?) (s/def ::parallelism ::us/integer) (s/def ::batch-size ::us/integer) -(s/def ::tasks (s/map-of string? fn?)) +(s/def ::tasks (s/map-of keyword? fn?)) (s/def ::poll-interval ::dt/duration) (defmethod ig/pre-init-spec ::worker [_] (s/keys :req-un [::executor + ::mtx/metrics ::db/pool ::batch-size ::name @@ -88,29 +83,29 @@ (defmethod ig/prep-key ::worker [_ cfg] - (merge {:batch-size 2 - :name "worker" - :poll-interval (dt/duration {:seconds 5}) - :queue "default"} - cfg)) + (d/merge {:batch-size 2 + :name :worker + :poll-interval (dt/duration {:seconds 5}) + :queue :default} + (d/without-nils cfg))) (defmethod ig/init-key ::worker [_ {:keys [pool poll-interval name queue] :as cfg}] - (log/infof "starting worker '%s' on queue '%s'" name queue) - (let [cch (a/chan 1) - poll-ms (inst-ms poll-interval)] + (log/infof "starting worker '%s' on queue '%s'" (d/name name) (d/name queue)) + (let [close-ch (a/chan 1) + poll-ms (inst-ms poll-interval)] (a/go-loop [] - (let [[val port] (a/alts! [cch (event-loop-fn cfg)] :priority true)] + (let [[val port] (a/alts! [close-ch (event-loop-fn cfg)] :priority true)] (cond ;; Terminate the loop if close channel is closed or ;; event-loop-fn returns nil. - (or (= port cch) (nil? val)) - (log/infof "stop condition found; shutdown worker: '%s'" name) + (or (= port close-ch) (nil? val)) + (log/infof "stop condition found; shutdown worker: '%s'" (d/name name)) (db/pool-closed? pool) (do (log/info "worker eventloop is aborted because pool is closed") - (a/close! cch)) + (a/close! close-ch)) (and (instance? java.sql.SQLException val) (contains? #{"08003" "08006" "08001" "08004"} (.getSQLState ^java.sql.SQLException val))) @@ -143,13 +138,55 @@ (reify java.lang.AutoCloseable (close [_] - (a/close! cch))))) + (a/close! close-ch))))) (defmethod ig/halt-key! ::worker [_ instance] (.close ^java.lang.AutoCloseable instance)) +;; --- SUBMIT + +(s/def ::task keyword?) +(s/def ::delay (s/or :int ::us/integer :duration dt/duration?)) +(s/def ::conn some?) +(s/def ::priority ::us/integer) +(s/def ::max-retries ::us/integer) + +(s/def ::submit-options + (s/keys :req [::task ::conn] + :opt [::delay ::queue ::priority ::max-retries])) + +(def ^:private sql:insert-new-task + "insert into task (id, name, props, queue, priority, max_retries, scheduled_at) + values (?, ?, ?, ?, ?, ?, clock_timestamp() + ?) + returning id") + +(defn- extract-props + [options] + (persistent! + (reduce-kv (fn [res k v] + (cond-> res + (not (qualified-keyword? k)) + (assoc! k v))) + (transient {}) + options))) + +(defn submit! + [{:keys [::task ::delay ::queue ::priority ::max-retries ::conn] + :or {delay 0 queue :default priority 100 max-retries 3} + :as options}] + (us/verify ::submit-options options) + (let [duration (dt/duration delay) + interval (db/interval duration) + props (-> options extract-props db/tjson) + id (uuid/next)] + (log/debugf "submit task '%s' to be executed in '%s'" (d/name task) (str duration)) + (db/exec-one! conn [sql:insert-new-task id (d/name task) props (d/name queue) priority max-retries interval]) + id)) + +;; --- RUNNER + (def ^:private sql:mark-as-retry @@ -194,17 +231,18 @@ nil)) (defn- decode-task-row - [{:keys [props] :as row}] + [{:keys [props name] :as row}] (when row (cond-> row - (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props))))) + (db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)) + (string? name) (assoc :name (keyword name))))) (defn- handle-task [tasks {:keys [name] :as item}] (let [task-fn (get tasks name)] (if task-fn (task-fn item) - (log/warnf "no task handler found for '%s'" (pr-str name))) + (log/warnf "no task handler found for '%s'" (d/name name))) {:status :completed :task item})) (defn get-error-context @@ -236,13 +274,14 @@ (defn- run-task [{:keys [tasks]} item] - (try - (log/debugf "started task '%s/%s/%s'" (:name item) (:id item) (:retry-num item)) - (handle-task tasks item) - (catch Exception e - (handle-exception e item)) - (finally - (log/debugf "finished task '%s/%s/%s'" (:name item) (:id item) (:retry-num item))))) + (let [name (d/name (:name item))] + (try + (log/debugf "started task '%s/%s/%s'" name (:id item) (:retry-num item)) + (handle-task tasks item) + (catch Exception e + (handle-exception e item)) + (finally + (log/debugf "finished task '%s/%s/%s'" name (:id item) (:retry-num item)))))) (def sql:select-next-tasks "select * from task as t @@ -256,7 +295,7 @@ (defn- event-loop-fn* [{:keys [pool executor batch-size] :as cfg}] (db/with-atomic [conn pool] - (let [queue (:queue cfg) + (let [queue (name (:queue cfg)) items (->> (db/exec! conn [sql:select-next-tasks queue batch-size]) (map decode-task-row) (seq)) @@ -288,16 +327,16 @@ (declare synchronize-schedule) (s/def ::fn (s/or :var var? :fn fn?)) -(s/def ::id ::us/string) +(s/def ::id keyword?) (s/def ::cron dt/cron?) (s/def ::props (s/nilable map?)) (s/def ::task keyword?) -(s/def ::scheduled-task-spec - (s/keys :req-un [::id ::cron ::task] - :opt-un [::props])) +(s/def ::scheduled-task + (s/keys :req-un [::cron ::task] + :opt-un [::props ::id])) -(s/def ::schedule (s/coll-of (s/nilable ::scheduled-task-spec))) +(s/def ::schedule (s/coll-of (s/nilable ::scheduled-task))) (defmethod ig/pre-init-spec ::scheduler [_] (s/keys :req-un [::executor ::db/pool ::schedule ::tasks])) @@ -307,8 +346,13 @@ (let [scheduler (Executors/newScheduledThreadPool (int 1)) schedule (->> schedule (filter some?) + ;; If id is not defined, use the task as id. + (map (fn [{:keys [id task] :as item}] + (if (some? id) + item + (assoc item :id task)))) (map (fn [{:keys [task] :as item}] - (let [f (get tasks (name task))] + (let [f (get tasks task)] (when-not f (ex/raise :type :internal :code :task-not-found @@ -341,7 +385,8 @@ (defn- synchronize-schedule-item [conn {:keys [id cron]}] - (let [cron (str cron)] + (let [cron (str cron) + id (name id)] (log/infof "initialize scheduled task '%s' (cron: '%s')" id cron) (db/exec-one! conn [sql:upsert-scheduled-task id cron cron]))) @@ -390,3 +435,62 @@ [{:keys [scheduler] :as cfg} {:keys [cron] :as task}] (let [ms (ms-until-valid cron)] (px/schedule! scheduler ms (partial execute-scheduled-task cfg task)))) + +;; --- INSTRUMENTATION + +(defn instrument! + [registry] + (mtx/instrument-vars! + [#'submit!] + {:registry registry + :type :counter + :labels ["name"] + :name "tasks_submit_total" + :help "A counter of task submissions." + :wrap (fn [rootf mobj] + (let [mdata (meta rootf) + origf (::original mdata rootf)] + (with-meta + (fn [conn params] + (let [tname (:name params)] + (mobj :inc [tname]) + (origf conn params))) + {::original origf})))}) + + (mtx/instrument-vars! + [#'app.worker/run-task] + {:registry registry + :type :summary + :quantiles [] + :name "tasks_checkout_timing" + :help "Latency measured between scheduld_at and execution time." + :wrap (fn [rootf mobj] + (let [mdata (meta rootf) + origf (::original mdata rootf)] + (with-meta + (fn [tasks item] + (let [now (inst-ms (dt/now)) + sat (inst-ms (:scheduled-at item))] + (mobj :observe (- now sat)) + (origf tasks item))) + {::original origf})))})) + + +(defmethod ig/pre-init-spec ::registry [_] + (s/keys :req-un [::mtx/metrics ::tasks])) + +(defmethod ig/init-key ::registry + [_ {:keys [metrics tasks]}] + (let [mobj (mtx/create + {:registry (:registry metrics) + :type :summary + :labels ["name"] + :quantiles [] + :name "tasks_timing" + :help "Background task execution timing."})] + (reduce-kv (fn [res k v] + (let [tname (name k)] + (log/debugf "registring task '%s'" tname) + (assoc res k (mtx/wrap-summary v mobj [tname])))) + {} + tasks))) diff --git a/backend/tests/app/tests/helpers.clj b/backend/tests/app/tests/helpers.clj index 2c4584573..7c86307e5 100644 --- a/backend/tests/app/tests/helpers.clj +++ b/backend/tests/app/tests/helpers.clj @@ -13,7 +13,7 @@ [app.common.pages :as cp] [app.common.spec :as us] [app.common.uuid :as uuid] - [app.config :as cfg] + [app.config :as cf] [app.db :as db] [app.main :as main] [app.media] @@ -38,16 +38,12 @@ (def ^:dynamic *system* nil) (def ^:dynamic *pool* nil) -(def config - (merge {:redis-uri "redis://redis/1" - :database-uri "postgresql://postgres/penpot_test" - :storage-fs-directory "/tmp/app/storage" - :migrations-verbose false} - cfg/config)) - (defn state-init [next] - (let [config (-> (main/build-system-config config) + (let [config (-> main/system-config + (assoc-in [:app.msgbus/msgbus :redis-uri] "redis://redis/1") + (assoc-in [:app.db/pool :uri] "postgresql://postgres/penpot_test") + (assoc-in [[:app.main/main :app.storage.fs/backend] :directory] "/tmp/app/storage") (dissoc :app.srepl/server :app.http/server :app.http/router @@ -328,8 +324,10 @@ "Helper for mock app.config/get" [data] (fn - ([key] (get (merge config data) key)) - ([key default] (get (merge config data) key default)))) + ([key] + (get data key (cf/get key))) + ([key default] + (get data key (cf/get key default))))) (defn reset-mock! [m] diff --git a/common/app/common/data.cljc b/common/app/common/data.cljc index f65fd4161..e3e23dfb8 100644 --- a/common/app/common/data.cljc +++ b/common/app/common/data.cljc @@ -401,6 +401,9 @@ (keyword? maybe-keyword) (core/name maybe-keyword) + (string? maybe-keyword) + maybe-keyword + (nil? maybe-keyword) default-value :else diff --git a/common/app/common/exceptions.cljc b/common/app/common/exceptions.cljc index 96782de95..240194489 100644 --- a/common/app/common/exceptions.cljc +++ b/common/app/common/exceptions.cljc @@ -2,7 +2,7 @@ ;; License, v. 2.0. If a copy of the MPL was not distributed with this ;; file, You can obtain one at http://mozilla.org/MPL/2.0/. ;; -;; Copyright (c) 2016 Andrey Antukh +;; Copyright (c) Andrey Antukh (ns app.common.exceptions "A helpers for work with exceptions."