♻️ Refactor backend to be more async friendly

This commit is contained in:
Andrey Antukh 2022-02-28 17:15:58 +01:00 committed by Alonso Torres
parent 087d896569
commit 9e4a50fb15
49 changed files with 1503 additions and 1378 deletions

View file

@ -13,12 +13,12 @@
[app.db :as db]
[app.metrics :as mtx]
[app.storage :as sto]
[app.util.async :as async]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.core :as p]))
[promesa.core :as p]
[promesa.exec :as px]))
(def ^:private cache-max-age
(dt/duration {:hours 24}))
@ -35,27 +35,31 @@
res))
(defn- get-file-media-object
[{:keys [pool] :as storage} id]
(let [id (coerce-id id)
mobj (db/exec-one! pool ["select * from file_media_object where id=?" id])]
(when-not mobj
(ex/raise :type :not-found
:hint "object does not found"))
mobj))
[{:keys [pool executor] :as storage} id]
(px/with-dispatch executor
(let [id (coerce-id id)
mobj (db/exec-one! pool ["select * from file_media_object where id=?" id])]
(when-not mobj
(ex/raise :type :not-found
:hint "object does not found"))
mobj)))
(defn- serve-object
"Helper function that returns the appropriate responde depending on
the storage object backend type."
[{:keys [storage] :as cfg} obj]
(let [mdata (meta obj)
backend (sto/resolve-backend storage (:backend obj))]
(case (:type backend)
:db
{:status 200
:headers {"content-type" (:content-type mdata)
"cache-control" (str "max-age=" (inst-ms cache-max-age))}
:body (sto/get-object-bytes storage obj)}
(p/let [body (sto/get-object-bytes storage obj)]
{:status 200
:headers {"content-type" (:content-type mdata)
"cache-control" (str "max-age=" (inst-ms cache-max-age))}
:body body})
:s3
(let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})]
(p/let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})]
{:status 307
:headers {"location" (str url)
"x-host" (cond-> host port (str ":" port))
@ -63,43 +67,49 @@
:body ""})
:fs
(let [purl (u/uri (:assets-path cfg))
purl (u/join purl (sto/object->relative-path obj))]
(p/let [purl (u/uri (:assets-path cfg))
purl (u/join purl (sto/object->relative-path obj))]
{:status 204
:headers {"x-accel-redirect" (:path purl)
"content-type" (:content-type mdata)
"cache-control" (str "max-age=" (inst-ms cache-max-age))}
:body ""}))))
(defn- generic-handler
[{:keys [storage executor] :as cfg} request kf]
(async/with-dispatch executor
(let [id (get-in request [:path-params :id])
mobj (get-file-media-object storage id)
obj (sto/get-object storage (kf mobj))]
(if obj
(serve-object cfg obj)
{:status 404 :body ""}))))
(defn objects-handler
"Handler that servers storage objects by id."
[{:keys [storage executor] :as cfg} request respond raise]
(-> (async/with-dispatch executor
(let [id (get-in request [:path-params :id])
id (coerce-id id)
obj (sto/get-object storage id)]
(-> (px/with-dispatch executor
(p/let [id (get-in request [:path-params :id])
id (coerce-id id)
obj (sto/get-object storage id)]
(if obj
(serve-object cfg obj)
{:status 404 :body ""})))
(p/then respond)
(p/bind p/wrap)
(p/then' respond)
(p/catch raise)))
(defn- generic-handler
"A generic handler helper/common code for file-media based handlers."
[{:keys [storage] :as cfg} request kf]
(p/let [id (get-in request [:path-params :id])
mobj (get-file-media-object storage id)
obj (sto/get-object storage (kf mobj))]
(if obj
(serve-object cfg obj)
{:status 404 :body ""})))
(defn file-objects-handler
"Handler that serves storage objects by file media id."
[cfg request respond raise]
(-> (generic-handler cfg request :media-id)
(p/then respond)
(p/catch raise)))
(defn file-thumbnails-handler
"Handler that serves storage objects by thumbnail-id and quick
fallback to file-media-id if no thumbnail is available."
[cfg request respond raise]
(-> (generic-handler cfg request #(or (:thumbnail-id %) (:media-id %)))
(p/then respond)

View file

@ -11,45 +11,53 @@
[app.common.logging :as l]
[app.db :as db]
[app.db.sql :as sql]
[app.util.http :as http]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]
[jsonista.core :as j]))
[jsonista.core :as j]
[promesa.exec :as px]))
(declare parse-json)
(declare handle-request)
(declare parse-notification)
(declare process-report)
(s/def ::http-client fn?)
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool]))
(s/keys :req-un [::db/pool ::http-client]))
(defmethod ig/init-key ::handler
[_ cfg]
[_ {:keys [executor] :as cfg}]
(fn [request respond _]
(try
(let [body (parse-json (slurp (:body request)))
mtype (get body "Type")]
(cond
(= mtype "SubscriptionConfirmation")
(let [surl (get body "SubscribeURL")
stopic (get body "TopicArn")]
(l/info :action "subscription received" :topic stopic :url surl)
(http/send! {:uri surl :method :post :timeout 10000}))
(let [data (slurp (:body request))]
(px/run! executor #(handle-request cfg data))
(respond {:status 200 :body ""}))))
(= mtype "Notification")
(when-let [message (parse-json (get body "Message"))]
(let [notification (parse-notification cfg message)]
(process-report cfg notification)))
(defn handle-request
[{:keys [http-client] :as cfg} data]
(try
(let [body (parse-json data)
mtype (get body "Type")]
(cond
(= mtype "SubscriptionConfirmation")
(let [surl (get body "SubscribeURL")
stopic (get body "TopicArn")]
(l/info :action "subscription received" :topic stopic :url surl)
(http-client {:uri surl :method :post :timeout 10000} {:sync? true}))
:else
(l/warn :hint "unexpected data received"
:report (pr-str body))))
(catch Throwable cause
(l/error :hint "unexpected exception on awsns handler"
:cause cause)))
(= mtype "Notification")
(when-let [message (parse-json (get body "Message"))]
(let [notification (parse-notification cfg message)]
(process-report cfg notification)))
(respond {:status 200 :body ""})))
:else
(l/warn :hint "unexpected data received"
:report (pr-str body))))
(catch Throwable cause
(l/error :hint "unexpected exception on awsns"
:cause cause))))
(defn- parse-bounce
[data]

View file

@ -0,0 +1,30 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.http.client
"Http client abstraction layer."
(:require
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[java-http-clj.core :as http]))
(defmethod ig/pre-init-spec :app.http/client [_]
(s/keys :req-un [::wrk/executor]))
(defmethod ig/init-key :app.http/client
[_ {:keys [executor] :as cfg}]
(let [client (http/build-client {:executor executor
:connect-timeout 30000 ;; 10s
:follow-redirects :always})]
(with-meta
(fn send
([req] (send req {}))
([req {:keys [response-type sync?] :or {response-type :string sync? false}}]
(if sync?
(http/send req {:client client :as response-type})
(http/send-async req {:client client :as response-type}))))
{::client client})))

View file

@ -14,7 +14,6 @@
[app.db :as db]
[app.rpc.mutations.files :as m.files]
[app.rpc.queries.profile :as profile]
[app.util.async :as async]
[app.util.blob :as blob]
[app.util.template :as tmpl]
[app.util.time :as dt]
@ -25,7 +24,8 @@
[datoteka.core :as fs]
[fipp.edn :as fpp]
[integrant.core :as ig]
[promesa.core :as p]))
[promesa.core :as p]
[promesa.exec :as px]))
;; (selmer.parser/cache-off!)
@ -208,8 +208,7 @@
(defn- wrap-async
[{:keys [executor] :as cfg} f]
(fn [request respond raise]
(-> (async/with-dispatch executor
(f cfg request))
(-> (px/submit! executor #(f cfg request))
(p/then respond)
(p/catch raise))))

View file

@ -15,17 +15,15 @@
[app.db :as db]
[app.loggers.audit :as audit]
[app.rpc.queries.profile :as profile]
[app.util.http :as http]
[app.util.json :as json]
[app.util.time :as dt]
[clojure.data.json :as json]
[clojure.set :as set]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px]))
;; TODO: make it fully async (?)
(defn- build-redirect-uri
[{:keys [provider] :as cfg}]
(let [public (u/uri (:public-uri cfg))]
@ -43,27 +41,6 @@
(assoc :query query)
(str))))
(defn retrieve-access-token
[{:keys [provider] :as cfg} code]
(try
(let [params {:client_id (:client-id provider)
:client_secret (:client-secret provider)
:code code
:grant_type "authorization_code"
:redirect_uri (build-redirect-uri cfg)}
req {:method :post
:headers {"content-type" "application/x-www-form-urlencoded"}
:uri (:token-uri provider)
:body (u/map->query-string params)}
res (http/send! req)]
(when (= 200 (:status res))
(let [data (json/read-str (:body res))]
{:token (get data "access_token")
:type (get data "token_type")})))
(catch Exception e
(l/warn :hint "unexpected error on retrieve-access-token" :cause e)
nil)))
(defn- qualify-props
[provider props]
(reduce-kv (fn [result k v]
@ -71,25 +48,56 @@
{}
props))
(defn- retrieve-user-info
[{:keys [provider] :as cfg} tdata]
(try
(let [req {:uri (:user-uri provider)
:headers {"Authorization" (str (:type tdata) " " (:token tdata))}
:timeout 6000
:method :get}
res (http/send! req)]
(defn retrieve-access-token
[{:keys [provider http-client] :as cfg} code]
(let [params {:client_id (:client-id provider)
:client_secret (:client-secret provider)
:code code
:grant_type "authorization_code"
:redirect_uri (build-redirect-uri cfg)}
req {:method :post
:headers {"content-type" "application/x-www-form-urlencoded"}
:uri (:token-uri provider)
:body (u/map->query-string params)}]
(p/then
(http-client req)
(fn [{:keys [status body] :as res}]
(if (= status 200)
(let [data (json/read body)]
{:token (get data :access_token)
:type (get data :token_type)})
(ex/raise :type :internal
:code :unable-to-retrieve-token
::http-status status
::http-body body))))))
(when (= 200 (:status res))
(let [info (json/read-str (:body res) :key-fn keyword)]
{:backend (:name provider)
:email (:email info)
:fullname (:name info)
:props (->> (dissoc info :name :email)
(qualify-props provider))})))
(catch Exception e
(l/warn :hint "unexpected exception on retrieve-user-info" :cause e)
nil)))
(defn- retrieve-user-info
[{:keys [provider http-client] :as cfg} tdata]
(p/then
(http-client {:uri (:user-uri provider)
:headers {"Authorization" (str (:type tdata) " " (:token tdata))}
:timeout 6000
:method :get})
(fn [{:keys [status body] :as res}]
(if (= 200 status)
(let [info (json/read body)
info {:backend (:name provider)
:email (get info :email)
:fullname (get info :name)
:props (->> (dissoc info :name :email)
(qualify-props provider))}]
(when-not (s/valid? ::info info)
(l/warn :hint "received incomplete profile info object (please set correct scopes)"
:info (pr-str info))
(ex/raise :type :internal
:code :unable-to-auth
:hint "no user info"))
info)
(ex/raise :type :internal
:code :unable-to-retrieve-user-info
::http-status status
::http-body body)))))
(s/def ::backend ::us/not-empty-string)
(s/def ::email ::us/not-empty-string)
@ -104,45 +112,44 @@
(defn retrieve-info
[{:keys [tokens provider] :as cfg} request]
(let [state (get-in request [:params :state])
state (tokens :verify {:token state :iss :oauth})
info (some->> (get-in request [:params :code])
(retrieve-access-token cfg)
(retrieve-user-info cfg))]
(letfn [(validate-oidc [info]
;; If the provider is OIDC, we can proceed to check
;; roles if they are defined.
(when (and (= "oidc" (:name provider))
(seq (:roles provider)))
(let [provider-roles (into #{} (:roles provider))
profile-roles (let [attr (cf/get :oidc-roles-attr :roles)
roles (get info attr)]
(cond
(string? roles) (into #{} (str/words roles))
(vector? roles) (into #{} roles)
:else #{}))]
(when-not (s/valid? ::info info)
(l/warn :hint "received incomplete profile info object (please set correct scopes)"
:info (pr-str info))
(ex/raise :type :internal
:code :unable-to-auth
:hint "no user info"))
;; check if profile has a configured set of roles
(when-not (set/subset? provider-roles profile-roles)
(ex/raise :type :internal
:code :unable-to-auth
:hint "not enough permissions"))))
info)
;; If the provider is OIDC, we can proceed to check
;; roles if they are defined.
(when (and (= "oidc" (:name provider))
(seq (:roles provider)))
(let [provider-roles (into #{} (:roles provider))
profile-roles (let [attr (cf/get :oidc-roles-attr :roles)
roles (get info attr)]
(cond
(string? roles) (into #{} (str/words roles))
(vector? roles) (into #{} roles)
:else #{}))]
(post-process [state info]
(cond-> info
(some? (:invitation-token state))
(assoc :invitation-token (:invitation-token state))
;; check if profile has a configured set of roles
(when-not (set/subset? provider-roles profile-roles)
(ex/raise :type :internal
:code :unable-to-auth
:hint "not enough permissions"))))
;; If state token comes with props, merge them. The state token
;; props can contain pm_ and utm_ prefixed query params.
(map? (:props state))
(update :props merge (:props state))))]
(cond-> info
(some? (:invitation-token state))
(assoc :invitation-token (:invitation-token state))
;; If state token comes with props, merge them. The state token
;; props can contain pm_ and utm_ prefixed query params.
(map? (:props state))
(update :props merge (:props state)))))
(let [state (get-in request [:params :state])
state (tokens :verify {:token state :iss :oauth})
code (get-in request [:params :code])]
(-> (p/resolved code)
(p/then #(retrieve-access-token cfg %))
(p/then #(retrieve-user-info cfg %))
(p/then' validate-oidc)
(p/then' (partial post-process state))))))
;; --- HTTP HANDLERS
@ -158,12 +165,13 @@
params))
(defn- retrieve-profile
[{:keys [pool] :as cfg} info]
(with-open [conn (db/open pool)]
(some->> (:email info)
(profile/retrieve-profile-data-by-email conn)
(profile/populate-additional-data conn)
(profile/decode-profile-row))))
[{:keys [pool executor] :as cfg} info]
(px/with-dispatch executor
(with-open [conn (db/open pool)]
(some->> (:email info)
(profile/retrieve-profile-data-by-email conn)
(profile/populate-additional-data conn)
(profile/decode-profile-row)))))
(defn- redirect-response
[uri]
@ -202,6 +210,7 @@
(->> (redirect-response uri)
(sxf request)))
(let [info (assoc info
:iss :prepared-register
:is-active true
@ -216,35 +225,30 @@
(redirect-response uri))))
(defn- auth-handler
[{:keys [tokens executor] :as cfg} {:keys [params] :as request} respond _]
(px/run!
executor
(fn []
(let [invitation (:invitation-token params)
props (extract-utm-props params)
state (tokens :generate
{:iss :oauth
:invitation-token invitation
:props props
:exp (dt/in-future "15m")})
uri (build-auth-uri cfg state)]
(respond
{:status 200
:body {:redirect-uri uri}})))))
[{:keys [tokens] :as cfg} {:keys [params] :as request} respond _]
(let [props (extract-utm-props params)
state (tokens :generate
{:iss :oauth
:invitation-token (:invitation-token params)
:props props
:exp (dt/in-future "15m")})
uri (build-auth-uri cfg state)]
(respond {:status 200 :body {:redirect-uri uri}})))
(defn- callback-handler
[{:keys [executor] :as cfg} request respond _]
(px/run!
executor
(fn []
(try
(let [info (retrieve-info cfg request)
profile (retrieve-profile cfg info)]
(respond (generate-redirect cfg request info profile)))
(catch Exception cause
(l/warn :hint "error on oauth process" :cause cause)
(respond (generate-error-redirect cfg cause)))))))
[cfg request respond _]
(letfn [(process-request []
(p/let [info (retrieve-info cfg request)
profile (retrieve-profile cfg info)]
(generate-redirect cfg request info profile)))
(handle-error [cause]
(l/warn :hint "error on oauth process" :cause cause)
(respond (generate-error-redirect cfg cause)))]
(-> (process-request)
(p/then respond)
(p/catch handle-error))))
;; --- INIT
@ -281,10 +285,10 @@
:callback-handler (wrap-handler cfg callback-handler)}))
(defn- discover-oidc-config
[{:keys [base-uri] :as opts}]
[{:keys [http-client]} {:keys [base-uri] :as opts}]
(let [discovery-uri (u/join base-uri ".well-known/openid-configuration")
response (ex/try (http/send! {:method :get :uri (str discovery-uri)}))]
response (ex/try (http-client {:method :get :uri (str discovery-uri)} {:sync? true}))]
(cond
(ex/exception? response)
(do
@ -294,10 +298,10 @@
nil)
(= 200 (:status response))
(let [data (json/read-str (:body response))]
{:token-uri (get data "token_endpoint")
:auth-uri (get data "authorization_endpoint")
:user-uri (get data "userinfo_endpoint")})
(let [data (json/read (:body response))]
{:token-uri (get data :token_endpoint)
:auth-uri (get data :authorization_endpoint)
:user-uri (get data :userinfo_endpoint)})
:else
(do
@ -325,6 +329,7 @@
:roles-attr (cf/get :oidc-roles-attr)
:roles (cf/get :oidc-roles)
:name "oidc"}]
(if (and (string? (:base-uri opts))
(string? (:client-id opts))
(string? (:client-secret opts)))
@ -339,7 +344,7 @@
(assoc-in cfg [:providers "oidc"] opts))
(do
(l/debug :hint "trying to discover oidc provider configuration using BASE_URI")
(if-let [opts' (discover-oidc-config opts)]
(if-let [opts' (discover-oidc-config cfg opts)]
(do
(l/debug :hint "discovered opts" :additional-opts opts')
(assoc-in cfg [:providers "oidc"] (merge opts opts')))

View file

@ -89,16 +89,6 @@
(when-let [token (get-in cookies [token-cookie-name :value])]
(rss/delete-session store token)))
(defn- retrieve-session
[store token]
(when token
(rss/read-session store token)))
(defn- retrieve-from-request
[store {:keys [cookies] :as request}]
(->> (get-in cookies [token-cookie-name :value])
(retrieve-session store)))
(defn- add-cookies
[response token]
(let [cors? (contains? cfg/flags :cors)
@ -132,40 +122,55 @@
:value ""
:max-age -1}})))
;; NOTE: for now the session middleware is synchronous and is
;; processed on jetty threads. This is because of probably a bug on
;; jetty that causes NPE on upgrading connection to websocket from
;; thread not managed by jetty. We probably can fix it running
;; websocket server in different port as standalone service.
(defn- middleware
[events-ch store handler]
(fn [request respond raise]
(if-let [{:keys [id profile-id] :as session} (retrieve-from-request store request)]
(do
(a/>!! events-ch id)
(l/set-context! {:profile-id profile-id})
(handler (assoc request :profile-id profile-id :session-id id) respond raise))
(handler request respond raise))))
[{:keys [::events-ch ::store] :as cfg} handler]
(letfn [(get-session [{:keys [cookies] :as request}]
(if-let [token (get-in cookies [token-cookie-name :value])]
(if-let [{:keys [id profile-id]} (rss/read-session store token)]
(assoc request :session-id id :profile-id profile-id)
request)
request))]
(fn [request respond raise]
(try
(let [{:keys [session-id profile-id] :as request} (get-session request)]
(when (and session-id profile-id)
(a/offer! events-ch session-id))
(handler request respond raise))
(catch Throwable cause
(raise cause))))))
;; --- STATE INIT: SESSION
(s/def ::tokens fn?)
(defmethod ig/pre-init-spec ::session [_]
(s/keys :req-un [::db/pool ::tokens]))
(defmethod ig/pre-init-spec :app.http/session [_]
(s/keys :req-un [::db/pool ::tokens ::wrk/executor]))
(defmethod ig/prep-key ::session
(defmethod ig/prep-key :app.http/session
[_ cfg]
(d/merge {:buffer-size 128}
(d/without-nils cfg)))
(defmethod ig/init-key ::session
(defmethod ig/init-key :app.http/session
[_ {:keys [pool tokens] :as cfg}]
(let [events-ch (a/chan (a/dropping-buffer (:buffer-size cfg)))
store (if (db/read-only? pool)
(->MemoryStore (atom {}) tokens)
(->DatabaseStore pool tokens))]
(->DatabaseStore pool tokens))
cfg (assoc cfg ::store store ::events-ch events-ch)]
(when (db/read-only? pool)
(l/warn :hint "sessions module initialized with in-memory store"))
(-> cfg
(assoc ::events-ch events-ch)
(assoc :middleware (partial middleware events-ch store))
(assoc :middleware (partial middleware cfg))
(assoc :create (fn [profile-id]
(fn [request response]
(let [token (create-session store request profile-id)]
@ -177,11 +182,10 @@
(assoc :body "")
(clear-cookies)))))))
(defmethod ig/halt-key! ::session
(defmethod ig/halt-key! :app.http/session
[_ data]
(a/close! (::events-ch data)))
;; --- STATE INIT: SESSION UPDATER
(declare update-sessions)
@ -192,8 +196,7 @@
(defmethod ig/pre-init-spec ::updater [_]
(s/keys :req-un [::db/pool ::wrk/executor ::mtx/metrics ::session]
:opt-un [::max-batch-age
::max-batch-size]))
:opt-un [::max-batch-age ::max-batch-size]))
(defmethod ig/prep-key ::updater
[_ cfg]