♻️ Refactor http server layer

Make it fully asynchronous.
This commit is contained in:
Andrey Antukh 2022-03-04 18:00:16 +01:00 committed by Andrés Moya
parent a7e79b13f9
commit 1b444a42f2
28 changed files with 615 additions and 630 deletions

View file

@ -18,7 +18,8 @@
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px]))
[promesa.exec :as px]
[yetti.response :as yrs]))
(def ^:private cache-max-age
(dt/duration {:hours 24}))
@ -53,27 +54,25 @@
(case (:type backend)
:db
(p/let [body (sto/get-object-bytes storage obj)]
{:status 200
:headers {"content-type" (:content-type mdata)
"cache-control" (str "max-age=" (inst-ms cache-max-age))}
:body body})
(yrs/response :status 200
:body body
:headers {"content-type" (:content-type mdata)
"cache-control" (str "max-age=" (inst-ms cache-max-age))}))
:s3
(p/let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})]
{:status 307
:headers {"location" (str url)
"x-host" (cond-> host port (str ":" port))
"cache-control" (str "max-age=" (inst-ms cache-max-age))}
:body ""})
(yrs/response :status 307
:headers {"location" (str url)
"x-host" (cond-> host port (str ":" port))
"cache-control" (str "max-age=" (inst-ms cache-max-age))}))
:fs
(p/let [purl (u/uri (:assets-path cfg))
purl (u/join purl (sto/object->relative-path obj))]
{:status 204
:headers {"x-accel-redirect" (:path purl)
"content-type" (:content-type mdata)
"cache-control" (str "max-age=" (inst-ms cache-max-age))}
:body ""}))))
(yrs/response :status 204
:headers {"x-accel-redirect" (:path purl)
"content-type" (:content-type mdata)
"cache-control" (str "max-age=" (inst-ms cache-max-age))})))))
(defn objects-handler
"Handler that servers storage objects by id."
@ -84,7 +83,7 @@
obj (sto/get-object storage id)]
(if obj
(serve-object cfg obj)
{:status 404 :body ""})))
(yrs/response 404))))
(p/bind p/wrap)
(p/then' respond)
@ -98,7 +97,7 @@
obj (sto/get-object storage (kf mobj))]
(if obj
(serve-object cfg obj)
{:status 404 :body ""})))
(yrs/response 404))))
(defn file-objects-handler
"Handler that serves storage objects by file media id."

View file

@ -15,7 +15,8 @@
[cuerdas.core :as str]
[integrant.core :as ig]
[jsonista.core :as j]
[promesa.exec :as px]))
[promesa.exec :as px]
[yetti.response :as yrs]))
(declare parse-json)
(declare handle-request)
@ -32,7 +33,7 @@
(fn [request respond _]
(let [data (slurp (:body request))]
(px/run! executor #(handle-request cfg data))
(respond {:status 200 :body ""}))))
(respond (yrs/response 200)))))
(defn handle-request
[{:keys [http-client] :as cfg} data]

View file

@ -25,7 +25,9 @@
[fipp.edn :as fpp]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px]))
[promesa.exec :as px]
[yetti.request :as yrq]
[yetti.response :as yrs]))
;; (selmer.parser/cache-off!)
@ -41,11 +43,10 @@
(when-not (authorized? pool request)
(ex/raise :type :authentication
:code :only-admins-allowed))
{:status 200
:headers {"content-type" "text/html"}
:body (-> (io/resource "templates/debug.tmpl")
(tmpl/render {}))})
(yrs/response :status 200
:headers {"content-type" "text/html"}
:body (-> (io/resource "templates/debug.tmpl")
(tmpl/render {}))))
(def sql:retrieve-range-of-changes
@ -61,13 +62,14 @@
:code :enpty-data
:hint "empty response"))
(cond-> {:status 200
:headers {"content-type" "application/transit+json"}
:body body}
(cond-> (yrs/response :status 200
:body body
:headers {"content-type" "application/transit+json"})
(contains? params :download)
(update :headers assoc "content-disposition" "attachment")))
(defn retrieve-file-data
(defn- retrieve-file-data
[{:keys [pool]} {:keys [params] :as request}]
(when-not (authorized? pool request)
(ex/raise :type :authentication
@ -87,7 +89,7 @@
(update :headers assoc "content-type" "application/octet-stream"))
(prepare-response request (some-> data blob/decode))))))
(defn upload-file-data
(defn- upload-file-data
[{:keys [pool]} {:keys [profile-id params] :as request}]
(let [project-id (some-> (profile/retrieve-additional-data pool profile-id) :default-project-id)
data (some-> params :file :tempfile fs/slurp-bytes blob/decode)]
@ -99,10 +101,16 @@
:project-id project-id
:profile-id profile-id
:data data})
{:status 200
:body "OK"})
{:status 500
:body "error"})))
(yrs/response 200 "OK"))
(yrs/response 500 "ERROR"))))
(defn file-data
[cfg request]
(case (yrq/method request)
:get (retrieve-file-data cfg request)
:post (upload-file-data cfg request)
(ex/raise :type :http
:code :method-not-found)))
(defn retrieve-file-changes
[{:keys [pool]} request]
@ -175,12 +183,11 @@
(retrieve-report)
(render-template))]
(if result
{:status 200
:headers {"content-type" "text/html; charset=utf-8"
"x-robots-tag" "noindex"}
:body result}
{:status 404
:body "not found"}))))
(yrs/response :status 200
:body result
:headers {"content-type" "text/html; charset=utf-8"
"x-robots-tag" "noindex"})
(yrs/response 404 "not found")))))
(def sql:error-reports
"select id, created_at from server_error_report order by created_at desc limit 100")
@ -192,18 +199,18 @@
:code :only-admins-allowed))
(let [items (db/exec! pool [sql:error-reports])
items (map #(update % :created-at dt/format-instant :rfc1123) items)]
{:status 200
:headers {"content-type" "text/html; charset=utf-8"
"x-robots-tag" "noindex"}
:body (-> (io/resource "templates/error-list.tmpl")
(tmpl/render {:items items}))}))
(yrs/response :status 200
:body (-> (io/resource "templates/error-list.tmpl")
(tmpl/render {:items items}))
:headers {"content-type" "text/html; charset=utf-8"
"x-robots-tag" "noindex"})))
(defn health-check
"Mainly a task that performs a health check."
[{:keys [pool]} _]
(db/with-atomic [conn pool]
(db/exec-one! conn ["select count(*) as count from server_prop;"])
{:status 200 :body "Ok"}))
(yrs/response 200 "OK")))
(defn- wrap-async
[{:keys [executor] :as cfg} f]
@ -219,8 +226,7 @@
[_ cfg]
{:index (wrap-async cfg index)
:health-check (wrap-async cfg health-check)
:retrieve-file-data (wrap-async cfg retrieve-file-data)
:retrieve-file-changes (wrap-async cfg retrieve-file-changes)
:retrieve-error (wrap-async cfg retrieve-error)
:retrieve-error-list (wrap-async cfg retrieve-error-list)
:upload-file-data (wrap-async cfg upload-file-data)})
:file-data (wrap-async cfg file-data)})

View file

@ -13,7 +13,8 @@
[app.util.template :as tmpl]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[pretty-spec.core :as ps]))
[pretty-spec.core :as ps]
[yetti.response :as yrs]))
(defn get-spec-str
[k]
@ -47,8 +48,7 @@
(let [context (prepare-context rpc)]
(if (contains? cf/flags :backend-api-doc)
(fn [_ respond _]
(respond {:status 200
:body (-> (io/resource "api-doc.tmpl")
(tmpl/render context))}))
(respond (yrs/response 200 (-> (io/resource "api-doc.tmpl")
(tmpl/render context)))))
(fn [_ respond _]
(respond {:status 404 :body ""})))))
(respond (yrs/response 404))))))

View file

@ -11,13 +11,15 @@
[app.common.logging :as l]
[app.common.spec :as us]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]))
[cuerdas.core :as str]
[yetti.request :as yrq]
[yetti.response :as yrs]))
(defn- parse-client-ip
[{:keys [headers] :as request}]
(or (some-> (get headers "x-forwarded-for") (str/split ",") first)
(get headers "x-real-ip")
(get request :remote-addr)))
[request]
(or (some-> (yrq/get-header request "x-forwarded-for") (str/split ",") first)
(yrq/get-header request "x-real-ip")
(yrq/remote-addr request)))
(defn get-error-context
[request error]
@ -49,20 +51,19 @@
(defmethod handle-exception :authentication
[err _]
{:status 401 :body (ex-data err)})
(yrs/response 401 (ex-data err)))
(defmethod handle-exception :restriction
[err _]
{:status 400 :body (ex-data err)})
(yrs/response 400 (ex-data err)))
(defmethod handle-exception :validation
[err _]
(let [data (ex-data err)
explain (us/pretty-explain data)]
{:status 400
:body (-> data
(dissoc ::s/problems ::s/value)
(cond-> explain (assoc :explain explain)))}))
(yrs/response 400 (-> data
(dissoc ::s/problems ::s/value)
(cond-> explain (assoc :explain explain))))))
(defmethod handle-exception :assertion
[error request]
@ -71,17 +72,16 @@
(l/error ::l/raw (ex-message error)
::l/context (get-error-context request error)
:cause error)
{:status 500
:body {:type :server-error
:code :assertion
:data (-> edata
(dissoc ::s/problems ::s/value ::s/spec)
(cond-> explain (assoc :explain explain)))}}))
(yrs/response :status 500
:body {:type :server-error
:code :assertion
:data (-> edata
(dissoc ::s/problems ::s/value ::s/spec)
(cond-> explain (assoc :explain explain)))})))
(defmethod handle-exception :not-found
[err _]
{:status 404 :body (ex-data err)})
(yrs/response 404 (ex-data err)))
(defmethod handle-exception :default
[error request]
@ -98,11 +98,10 @@
(l/error ::l/raw (ex-message error)
::l/context (get-error-context request error)
:cause error)
{:status 500
:body {:type :server-error
:code :unexpected
:hint (ex-message error)
:data edata}}))))
(yrs/response 500 {:type :server-error
:code :unexpected
:hint (ex-message error)
:data edata})))))
(defmethod handle-exception org.postgresql.util.PSQLException
[error request]
@ -112,23 +111,20 @@
:cause error)
(cond
(= state "57014")
{:status 504
:body {:type :server-timeout
:code :statement-timeout
:hint (ex-message error)}}
(yrs/response 504 {:type :server-timeout
:code :statement-timeout
:hint (ex-message error)})
(= state "25P03")
{:status 504
:body {:type :server-timeout
:code :idle-in-transaction-timeout
:hint (ex-message error)}}
(yrs/response 504 {:type :server-timeout
:code :idle-in-transaction-timeout
:hint (ex-message error)})
:else
{:status 500
:body {:type :server-error
:code :psql-exception
:hint (ex-message error)
:state state}})))
(yrs/response 500 {:type :server-error
:code :psql-exception
:hint (ex-message error)
:state state}))))
(defn handle
[error req]

View file

@ -7,7 +7,6 @@
(ns app.http.feedback
"A general purpose feedback module."
(:require
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.spec :as us]
[app.config :as cf]
@ -18,7 +17,9 @@
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px]))
[promesa.exec :as px]
[yetti.request :as yrq]
[yetti.response :as yrs]))
(declare ^:private send-feedback)
(declare ^:private handler)
@ -42,9 +43,8 @@
(defn- handler
[{:keys [pool] :as cfg} {:keys [profile-id] :as request}]
(let [ftoken (cf/get :feedback-token ::no-token)
token (get-in request [:headers "x-feedback-token"])
params (d/merge (:params request)
(:body-params request))]
token (yrq/get-header request "x-feedback-token")
params (::yrq/params request)]
(cond
(uuid? profile-id)
(let [profile (profile/retrieve-profile-data pool profile-id)
@ -54,7 +54,7 @@
(= token ftoken)
(send-feedback cfg nil params))
{:status 204 :body ""}))
(yrs/response 204)))
(s/def ::content ::us/string)
(s/def ::from ::us/email)

View file

@ -10,49 +10,41 @@
[app.common.transit :as t]
[app.config :as cf]
[app.util.json :as json]
[ring.core.protocols :as rp]
[ring.middleware.cookies :refer [wrap-cookies]]
[ring.middleware.keyword-params :refer [wrap-keyword-params]]
[ring.middleware.multipart-params :refer [wrap-multipart-params]]
[ring.middleware.params :refer [wrap-params]]
[yetti.adapter :as yt]))
[cuerdas.core :as str]
[yetti.adapter :as yt]
[yetti.middleware :as ymw]
[yetti.request :as yrq]
[yetti.response :as yrs])
(:import java.io.OutputStream))
(defn wrap-server-timing
(def server-timing
{:name ::server-timing
:compile (constantly ymw/wrap-server-timing)})
(def params
{:name ::params
:compile (constantly ymw/wrap-params)})
(defn wrap-parse-request
[handler]
(letfn [(get-age [start]
(float (/ (- (System/nanoTime) start) 1000000000)))
(letfn [(process-request [request]
(let [header (yrq/get-header request "content-type")]
(cond
(str/starts-with? header "application/transit+json")
(with-open [is (-> request yrq/body yrq/body-stream)]
(let [params (t/read! (t/reader is))]
(-> request
(assoc :body-params params)
(update :params merge params))))
(update-headers [headers start]
(assoc headers "Server-Timing" (str "total;dur=" (get-age start))))]
(fn [request respond raise]
(let [start (System/nanoTime)]
(handler request #(respond (update % :headers update-headers start)) raise)))))
(defn wrap-parse-request-body
[handler]
(letfn [(parse-transit [body]
(let [reader (t/reader body)]
(t/read! reader)))
(parse-json [body]
(json/read body))
(handle-request [{:keys [headers body] :as request}]
(let [ctype (get headers "content-type")]
(case ctype
"application/transit+json"
(let [params (parse-transit body)]
(-> request
(assoc :body-params params)
(update :params merge params)))
"application/json"
(let [params (parse-json body)]
(-> request
(assoc :body-params params)
(update :params merge params)))
(str/starts-with? header "application/json")
(with-open [is (-> request yrq/body yrq/body-stream)]
(let [params (json/read is)]
(-> request
(assoc :body-params params)
(update :params merge params))))
:else
request)))
(handle-exception [cause]
@ -60,20 +52,20 @@
:code :unable-to-parse-request-body
:hint "malformed params"}]
(l/error :hint (ex-message cause) :cause cause)
{:status 400
:headers {"content-type" "application/transit+json"}
:body (t/encode-str data {:type :json-verbose})}))]
(yrs/response :status 400
:headers {"content-type" "application/transit+json"}
:body (t/encode-str data {:type :json-verbose}))))]
(fn [request respond raise]
(try
(let [request (handle-request request)]
(let [request (process-request request)]
(handler request respond raise))
(catch Exception cause
(respond (handle-exception cause)))))))
(def parse-request-body
{:name ::parse-request-body
:compile (constantly wrap-parse-request-body)})
(def parse-request
{:name ::parse-request
:compile (constantly wrap-parse-request)})
(defn buffered-output-stream
"Returns a buffered output stream that ignores flush calls. This is
@ -87,56 +79,51 @@
(proxy-super flush)
(proxy-super close))))
(def ^:const buffer-size (:http/output-buffer-size yt/base-defaults))
(def ^:const buffer-size (:xnio/buffer-size yt/defaults))
(defn wrap-format-response-body
(defn wrap-format-response
[handler]
(letfn [(transit-streamable-body [data opts]
(reify rp/StreamableResponseBody
(write-body-to-stream [_ _ output-stream]
;; Use the same buffer as jetty output buffer size
(reify yrs/StreamableResponseBody
(-write-body-to-stream [_ _ output-stream]
(try
(with-open [bos (buffered-output-stream output-stream buffer-size)]
(let [tw (t/writer bos opts)]
(t/write! tw data)))
(catch org.eclipse.jetty.io.EofException _cause
(catch java.io.IOException _cause
;; Do nothing, EOF means client closes connection abruptly
nil)
(catch Throwable cause
(l/warn :hint "unexpected error on encoding response"
:cause cause))))))
:cause cause))
(finally
(.close ^OutputStream output-stream))))))
(impl-format-response-body [response {:keys [query-params] :as request}]
(let [body (:body response)
opts {:type (if (contains? query-params "transit_verbose") :json-verbose :json)}]
(cond
(:ws response)
response
(coll? body)
(-> response
(update :headers assoc "content-type" "application/transit+json")
(assoc :body (transit-streamable-body body opts)))
(nil? body)
(assoc response :status 204 :body "")
:else
(format-response [response request]
(let [body (yrs/body response)]
(if (coll? body)
(let [qs (yrq/query request)
opts {:type (if (str/includes? qs "verbose") :json-verbose :json)}]
(-> response
(update :headers assoc "content-type" "application/transit+json")
(assoc :body (transit-streamable-body body opts))))
response)))
(handle-response [response request]
(process-response [response request]
(cond-> response
(map? response) (impl-format-response-body request)))]
(map? response) (format-response request)))]
(fn [request respond raise]
(handler request
(fn [response]
(respond (handle-response response request)))
(let [response (process-response response request)]
(respond response)))
raise))))
(def format-response-body
{:name ::format-response-body
:compile (constantly wrap-format-response-body)})
(def format-response
{:name ::format-response
:compile (constantly wrap-format-response)})
(defn wrap-errors
[handler on-error]
@ -148,51 +135,46 @@
{:name ::errors
:compile (constantly wrap-errors)})
(def cookies
{:name ::cookies
:compile (constantly wrap-cookies)})
(def params
{:name ::params
:compile (constantly wrap-params)})
(def multipart-params
{:name ::multipart-params
:compile (constantly wrap-multipart-params)})
(def keyword-params
{:name ::keyword-params
:compile (constantly wrap-keyword-params)})
(def server-timing
{:name ::server-timing
:compile (constantly wrap-server-timing)})
(defn wrap-cors
[handler]
(if-not (contains? cf/flags :cors)
handler
(letfn [(add-cors-headers [response request]
(-> response
(update
:headers
(fn [headers]
(-> headers
(assoc "access-control-allow-origin" (get-in request [:headers "origin"]))
(assoc "access-control-allow-methods" "GET,POST,DELETE,OPTIONS,PUT,HEAD,PATCH")
(assoc "access-control-allow-credentials" "true")
(assoc "access-control-expose-headers" "x-requested-with, content-type, cookie")
(assoc "access-control-allow-headers" "x-frontend-version, content-type, accept, x-requested-width"))))))]
(letfn [(add-headers [headers request]
(let [origin (yrq/get-header request "origin")]
(-> headers
(assoc "access-control-allow-origin" origin)
(assoc "access-control-allow-methods" "GET,POST,DELETE,OPTIONS,PUT,HEAD,PATCH")
(assoc "access-control-allow-credentials" "true")
(assoc "access-control-expose-headers" "x-requested-with, content-type, cookie")
(assoc "access-control-allow-headers" "x-frontend-version, content-type, accept, x-requested-width"))))
(update-response [response request]
(update response :headers add-headers request))]
(fn [request respond raise]
(if (= (:request-method request) :options)
(-> {:status 200 :body ""}
(add-cors-headers request)
(if (= (yrq/method request) :options)
(-> (yrs/response 200)
(update-response request)
(respond))
(handler request
(fn [response]
(respond (add-cors-headers response request)))
(respond (update-response response request)))
raise))))))
(def cors
{:name ::cors
:compile (constantly wrap-cors)})
(defn compile-restrict-methods
[data _]
(when-let [allowed (:allowed-methods data)]
(fn [handler]
(fn [request respond raise]
(let [method (yrq/method request)]
(if (contains? allowed method)
(handler request respond raise)
(respond (yrs/response 405))))))))
(def restrict-methods
{:name ::restrict-methods
:compile compile-restrict-methods})

View file

@ -22,7 +22,8 @@
[cuerdas.core :as str]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px]))
[promesa.exec :as px]
[yetti.response :as yrs]))
(defn- build-redirect-uri
[{:keys [provider] :as cfg}]
@ -175,9 +176,7 @@
(defn- redirect-response
[uri]
{:status 302
:headers {"location" (str uri)}
:body ""})
(yrs/response :status 302 :headers {"location" (str uri)}))
(defn- generate-error-redirect
[cfg error]
@ -233,7 +232,7 @@
:props props
:exp (dt/in-future "15m")})
uri (build-auth-uri cfg state)]
(respond {:status 200 :body {:redirect-uri uri}})))
(respond (yrs/response 200 {:redirect-uri uri}))))
(defn- callback-handler
[cfg request respond _]

View file

@ -19,7 +19,9 @@
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[ring.middleware.session.store :as rss]))
[promesa.core :as p]
[promesa.exec :as px]
[yetti.request :as yrq]))
;; A default cookie name for storing the session. We don't allow to configure it.
(def token-cookie-name "auth-token")
@ -29,65 +31,100 @@
;; prevents using it if some one wants to.
(def authenticated-cookie-name "authenticated")
(deftype DatabaseStore [pool tokens]
rss/SessionStore
(read-session [_ token]
(db/exec-one! pool (sql/select :http-session {:id token})))
(defprotocol ISessionStore
(read-session [store key])
(write-session [store key data])
(delete-session [store key]))
(write-session [_ _ data]
(let [profile-id (:profile-id data)
user-agent (:user-agent data)
token (tokens :generate {:iss "authentication"
:iat (dt/now)
:uid profile-id})
(defn- make-database-store
[{:keys [pool tokens executor]}]
(reify ISessionStore
(read-session [_ token]
(px/with-dispatch executor
(db/exec-one! pool (sql/select :http-session {:id token}))))
now (dt/now)
params {:user-agent user-agent
:profile-id profile-id
:created-at now
:updated-at now
:id token}]
(db/insert! pool :http-session params)
token))
(write-session [_ _ data]
(px/with-dispatch executor
(let [profile-id (:profile-id data)
user-agent (:user-agent data)
token (tokens :generate {:iss "authentication"
:iat (dt/now)
:uid profile-id})
(delete-session [_ token]
(db/delete! pool :http-session {:id token})
nil))
now (dt/now)
params {:user-agent user-agent
:profile-id profile-id
:created-at now
:updated-at now
:id token}]
(db/insert! pool :http-session params)
token)))
(deftype MemoryStore [cache tokens]
rss/SessionStore
(read-session [_ token]
(get @cache token))
(delete-session [_ token]
(px/with-dispatch executor
(db/delete! pool :http-session {:id token})
nil))))
(write-session [_ _ data]
(let [profile-id (:profile-id data)
user-agent (:user-agent data)
token (tokens :generate {:iss "authentication"
:iat (dt/now)
:uid profile-id})
params {:user-agent user-agent
:profile-id profile-id
:id token}]
(defn make-inmemory-store
[{:keys [tokens]}]
(let [cache (atom {})]
(reify ISessionStore
(read-session [_ token]
(p/do (get @cache token)))
(swap! cache assoc token params)
token))
(write-session [_ _ data]
(p/do
(let [profile-id (:profile-id data)
user-agent (:user-agent data)
token (tokens :generate {:iss "authentication"
:iat (dt/now)
:uid profile-id})
params {:user-agent user-agent
:profile-id profile-id
:id token}]
(delete-session [_ token]
(swap! cache dissoc token)
nil))
(swap! cache assoc token params)
token)))
(delete-session [_ token]
(p/do
(swap! cache dissoc token)
nil)))))
(s/def ::tokens fn?)
(defmethod ig/pre-init-spec ::store [_]
(s/keys :req-un [::db/pool ::wrk/executor ::tokens]))
(defmethod ig/init-key ::store
[_ {:keys [pool] :as cfg}]
(if (db/read-only? pool)
(make-inmemory-store cfg)
(make-database-store cfg)))
(defmethod ig/halt-key! ::store
[_ _])
;; --- IMPL
(defn- create-session
(defn- create-session!
[store request profile-id]
(let [params {:user-agent (get-in request [:headers "user-agent"])
(let [params {:user-agent (yrq/get-header request "user-agent")
:profile-id profile-id}]
(rss/write-session store nil params)))
(write-session store nil params)))
(defn- delete-session
(defn- delete-session!
[store {:keys [cookies] :as request}]
(when-let [token (get-in cookies [token-cookie-name :value])]
(rss/delete-session store token)))
(delete-session store token)))
(defn- retrieve-session
[store request]
(when-let [cookie (yrq/get-cookie request token-cookie-name)]
(-> (read-session store (:value cookie))
(p/then (fn [session]
(when session
{:session-id (:id session)
:profile-id (:profile-id session)}))))))
(defn- add-cookies
[response token]
@ -114,43 +151,40 @@
(defn- clear-cookies
[response]
(let [authenticated-cookie-domain (cfg/get :authenticated-cookie-domain)]
(assoc response :cookies {token-cookie-name {:path "/"
:value ""
:max-age -1}
authenticated-cookie-name {:domain authenticated-cookie-domain
:path "/"
:value ""
:max-age -1}})))
(assoc response :cookies
{token-cookie-name {:path "/"
:value ""
:max-age -1}
authenticated-cookie-name {:domain authenticated-cookie-domain
:path "/"
:value ""
:max-age -1}})))
;; NOTE: for now the session middleware is synchronous and is
;; processed on jetty threads. This is because of probably a bug on
;; jetty that causes NPE on upgrading connection to websocket from
;; thread not managed by jetty. We probably can fix it running
;; websocket server in different port as standalone service.
(defn- make-middleware
[{:keys [::events-ch store] :as cfg}]
{:name :session-middleware
:wrap (fn [handler]
(fn [request respond raise]
(try
(-> (retrieve-session store request)
(p/then' #(merge request %))
(p/finally (fn [request cause]
(if cause
(raise cause)
(do
(when-let [session-id (:session-id request)]
(a/offer! events-ch session-id))
(handler request respond raise))))))
(catch Throwable cause
(raise cause)))))})
(defn- middleware
[{:keys [::events-ch ::store] :as cfg} handler]
(letfn [(get-session [{:keys [cookies] :as request}]
(if-let [token (get-in cookies [token-cookie-name :value])]
(if-let [{:keys [id profile-id]} (rss/read-session store token)]
(assoc request :session-id id :profile-id profile-id)
request)
request))]
(fn [request respond raise]
(try
(let [{:keys [session-id profile-id] :as request} (get-session request)]
(when (and session-id profile-id)
(a/offer! events-ch session-id))
(handler request respond raise))
(catch Throwable cause
(raise cause))))))
;; --- STATE INIT: SESSION
(s/def ::tokens fn?)
(s/def ::store #(satisfies? ISessionStore %))
(defmethod ig/pre-init-spec :app.http/session [_]
(s/keys :req-un [::db/pool ::tokens ::wrk/executor]))
(s/keys :req-un [::store]))
(defmethod ig/prep-key :app.http/session
[_ cfg]
@ -158,29 +192,23 @@
(d/without-nils cfg)))
(defmethod ig/init-key :app.http/session
[_ {:keys [pool tokens] :as cfg}]
[_ {:keys [store] :as cfg}]
(let [events-ch (a/chan (a/dropping-buffer (:buffer-size cfg)))
store (if (db/read-only? pool)
(->MemoryStore (atom {}) tokens)
(->DatabaseStore pool tokens))
cfg (assoc cfg ::store store ::events-ch events-ch)]
(when (db/read-only? pool)
(l/warn :hint "sessions module initialized with in-memory store"))
cfg (assoc cfg ::events-ch events-ch)]
(-> cfg
(assoc :middleware (partial middleware cfg))
(assoc :middleware (make-middleware cfg))
(assoc :create (fn [profile-id]
(fn [request response]
(let [token (create-session store request profile-id)]
(p/let [token (create-session! store request profile-id)]
(add-cookies response token)))))
(assoc :delete (fn [request response]
(delete-session store request)
(-> response
(assoc :status 204)
(assoc :body "")
(clear-cookies)))))))
(p/do
(delete-session! store request)
(-> response
(assoc :status 204)
(assoc :body nil)
(clear-cookies))))))))
(defmethod ig/halt-key! :app.http/session
[_ data]