Move vertx as vendor package.

This commit is contained in:
Andrey Antukh 2020-01-13 16:50:44 +01:00
parent 8663f5a136
commit bd5f25eabf
21 changed files with 927 additions and 15 deletions

View file

@ -52,9 +52,9 @@
:name "login-handler"})
echo-handler (rl/ratelimit handlers/echo-handler
{:limit 100
:period 1000
:timeout 1000
{:limit 1
:period 5000
:timeout 10
:name "echo-handler"})
routes [["/sub/:file-id" {:interceptors [(vxi/cookies)
@ -87,4 +87,4 @@
(defstate server
:start (let [factory (vc/verticle {:on-start on-start})]
@(vc/deploy! system factory {:instances 4})))
@(vc/deploy! system factory {:instances 1})))

View file

@ -30,6 +30,12 @@
{:status 400
:body response})))
(defmethod handle-exception :ratelimit
[err req]
{:status 429
:headers {"retry-after" 1000}
:body ""})
(defmethod handle-exception :not-found
[err req]
(let [response (ex-data err)]

View file

@ -75,7 +75,10 @@
(defn echo-handler
[req]
;; (locking echo-handler
;; (prn "echo-handler" (Thread/currentThread)))
{:status 200
:body {:params (:params req)
:cookies (:cookies req)
:headers (:headers req)}})

View file

@ -1,221 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns vertx.core
(:require [clojure.spec.alpha :as s]
[promesa.core :as p]
[vertx.eventbus :as vxe]
[vertx.util :as vu])
(:import
io.vertx.core.Context
io.vertx.core.DeploymentOptions
io.vertx.core.Future
io.vertx.core.Handler
io.vertx.core.Verticle
io.vertx.core.Vertx
io.vertx.core.VertxOptions
java.util.function.Supplier))
(declare opts->deployment-options)
(declare opts->vertx-options)
(declare build-verticle)
(declare build-actor)
(declare build-disposable)
;; --- Protocols
(definterface IVerticleFactory)
;; --- Public Api
(s/def :vertx.core$system/threads pos?)
(s/def :vertx.core$system/on-error fn?)
(s/def ::system-options
(s/keys :opt-un [:vertx.core$system/threads
:vertx.core$system/on-error]))
(defn system
"Creates a new vertx actor system instance."
([] (system {}))
([options]
(s/assert ::system-options options)
(let [^VertxOptions opts (opts->vertx-options options)
^Vertx vsm (Vertx/vertx opts)]
(vxe/configure! vsm opts)
vsm)))
(defn get-or-create-context
[vsm]
(.getOrCreateContext ^Vertx (vu/resolve-system vsm)))
(defn current-context
[]
(Vertx/currentContext))
(defn handle-on-context
"Attaches the context (current if not explicitly provided) to the
promise execution chain."
([prm] (handle-on-context prm (current-context)))
([prm ctx]
(let [d (p/deferred)]
(p/finally prm (fn [v e]
(.runOnContext
^Context ctx
^Handler (reify Handler
(handle [_ v']
(if e
(p/reject! d e)
(p/resolve! d v)))))))
d)))
(s/def :vertx.core$verticle/on-start fn?)
(s/def :vertx.core$verticle/on-stop fn?)
(s/def :vertx.core$verticle/on-error fn?)
(s/def ::verticle-options
(s/keys :req-un [:vertx.core$verticle/on-start]
:opt-un [:vertx.core$verticle/on-stop
:vertx.core$verticle/on-error]))
(defn verticle
"Creates a verticle instance (factory)."
[options]
(s/assert ::verticle-options options)
(reify
IVerticleFactory
Supplier
(get [_] (build-verticle options))))
(defn verticle?
"Return `true` if `v` is instance of `IVerticleFactory`."
[v]
(instance? IVerticleFactory v))
(s/def :vertx.core$actor/on-message fn?)
(s/def ::actor-options
(s/keys :req-un [:vertx.core$actor/on-message]
:opt-un [:vertx.core$verticle/on-start
:vertx.core$verticle/on-error
:vertx.core$verticle/on-stop]))
(defn actor
"A shortcut for create a verticle instance (factory) that consumes a
specific topic."
[topic options]
(s/assert string? topic)
(s/assert ::actor-options options)
(reify
IVerticleFactory
Supplier
(get [_] (build-actor topic options))))
(s/def :vertx.core$deploy/instances pos?)
(s/def :vertx.core$deploy/worker boolean?)
(s/def ::deploy-options
(s/keys :opt-un [:vertx.core$deploy/worker
:vertx.core$deploy/instances]))
(defn deploy!
"Deploy a verticle."
([vsm supplier] (deploy! vsm supplier nil))
([vsm supplier options]
(s/assert verticle? supplier)
(s/assert ::deploy-options options)
(let [d (p/deferred)
o (opts->deployment-options options)]
(.deployVerticle ^Vertx vsm
^Supplier supplier
^DeploymentOptions o
^Handler (vu/deferred->handler d))
(p/then' d (fn [id] (build-disposable vsm id))))))
(defn undeploy!
"Undeploy the verticle, this function should be rarelly used because
the easiest way to undeplo is executin the callable returned by
`deploy!` function."
[vsm id]
(s/assert string? id)
(let [d (p/deferred)]
(.undeploy ^Vertx (vu/resolve-system vsm)
^String id
^Handler (vu/deferred->handler d))
d))
;; --- Impl
(defn- build-verticle
[{:keys [on-start on-stop on-error]
:or {on-error (constantly nil)
on-stop (constantly nil)}}]
(let [vsm (volatile! nil)
ctx (volatile! nil)
lst (volatile! nil)]
(reify Verticle
(init [_ instance context]
(vreset! vsm instance)
(vreset! ctx context))
(getVertx [_] @vsm)
(^void start [_ ^Future o]
(-> (p/do! (on-start @ctx))
(p/handle (fn [state error]
(if error
(do
(.fail o error)
(on-error @ctx error))
(do
(when (map? state)
(vswap! lst merge state))
(.complete o)))))))
(^void stop [_ ^Future o]
(p/handle (p/do! (on-stop @ctx @lst))
(fn [_ err]
(if err
(do (on-error err)
(.fail o err))
(.complete o))))))))
(defn- build-actor
[topic {:keys [on-message on-error on-stop on-start]
:or {on-error (constantly nil)
on-start (constantly {})
on-stop (constantly nil)}}]
(letfn [(-on-start [ctx]
(let [state (on-start ctx)
state (if (map? state) state {})
consumer (vxe/consumer ctx topic on-message)]
(assoc state ::consumer consumer)))]
(build-verticle {:on-error on-error
:on-stop on-stop
:on-start -on-start})))
(defn- build-disposable
[vsm id]
(reify
clojure.lang.IDeref
(deref [_] id)
clojure.lang.IFn
(invoke [_] (undeploy! vsm id))
java.io.Closeable
(close [_]
@(undeploy! vsm id))))
(defn- opts->deployment-options
[{:keys [instances worker]}]
(let [opts (DeploymentOptions.)]
(when instances (.setInstances opts (int instances)))
(when worker (.setWorker opts worker))
opts))
(defn- opts->vertx-options
[{:keys [threads on-error]}]
(let [opts (VertxOptions.)]
(when threads (.setEventLoopPoolSize opts (int threads)))
(when on-error (.exceptionHandler (vu/fn->handler on-error)))
opts))

View file

@ -1,122 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns vertx.eventbus
(:require [promesa.core :as p]
[vertx.util :as vu])
(:import io.vertx.core.Vertx
io.vertx.core.Handler
io.vertx.core.Context
io.vertx.core.eventbus.Message
io.vertx.core.eventbus.MessageConsumer
io.vertx.core.eventbus.DeliveryOptions
io.vertx.core.eventbus.EventBus
io.vertx.core.eventbus.MessageCodec
java.util.function.Supplier))
(declare opts->delivery-opts)
(declare resolve-eventbus)
(declare build-message-codec)
(declare build-message)
;; --- Public Api
(defn consumer
[vsm topic f]
(let [^EventBus bus (resolve-eventbus vsm)
^MessageConsumer consumer (.consumer bus ^String topic)]
(.handler consumer (reify Handler
(handle [_ msg]
(.pause consumer)
(-> (p/do! (f vsm (build-message msg)))
(p/handle (fn [res err]
(.resume consumer)
(.reply msg (or res err)
(opts->delivery-opts {}))))))))
consumer))
(defn publish!
([vsm topic msg] (publish! vsm topic msg {}))
([vsm topic msg opts]
(let [bus (resolve-eventbus vsm)
opts (opts->delivery-opts opts)]
(.publish ^EventBus bus
^String topic
^Object msg
^DeliveryOptions opts)
nil)))
(defn send!
([vsm topic msg] (send! vsm topic msg {}))
([vsm topic msg opts]
(let [bus (resolve-eventbus vsm)
opts (opts->delivery-opts opts)]
(.send ^EventBus bus
^String topic
^Object msg
^DeliveryOptions opts)
nil)))
(defn request!
([vsm topic msg] (request! vsm topic msg {}))
([vsm topic msg opts]
(let [bus (resolve-eventbus vsm)
opts (opts->delivery-opts opts)
d (p/deferred)]
(.request ^EventBus bus
^String topic
^Object msg
^DeliveryOptions opts
^Handler (vu/deferred->handler d))
(p/then' d build-message))))
(defn configure!
[vsm opts]
(let [^EventBus bus (resolve-eventbus vsm)]
(.registerCodec bus (build-message-codec))))
(defrecord Msg [body])
(defn message?
[v]
(instance? Msg v))
;; --- Impl
(defn- resolve-eventbus
[o]
(cond
(instance? Vertx o) (.eventBus ^Vertx o)
(instance? Context o) (resolve-eventbus (.owner ^Context o))
(instance? EventBus o) o
:else (throw (ex-info "unexpected argument" {}))))
(defn- build-message-codec
[]
;; TODO: implement the wire encode/decode using transit+msgpack
(reify MessageCodec
(encodeToWire [_ buffer data])
(decodeFromWire [_ pos buffer])
(transform [_ data] data)
(name [_] "clj:msgpack")
(^byte systemCodecID [_] (byte -1))))
(defn- build-message
[^Message msg]
(let [metadata {::reply-to (.replyAddress msg)
::send? (.isSend msg)
::address (.address msg)}
body (.body msg)]
(Msg. body metadata nil)))
(defn- opts->delivery-opts
[{:keys [codec local?]}]
(let [^DeliveryOptions opts (DeliveryOptions.)]
(.setCodecName opts (or codec "clj:msgpack"))
(when local? (.setLocalOnly opts true))
opts))

View file

@ -1,152 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns vertx.http
"Enables `raw` access to the http facilites of vertx. If you want more
clojure idiomatic api, refer to the `vertx.web` namespace."
(:require [clojure.spec.alpha :as s]
[promesa.core :as p]
[vertx.util :as vu])
(:import
java.util.Map$Entry
clojure.lang.MapEntry
io.vertx.core.Vertx
io.vertx.core.Verticle
io.vertx.core.Handler
io.vertx.core.Future
io.vertx.core.MultiMap
io.vertx.core.Context
io.vertx.core.buffer.Buffer
io.vertx.core.http.HttpServer
io.vertx.core.http.HttpServerRequest
io.vertx.core.http.HttpServerResponse
io.vertx.core.http.HttpServerOptions
io.vertx.core.http.ServerWebSocket))
(declare opts->http-server-options)
(declare resolve-handler)
;; --- Public Api
(declare -handle-response)
(declare -handle-body)
(defn ->headers
[^MultiMap headers]
(let [it (.iterator ^MultiMap headers)]
(loop [m (transient {})]
(if (.hasNext it)
(let [^Map$Entry me (.next it)
key (.toLowerCase (.getKey me))
val (.getValue me)]
(recur (assoc! m key val)))
(persistent! m)))))
(defn- ->request
[^HttpServerRequest request]
{:method (-> request .rawMethod .toLowerCase keyword)
:path (.path request)
:headers (->headers (.headers request))
::request request
::response (.response request)})
(defn handler
[vsm f]
(reify Handler
(handle [this request]
(let [ctx (->request request)]
(-handle-response (f ctx) ctx)))))
(s/def :vertx.http/handler
(s/or :fn fn? :handler #(instance? Handler %)))
(s/def :vertx.http/host string?)
(s/def :vertx.http/port pos?)
(s/def ::server-options
(s/keys :req-un [:vertx.http/handler]
:opt-un [:vertx.http/host
:vertx.http/port]))
(defn server
"Starts a vertx http server."
[vsm {:keys [handler] :as options}]
(s/assert ::server-options options)
(let [^Vertx vsm (vu/resolve-system vsm)
^HttpServerOptions opts (opts->http-server-options options)
^HttpServer srv (.createHttpServer vsm opts)
^Handler handler (resolve-handler handler)]
(doto srv
(.requestHandler handler)
(.listen))
srv))
;; --- Impl
(defn- opts->http-server-options
[{:keys [host port]}]
(let [opts (HttpServerOptions.)]
(.setReuseAddress opts true)
(.setReusePort opts true)
(.setTcpNoDelay opts true)
(.setTcpFastOpen opts true)
(when host (.setHost opts host))
(when port (.setPort opts port))
opts))
(defn- resolve-handler
[handler]
(cond
(fn? handler) (vu/fn->handler handler)
(instance? Handler handler) handler
:else (throw (ex-info "invalid handler" {}))))
(defn- assign-status-and-headers!
[^HttpServerResponse res response]
(let [headers (:headers response)
status (:status response 200)]
(when (map? headers)
(vu/doseq [[key val] headers]
(.putHeader res ^String (name key) ^String (str val))))
(.setStatusCode res status)))
(defprotocol IAsyncResponse
(-handle-response [_ _]))
(defprotocol IAsyncBody
(-handle-body [_ _]))
(extend-protocol IAsyncResponse
java.util.concurrent.CompletionStage
(-handle-response [data ctx]
(p/then' data #(-handle-response % ctx)))
clojure.lang.IPersistentMap
(-handle-response [data ctx]
(let [body (:body data)
res (::response ctx)]
(assign-status-and-headers! res data)
(-handle-body body res)))
nil
(-handle-response [sws ctx]))
(extend-protocol IAsyncBody
(Class/forName "[B")
(-handle-body [data res]
(.end ^HttpServerResponse res (Buffer/buffer data)))
Buffer
(-handle-body [data res]
(.end ^HttpServerResponse res ^Buffer data))
nil
(-handle-body [data res]
(.putHeader ^HttpServerResponse res "content-length" "0")
(.end ^HttpServerResponse res))
String
(-handle-body [data res]
(let [length (count data)]
(.putHeader ^HttpServerResponse res "content-length" (str length))
(.end ^HttpServerResponse res data))))

View file

@ -1,35 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns vertx.timers
"The timers and async scheduled tasks."
(:require
[clojure.spec.alpha :as s]
[promesa.core :as p]
[vertx.util :as vu])
(:import
io.vertx.core.Vertx
io.vertx.core.Handler))
(defn schedule-once!
[vsm ms f]
(let [^Vertx system (vu/resolve-system vsm)
^Handler handler (vu/fn->handler (fn [v] (f)))
timer-id (.setTimer system ms handler)]
(reify
java.lang.AutoCloseable
(close [_]
(.cancelTimer system timer-id)))))
(defn sechdule-periodic!
[vsm ms f]
(let [^Vertx system (vu/resolve-system vsm)
^Handler handler (vu/fn->handler (fn [v] (f)))
timer-id (.setPeriodic system ms handler)]
(reify
java.lang.AutoCloseable
(close [_]
(.cancelTimer system timer-id)))))

View file

@ -1,51 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns vertx.util
(:refer-clojure :exclude [doseq])
(:require [promesa.core :as p])
(:import io.vertx.core.Vertx
io.vertx.core.Handler
io.vertx.core.Context
io.vertx.core.AsyncResult
java.util.function.Supplier))
(defn resolve-system
[o]
(cond
(instance? Vertx o) o
(instance? Context o) (.owner ^Context o)
:else (throw (ex-info "unexpected parameters" {}))))
(defn fn->supplier
[f]
(reify Supplier
(get [_] (f))))
(defn fn->handler
[f]
(reify Handler
(handle [_ v]
(f v))))
(defn deferred->handler
[d]
(reify Handler
(handle [_ ar]
(if (.failed ar)
(p/reject! d (.cause ar))
(p/resolve! d (.result ar))))))
(defmacro doseq
"A faster version of doseq."
[[bsym csym] & body]
`(let [it# (.iterator ~csym)]
(loop []
(when (.hasNext it#)
(let [~bsym (.next it#)]
~@body
(recur))))))

View file

@ -1,156 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns vertx.web
"High level api for http servers."
(:require
[clojure.tools.logging :as log]
[clojure.spec.alpha :as s]
[promesa.core :as p]
[sieppari.core :as sp]
[reitit.core :as rt]
[vertx.http :as vh]
[vertx.util :as vu])
(:import
clojure.lang.IPersistentMap
clojure.lang.Keyword
io.vertx.core.Future
io.vertx.core.Handler
io.vertx.core.Vertx
io.vertx.core.buffer.Buffer
io.vertx.core.http.Cookie
io.vertx.core.http.HttpServer
io.vertx.core.http.HttpServerOptions
io.vertx.core.http.HttpServerRequest
io.vertx.core.http.HttpServerResponse
io.vertx.core.http.ServerWebSocket
io.vertx.ext.web.Route
io.vertx.ext.web.Router
io.vertx.ext.web.RoutingContext
io.vertx.ext.web.handler.BodyHandler
io.vertx.ext.web.handler.LoggerHandler
io.vertx.ext.web.handler.ResponseTimeHandler
io.vertx.ext.web.handler.StaticHandler))
;; --- Public Api
(s/def ::wrap-handler
(s/or :fn fn?
:vec (s/every fn? :kind vector?)))
(defn- ->request
[^RoutingContext routing-context]
(let [^HttpServerRequest request (.request ^RoutingContext routing-context)
^HttpServerResponse response (.response ^RoutingContext routing-context)
^Vertx system (.vertx routing-context)]
{:body (.getBody routing-context)
:path (.path request)
:headers (vh/->headers (.headers request))
:method (-> request .rawMethod .toLowerCase keyword)
::vh/request request
::vh/response response
::execution-context (.getContext system)
::routing-context routing-context}))
(defn handler
"Wraps a user defined funcion based handler into a vertx-web aware
handler (with support for multipart uploads.
If the handler is a vector, the sieppari intercerptos engine will be used
to resolve the execution of the interceptors + handler."
[vsm & handlers]
(let [^Vertx vsm (vu/resolve-system vsm)
^Router router (Router/router vsm)]
(reduce #(%2 %1) router handlers)))
(defn assets
([path] (assets path {}))
([path {:keys [root] :or {root "public"} :as options}]
(fn [^Router router]
(let [^Route route (.route router path)
^Handler handler (doto (StaticHandler/create)
(.setWebRoot root)
(.setDirectoryListing true))]
(.handler route handler)
router))))
(defn- default-handler
[ctx]
(if (::match ctx)
{:status 405}
{:status 404}))
(defn- default-on-error
[err req]
(log/error err)
{:status 500
:body "Internal server error!\n"})
(defn- run-chain
[ctx chain handler]
(let [d (p/deferred)]
(sp/execute (conj chain handler) ctx #(p/resolve! d %) #(p/reject! d %))
d))
(defn- router-handler
[router {:keys [path method] :as ctx}]
(let [{:keys [data path-params] :as match} (rt/match-by-path router path)
handler-fn (or (get data method)
(get data :all)
default-handler)
interceptors (get data :interceptors)
ctx (assoc ctx ::match match :path-params path-params)]
(if (empty? interceptors)
(handler-fn ctx)
(run-chain ctx interceptors handler-fn))))
(defn router
([routes] (router routes {}))
([routes {:keys [delete-uploads?
upload-dir
on-error
log-requests?
time-response?]
:or {delete-uploads? true
upload-dir "/tmp/vertx.uploads"
on-error default-on-error
log-requests? false
time-response? true}
:as options}]
(let [rtr (rt/router routes options)
f #(router-handler rtr %)]
(fn [^Router router]
(let [^Route route (.route router)]
(when time-response? (.handler route (ResponseTimeHandler/create)))
(when log-requests? (.handler route (LoggerHandler/create)))
(doto route
(.failureHandler
(reify Handler
(handle [_ rc]
(let [err (.failure ^RoutingContext rc)
req (.get ^RoutingContext rc "vertx$clj$req")]
(-> (p/do! (on-error err req))
(vh/-handle-response req))))))
(.handler
(doto (BodyHandler/create true)
(.setDeleteUploadedFilesOnEnd delete-uploads?)
(.setUploadsDirectory upload-dir)))
(.handler
(reify Handler
(handle [_ rc]
(let [req (->request rc)
efn (fn [err]
(.put ^RoutingContext rc "vertx$clj$req" req)
(.fail ^RoutingContext rc err))]
(try
(-> (vh/-handle-response (f req) req)
(p/catch' efn))
(catch Exception err
(efn err)))))))))
router))))

View file

@ -1,53 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns vertx.web.client
"High level http client."
(:refer-clojure :exclude [get])
(:require
[clojure.tools.logging :as log]
[clojure.spec.alpha :as s]
[promesa.core :as p]
[sieppari.core :as sp]
[reitit.core :as rt]
[vertx.http :as vh]
[vertx.util :as vu])
(:import
clojure.lang.IPersistentMap
clojure.lang.Keyword
io.vertx.core.Future
io.vertx.core.Handler
io.vertx.core.Vertx
io.vertx.core.buffer.Buffer
io.vertx.core.http.HttpMethod
io.vertx.ext.web.client.HttpRequest
io.vertx.ext.web.client.HttpResponse
io.vertx.ext.web.client.WebClientSession
io.vertx.ext.web.client.WebClient))
;; TODO: accept options
(defn create
([vsm] (create vsm {}))
([vsm opts]
(let [^Vertx system (vu/resolve-system vsm)]
(WebClient/create system))))
(defn session
[client]
(WebClientSession/create client))
(defn get
([session url] (get session url {}))
([session url opts]
(let [^HttpRequest req (.getAbs session url)
d (p/deferred)]
(.send req (vu/deferred->handler d))
(p/then d (fn [^HttpResponse res]
{:body (.bodyAsBuffer res)
:status (.statusCode res)
:headers (vh/->headers (.headers res))})))))

View file

@ -1,202 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns vertx.web.interceptors
"High level api for http servers."
(:require
[clojure.spec.alpha :as s]
[clojure.string :as str]
[promesa.core :as p]
[reitit.core :as r]
[vertx.http :as vh]
[vertx.web :as vw]
[vertx.util :as vu]
[sieppari.context :as spx]
[sieppari.core :as sp])
(:import
clojure.lang.Keyword
clojure.lang.MapEntry
io.vertx.core.Future
io.vertx.core.Handler
io.vertx.core.MultiMap
io.vertx.core.Vertx
io.vertx.core.http.Cookie
io.vertx.core.http.HttpServerRequest
io.vertx.core.http.HttpServerResponse
io.vertx.ext.web.FileUpload
io.vertx.ext.web.RoutingContext
java.util.Map
java.util.Map$Entry))
;; --- Cookies
(defn- build-cookie
[name data]
(cond-> (Cookie/cookie ^String name ^String (:value data))
(:http-only data) (.setHttpOnly true)
(:domain data) (.setDomain (:domain data))
(:path data) (.setPath (:path data))
(:secure data) (.setSecure true)))
(defn cookies
[]
{:enter
(fn [data]
(let [^HttpServerRequest req (get-in data [:request ::vh/request])
parse-cookie (fn [^Cookie item] [(.getName item) (.getValue item)])
cookies (into {} (map parse-cookie) (vals (.cookieMap req)))]
(update data :request assoc :cookies cookies)))
:leave
(fn [data]
(let [cookies (get-in data [:response :cookies])
^HttpServerResponse res (get-in data [:request ::vh/response])]
(when (map? cookies)
(vu/doseq [[key val] cookies]
(if (nil? val)
(.removeCookie res key)
(.addCookie res (build-cookie key val)))))
data))})
;; --- Params
(defn- parse-params
[^HttpServerRequest request]
(let [params (.params request)
it (.iterator ^MultiMap params)]
(loop [m (transient {})]
(if (.hasNext it)
(let [^Map$Entry o (.next it)
key (keyword (.toLowerCase (.getKey o)))
prv (get m key ::default)
val (.getValue o)]
(cond
(= prv ::default)
(recur (assoc! m key val))
(vector? prv)
(recur (assoc! m key (conj prv val)))
:else
(recur (assoc! m key [prv val]))))
(persistent! m)))))
(defn params
([] (params nil))
([{:keys [attr] :or {attr :params}}]
{:enter (fn [data]
(let [request (get-in data [:request ::vh/request])
params (parse-params request)]
(update data :request assoc attr params)))}))
;; --- Uploads
(defn uploads
([] (uploads nil))
([{:keys [attr] :or {attr :uploads}}]
{:enter (fn [data]
(let [context (get-in data [:request ::vw/routing-context])
uploads (reduce (fn [acc ^FileUpload upload]
(assoc! acc
(keyword (.name upload))
{:type :uploaded-file
:mtype (.contentType upload)
:path (.uploadedFileName upload)
:name (.fileName upload)
:size (.size upload)}))
(transient {})
(.fileUploads ^RoutingContext context))]
(update data :request assoc attr (persistent! uploads))))}))
;; --- Errors
(defn errors
"A error handling interceptor."
[handler-fn]
{:error
(fn [data]
(let [request (:request data)
error (:error data)
response (handler-fn error request)]
(-> data
(assoc :response response)
(dissoc :error))))})
;; --- CORS
(s/def ::origin string?)
(s/def ::allow-credentials boolean?)
(s/def ::allow-methods (s/every keyword? :kind set?))
(s/def ::allow-headers (s/every keyword? :kind set?))
(s/def ::expose-headers (s/every keyword? :kind set?))
(s/def ::max-age number?)
(s/def ::cors-opts
(s/keys :req-un [::origin]
:opt-un [::allow-headers
::allow-methods
::expose-headers
::max-age]))
(defn cors
[opts]
(s/assert ::cors-opts opts)
(letfn [(preflight? [{:keys [method headers] :as ctx}]
(and (= method :options)
(contains? headers "origin")
(contains? headers "access-control-request-method")))
(normalize [data]
(str/join ", " (map name data)))
(allow-origin? [headers]
(let [origin (:origin opts)
value (get headers "origin")]
(cond
(nil? value) value
(= origin "*") origin
(set? origin) (origin value)
(= origin value) origin)))
(get-headers [{:keys [headers] :as ctx}]
(when-let [origin (allow-origin? headers)]
(cond-> {"access-control-allow-origin" origin
"access-control-allow-methods" "GET, OPTIONS, HEAD"}
(:allow-methods opts)
(assoc "access-control-allow-methods"
(-> (normalize (:allow-methods opts))
(str/upper-case)))
(:allow-credentials opts)
(assoc "access-control-allow-credentials" "true")
(:expose-headers opts)
(assoc "access-control-expose-headers"
(-> (normalize (:expose-headers opts))
(str/lower-case)))
(:max-age opts)
(assoc "access-control-max-age" (:max-age opts))
(:allow-headers opts)
(assoc "access-control-allow-headers"
(-> (normalize (:allow-headers opts))
(str/lower-case))))))
(enter [data]
(let [ctx (:request data)]
(if (preflight? ctx)
(spx/terminate (assoc data ::preflight true))
data)))
(leave [data]
(let [headers (get-headers (:request data))]
(if (::preflight data)
(assoc data :response {:status 204 :headers headers})
(update-in data [:response :headers] merge headers))))]
{:enter enter
:leave leave}))