🎉 Add conditional reading to RPC

This commit is contained in:
Andrey Antukh 2022-11-07 16:56:02 +01:00 committed by Andrés Moya
parent 5192b36669
commit fde03e21b0
12 changed files with 242 additions and 90 deletions

View file

@ -16,6 +16,8 @@
[app.metrics :as mtx]
[app.msgbus :as-alias mbus]
[app.rpc.climit :as climit]
[app.rpc.cond :as cond]
[app.rpc.helpers :as rph]
[app.rpc.retry :as retry]
[app.rpc.rlimit :as rlimit]
[app.storage :as-alias sto]
@ -25,6 +27,7 @@
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px]
[yetti.request :as yrq]
[yetti.response :as yrs]))
(defn- default-handler
@ -33,23 +36,29 @@
(defn- handle-response-transformation
[response request mdata]
(if-let [transform-fn (::transform-response mdata)]
(p/do (transform-fn request response))
(p/resolved response)))
(let [transform-fn (reduce (fn [res-fn transform-fn]
(fn [request response]
(p/then (res-fn request response) #(transform-fn request %))))
(constantly response)
(::response-transform-fns mdata))]
(transform-fn request response)))
(defn- handle-before-comple-hook
[response mdata]
(when-let [hook-fn (::before-complete mdata)]
(doseq [hook-fn (::before-complete-fns mdata)]
(ex/ignoring (hook-fn)))
response)
(defn- handle-response
[request result]
(let [mdata (meta result)
result (if (sv/wrapped? result) @result result)]
(p/-> (yrs/response 200 result (::http/headers mdata {}))
(handle-response-transformation request mdata)
(handle-before-comple-hook mdata))))
(if (fn? result)
(p/wrap (result request))
(let [mdata (meta result)]
(p/-> (yrs/response {:status (::http/status mdata 200)
:headers (::http/headers mdata {})
:body (rph/unwrap result)})
(handle-response-transformation request mdata)
(handle-before-comple-hook mdata)))))
(defn- rpc-query-handler
"Ring handler that dispatches query requests and convert between
@ -92,18 +101,20 @@
internal async flow into ring async flow."
[methods {:keys [profile-id session-id params] :as request} respond raise]
(let [cmd (keyword (:command params))
data (into {::request request} params)
etag (yrq/get-header request "if-none-match")
data (into {::request request ::cond/key etag} params)
data (if profile-id
(assoc data :profile-id profile-id ::session-id session-id)
(dissoc data :profile-id))
method (get methods cmd default-handler)]
(-> (method data)
(p/then (partial handle-response request))
(p/then respond)
(p/catch (fn [cause]
(let [context {:profile-id profile-id}]
(raise (ex/wrap-with-context cause context))))))))
(binding [cond/*enabled* true]
(-> (method data)
(p/then (partial handle-response request))
(p/then respond)
(p/catch (fn [cause]
(let [context {:profile-id profile-id}]
(raise (ex/wrap-with-context cause context)))))))))
(defn- wrap-metrics
"Wrap service method with metrics measurement."
@ -125,9 +136,9 @@
[{:keys [executor] :as cfg} f mdata]
(with-meta
(fn [cfg params]
(-> (px/submit! executor #(f cfg params))
(p/bind p/wrap)
(p/then' sv/wrap)))
(->> (px/submit! executor (px/wrap-bindings #(f cfg params)))
(p/mapcat p/wrap)
(p/map rph/wrap)))
mdata))
(defn- wrap-audit
@ -161,6 +172,7 @@
[cfg f mdata]
(let [f (as-> f $
(wrap-dispatch cfg $ mdata)
(cond/wrap cfg $ mdata)
(retry/wrap-retry cfg $ mdata)
(wrap-metrics cfg $ mdata)
(climit/wrap cfg $ mdata)