Restructure vertx vendor package.

This commit is contained in:
Andrey Antukh 2020-02-05 23:42:29 +01:00
parent b4790c89ce
commit 3d5e4370e0
9 changed files with 195 additions and 156 deletions

View file

@ -9,7 +9,7 @@
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[promesa.core :as p] [promesa.core :as p]
[vertx.eventbus :as vxe] [vertx.eventbus :as vxe]
[vertx.util :as vu]) [vertx.impl :as impl])
(:import (:import
io.vertx.core.AsyncResult io.vertx.core.AsyncResult
io.vertx.core.Context io.vertx.core.Context
@ -49,92 +49,6 @@
[^Vertx o] [^Vertx o]
(.close o)) (.close o))
(def ^:dynamic *context* nil)
(defn get-or-create-context
[vsm]
(or *context* (.getOrCreateContext ^Vertx (vu/resolve-system vsm))))
(defn current-context
[]
(or *context* (Vertx/currentContext)))
(defmacro blocking
[& body]
(let [sym-vsm (with-meta (gensym "blocking")
{:tag 'io.vertx.core.Vertx})
sym-e (with-meta (gensym "blocking")
{:tag 'java.lang.Throwable})
sym-prm (gensym "blocking")
sym-ar (gensym "blocking")]
`(let [~sym-vsm (-> (current-context)
(vu/resolve-system))
d# (p/deferred)]
(.executeBlocking
~sym-vsm
(reify Handler
(handle [_ ~sym-prm]
(let [prm# ~(with-meta sym-prm {:tag 'io.vertx.core.Promise})]
(try
(.complete prm# (do ~@body))
(catch Throwable ~sym-e
(.fail prm# ~sym-e))))))
true
(reify Handler
(handle [_ ~sym-ar]
(let [ar# ~(with-meta sym-ar {:tag 'io.vertx.core.AsyncResult})]
(if (.failed ar#)
(p/reject! d# (.cause ar#))
(p/resolve! d# (.result ar#)))))))
d#)))
(defn wrap-blocking
([f] (wrap-blocking (current-context) f))
([ctx f]
(let [^Vertx vsm (vu/resolve-system ctx)]
(fn [& args]
(let [d (p/deferred)]
(.executeBlocking
vsm
(reify Handler
(handle [_ prm]
(try
(.complete ^Promise prm (apply f args))
(catch Throwable e
(.fail ^Promise prm e)))))
true
(reify Handler
(handle [_ ar]
(if (.failed ^AsyncResult ar)
(p/reject! d (.cause ^AsyncResult ar))
(p/resolve! d (.result ^AsyncResult ar))))))
d)))))
(defn handle-on-context
"Attaches the context (current if not explicitly provided) to the
promise execution chain."
([prm] (handle-on-context prm (current-context)))
([prm ctx]
(assert (instance? Context ctx) "`ctx` should be a valid Context instance")
(let [d (p/deferred)]
(p/finally prm (fn [v e]
(.runOnContext
^Context ctx
^Handler (reify Handler
(handle [_ v']
(if e
(p/reject! d e)
(p/resolve! d v)))))))
d)))
(defn run-on-context
[ctx f]
(.runOnContext
^Context ctx
^Handler (reify Handler
(handle [_ v']
(f ctx)))))
(s/def :vertx.core$verticle/on-start fn?) (s/def :vertx.core$verticle/on-start fn?)
(s/def :vertx.core$verticle/on-stop fn?) (s/def :vertx.core$verticle/on-stop fn?)
(s/def :vertx.core$verticle/on-error fn?) (s/def :vertx.core$verticle/on-error fn?)
@ -192,7 +106,7 @@
(.deployVerticle ^Vertx vsm (.deployVerticle ^Vertx vsm
^Supplier supplier ^Supplier supplier
^DeploymentOptions o ^DeploymentOptions o
^Handler (vu/deferred->handler d)) ^Handler (impl/deferred->handler d))
(p/then' d (fn [id] (build-disposable vsm id)))))) (p/then' d (fn [id] (build-disposable vsm id))))))
(defn undeploy! (defn undeploy!
@ -202,9 +116,9 @@
[vsm id] [vsm id]
(s/assert string? id) (s/assert string? id)
(let [d (p/deferred)] (let [d (p/deferred)]
(.undeploy ^Vertx (vu/resolve-system vsm) (.undeploy ^Vertx (impl/resolve-system vsm)
^String id ^String id
^Handler (vu/deferred->handler d)) ^Handler (impl/deferred->handler d))
d)) d))
;; --- Impl ;; --- Impl
@ -280,7 +194,7 @@
(let [opts (VertxOptions.)] (let [opts (VertxOptions.)]
(when threads (.setEventLoopPoolSize opts (int threads))) (when threads (.setEventLoopPoolSize opts (int threads)))
(when worker-threads (.setWorkerPoolSize opts (int worker-threads))) (when worker-threads (.setWorkerPoolSize opts (int worker-threads)))
#_(when on-error (.exceptionHandler opts (vu/fn->handler on-error))) #_(when on-error (.exceptionHandler opts (impl/fn->handler on-error)))
opts)) opts))

View file

@ -6,7 +6,7 @@
(ns vertx.eventbus (ns vertx.eventbus
(:require [promesa.core :as p] (:require [promesa.core :as p]
[vertx.util :as vu]) [vertx.impl :as impl])
(:import io.vertx.core.Vertx (:import io.vertx.core.Vertx
io.vertx.core.Handler io.vertx.core.Handler
io.vertx.core.Context io.vertx.core.Context
@ -70,7 +70,7 @@
^String topic ^String topic
^Object msg ^Object msg
^DeliveryOptions opts ^DeliveryOptions opts
^Handler (vu/deferred->handler d)) ^Handler (impl/deferred->handler d))
(p/then' d build-message)))) (p/then' d build-message))))
(defn configure! (defn configure!

View file

@ -9,7 +9,8 @@
clojure idiomatic api, refer to the `vertx.web` namespace." clojure idiomatic api, refer to the `vertx.web` namespace."
(:require [clojure.spec.alpha :as s] (:require [clojure.spec.alpha :as s]
[promesa.core :as p] [promesa.core :as p]
[vertx.util :as vu]) [vertx.util :as util]
[vertx.impl :as impl])
(:import (:import
java.util.Map$Entry java.util.Map$Entry
clojure.lang.MapEntry clojure.lang.MapEntry
@ -73,7 +74,7 @@
"Starts a vertx http server." "Starts a vertx http server."
[vsm {:keys [handler] :as options}] [vsm {:keys [handler] :as options}]
(s/assert ::server-options options) (s/assert ::server-options options)
(let [^Vertx vsm (vu/resolve-system vsm) (let [^Vertx vsm (impl/resolve-system vsm)
^HttpServerOptions opts (opts->http-server-options options) ^HttpServerOptions opts (opts->http-server-options options)
^HttpServer srv (.createHttpServer vsm opts) ^HttpServer srv (.createHttpServer vsm opts)
^Handler handler (resolve-handler handler)] ^Handler handler (resolve-handler handler)]
@ -98,7 +99,7 @@
(defn- resolve-handler (defn- resolve-handler
[handler] [handler]
(cond (cond
(fn? handler) (vu/fn->handler handler) (fn? handler) (impl/fn->handler handler)
(instance? Handler handler) handler (instance? Handler handler) handler
:else (throw (ex-info "invalid handler" {})))) :else (throw (ex-info "invalid handler" {}))))
@ -107,7 +108,7 @@
(let [headers (:headers response) (let [headers (:headers response)
status (:status response 200)] status (:status response 200)]
(when (map? headers) (when (map? headers)
(vu/doseq [[key val] headers] (util/doseq [[key val] headers]
(.putHeader res ^String (name key) ^String (str val)))) (.putHeader res ^String (name key) ^String (str val))))
(.setStatusCode res status))) (.setStatusCode res status)))

55
backend/vendor/vertx/src/vertx/impl.clj vendored Normal file
View file

@ -0,0 +1,55 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns vertx.impl
"Implementation helpers."
(:refer-clojure :exclude [doseq])
(:require [promesa.core :as p])
(:import
io.vertx.core.Vertx
io.vertx.core.Handler
io.vertx.core.Context
io.vertx.core.AsyncResult
java.util.function.Supplier))
(defn resolve-system
[o]
(cond
(instance? Vertx o) o
(instance? Context o) (.owner ^Context o)
:else (throw (ex-info "unexpected parameters" {:o o}))))
(defn fn->supplier
[f]
(reify Supplier
(get [_] (f))))
(defn fn->handler
[f]
(reify Handler
(handle [_ v]
(f v))))
(defn deferred->handler
[d]
(reify Handler
(handle [_ ar]
(if (.failed ^AsyncResult ar)
(p/reject! d (.cause ^AsyncResult ar))
(p/resolve! d (.result ^AsyncResult ar))))))
(defmacro doseq
"A faster version of doseq."
[[bsym csym] & body]
(let [itsym (gensym "iterator")]
`(let [~itsym (.iterator ~(with-meta csym {:tag 'java.lang.Iterable}))]
(loop []
(when (.hasNext ~(with-meta itsym {:tag 'java.util.Iterator}))
(let [~bsym (.next ~itsym)]
~@body
(recur)))))))

View file

@ -9,7 +9,7 @@
(:require (:require
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[promesa.core :as p] [promesa.core :as p]
[vertx.util :as vu]) [vertx.impl :as impl])
(:import (:import
io.vertx.core.Vertx io.vertx.core.Vertx
io.vertx.core.Handler)) io.vertx.core.Handler))
@ -18,8 +18,8 @@
(defn schedule-once! (defn schedule-once!
[vsm ms f] [vsm ms f]
(let [^Vertx system (vu/resolve-system vsm) (let [^Vertx system (impl/resolve-system vsm)
^Handler handler (vu/fn->handler (fn [v] (f))) ^Handler handler (impl/fn->handler (fn [v] (f)))
timer-id (.setTimer system ms handler)] timer-id (.setTimer system ms handler)]
(reify (reify
java.lang.AutoCloseable java.lang.AutoCloseable
@ -28,8 +28,8 @@
(defn sechdule-periodic! (defn sechdule-periodic!
[vsm ms f] [vsm ms f]
(let [^Vertx system (vu/resolve-system vsm) (let [^Vertx system (impl/resolve-system vsm)
^Handler handler (vu/fn->handler (fn [v] (f))) ^Handler handler (impl/fn->handler (fn [v] (f)))
timer-id (.setPeriodic system ms handler)] timer-id (.setPeriodic system ms handler)]
(reify (reify
java.lang.AutoCloseable java.lang.AutoCloseable
@ -54,7 +54,7 @@
(when (and (not once) (not repeat)) (when (and (not once) (not repeat))
(throw (IllegalArgumentException. "you should specify `once` or `repeat` params"))) (throw (IllegalArgumentException. "you should specify `once` or `repeat` params")))
(let [system (vu/resolve-system vsm) (let [system (impl/resolve-system vsm)
state (atom nil) state (atom nil)
taskfn (fn wrapped-task [] taskfn (fn wrapped-task []
(-> (p/do! ((::fn opts) opts)) (-> (p/do! ((::fn opts) opts))

View file

@ -2,49 +2,119 @@
;; License, v. 2.0. If a copy of the MPL was not distributed with this ;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/. ;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;; ;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz> ;; Copyright (c) 2019-2020 Andrey Antukh <niwi@niwi.nz>
(ns vertx.util (ns vertx.util
(:refer-clojure :exclude [doseq]) (:refer-clojure :exclude [loop doseq])
(:require [promesa.core :as p]) (:require
(:import io.vertx.core.Vertx [clojure.spec.alpha :as s]
io.vertx.core.Handler [clojure.core :as c]
io.vertx.core.Context [promesa.core :as p]
[vertx.impl :as impl])
(:import
io.vertx.core.AsyncResult io.vertx.core.AsyncResult
java.util.function.Supplier)) io.vertx.core.Context
io.vertx.core.Handler
io.vertx.core.Promise
io.vertx.core.Vertx))
(defn resolve-system (defn get-or-create-context
[o] [vsm]
(cond (.getOrCreateContext ^Vertx (impl/resolve-system vsm)))
(instance? Vertx o) o
(instance? Context o) (.owner ^Context o)
:else (throw (ex-info "unexpected parameters" {}))))
(defn fn->supplier (defn current-context
[f] "Returns the current context or nil."
(reify Supplier []
(get [_] (f)))) (Vertx/currentContext))
(defn fn->handler (defmacro blocking
[f] [& body]
(let [sym-vsm (with-meta (gensym "blocking")
{:tag 'io.vertx.core.Vertx})
sym-e (with-meta (gensym "blocking")
{:tag 'java.lang.Throwable})
sym-prm (gensym "blocking")
sym-ar (gensym "blocking")]
`(let [~sym-vsm (-> (current-context)
(impl/resolve-system))
d# (p/deferred)]
(.executeBlocking
~sym-vsm
(reify Handler (reify Handler
(handle [_ v] (handle [_ ~sym-prm]
(f v)))) (let [prm# ~(with-meta sym-prm {:tag 'io.vertx.core.Promise})]
(try
(.complete prm# (do ~@body))
(catch Throwable ~sym-e
(.fail prm# ~sym-e))))))
true
(reify Handler
(handle [_ ~sym-ar]
(let [ar# ~(with-meta sym-ar {:tag 'io.vertx.core.AsyncResult})]
(if (.failed ar#)
(p/reject! d# (.cause ar#))
(p/resolve! d# (.result ar#)))))))
d#)))
(defn deferred->handler (defn wrap-blocking
[d] ([f] (wrap-blocking (current-context) f))
([ctx f]
(let [^Vertx vsm (impl/resolve-system ctx)]
(fn [& args]
(let [d (p/deferred)]
(.executeBlocking
vsm
(reify Handler
(handle [_ prm]
(try
(.complete ^Promise prm (apply f args))
(catch Throwable e
(.fail ^Promise prm e)))))
true
(reify Handler (reify Handler
(handle [_ ar] (handle [_ ar]
(if (.failed ^AsyncResult ar) (if (.failed ^AsyncResult ar)
(p/reject! d (.cause ^AsyncResult ar)) (p/reject! d (.cause ^AsyncResult ar))
(p/resolve! d (.result ^AsyncResult ar)))))) (p/resolve! d (.result ^AsyncResult ar))))))
d)))))
(defn handle-on-context
"Attaches the context (current if not explicitly provided) to the
promise execution chain."
([prm] (handle-on-context prm (current-context)))
([prm ctx]
(assert (instance? Context ctx) "`ctx` should be a valid Context instance")
(let [d (p/deferred)]
(p/finally prm (fn [v e]
(.runOnContext
^Context ctx
^Handler (reify Handler
(handle [_ v']
(if e
(p/reject! d e)
(p/resolve! d v)))))))
d)))
(defn run-on-context!
"Run callbale on context."
[ctx f]
(.runOnContext ^Context ctx
^Handler (reify Handler
(handle [_ v']
(f)))))
(defmacro loop
[& args]
`(let [ctx# (current-context)]
(binding [p/*loop-run-fn* #(run-on-context! ctx# %)]
(p/loop ~@args))))
(defmacro doseq (defmacro doseq
"A faster version of doseq." "A faster version of doseq."
[[bsym csym] & body] [[bsym csym] & body]
(let [itsym (gensym "iterator")] (let [itsym (gensym "iterator")]
`(let [~itsym (.iterator ~(with-meta csym {:tag 'java.lang.Iterable}))] `(let [~itsym (.iterator ~(with-meta csym {:tag 'java.lang.Iterable}))]
(loop [] (c/loop []
(when (.hasNext ~(with-meta itsym {:tag 'java.util.Iterator})) (when (.hasNext ~(with-meta itsym {:tag 'java.util.Iterator}))
(let [~bsym (.next ~itsym)] (let [~bsym (.next ~itsym)]
~@body ~@body

View file

@ -12,8 +12,8 @@
[promesa.core :as p] [promesa.core :as p]
[sieppari.core :as sp] [sieppari.core :as sp]
[reitit.core :as rt] [reitit.core :as rt]
[vertx.http :as vh] [vertx.http :as http]
[vertx.util :as vu]) [vertx.impl :as impl])
(:import (:import
clojure.lang.IPersistentMap clojure.lang.IPersistentMap
clojure.lang.Keyword clojure.lang.Keyword
@ -48,10 +48,10 @@
^Vertx system (.vertx routing-context)] ^Vertx system (.vertx routing-context)]
{:body (.getBody routing-context) {:body (.getBody routing-context)
:path (.path request) :path (.path request)
:headers (vh/->headers (.headers request)) :headers (http/->headers (.headers request))
:method (-> request .rawMethod .toLowerCase keyword) :method (-> request .rawMethod .toLowerCase keyword)
::vh/request request ::http/request request
::vh/response response ::http/response response
;; ::execution-context (.getContext system) ;; ::execution-context (.getContext system)
::routing-context routing-context})) ::routing-context routing-context}))
@ -62,7 +62,7 @@
If the handler is a vector, the sieppari intercerptos engine will be used If the handler is a vector, the sieppari intercerptos engine will be used
to resolve the execution of the interceptors + handler." to resolve the execution of the interceptors + handler."
[vsm & handlers] [vsm & handlers]
(let [^Vertx vsm (vu/resolve-system vsm) (let [^Vertx vsm (impl/resolve-system vsm)
^Router router (Router/router vsm)] ^Router router (Router/router vsm)]
(reduce #(%2 %1) router handlers))) (reduce #(%2 %1) router handlers)))
@ -134,7 +134,7 @@
(let [err (.failure ^RoutingContext rc) (let [err (.failure ^RoutingContext rc)
req (.get ^RoutingContext rc "vertx$clj$req")] req (.get ^RoutingContext rc "vertx$clj$req")]
(-> (p/do! (on-error err req)) (-> (p/do! (on-error err req))
(vh/-handle-response req)))))) (http/-handle-response req))))))
(.handler (.handler
(doto (BodyHandler/create true) (doto (BodyHandler/create true)
@ -149,7 +149,7 @@
(.put ^RoutingContext rc "vertx$clj$req" req) (.put ^RoutingContext rc "vertx$clj$req" req)
(.fail ^RoutingContext rc ^Throwable err))] (.fail ^RoutingContext rc ^Throwable err))]
(try (try
(-> (vh/-handle-response (f req) req) (-> (http/-handle-response (f req) req)
(p/catch' efn)) (p/catch' efn))
(catch Exception err (catch Exception err
(efn err))))))))) (efn err)))))))))

View file

@ -8,13 +8,12 @@
"High level http client." "High level http client."
(:refer-clojure :exclude [get]) (:refer-clojure :exclude [get])
(:require (:require
[clojure.tools.logging :as log]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[promesa.core :as p] [promesa.core :as p]
[sieppari.core :as sp] [sieppari.core :as sp]
[reitit.core :as rt] [reitit.core :as rt]
[vertx.http :as vh] [vertx.http :as http]
[vertx.util :as vu]) [vertx.impl :as impl])
(:import (:import
clojure.lang.IPersistentMap clojure.lang.IPersistentMap
clojure.lang.Keyword clojure.lang.Keyword
@ -33,7 +32,7 @@
(defn create (defn create
([vsm] (create vsm {})) ([vsm] (create vsm {}))
([vsm opts] ([vsm opts]
(let [^Vertx system (vu/resolve-system vsm)] (let [^Vertx system (impl/resolve-system vsm)]
(WebClient/create system)))) (WebClient/create system))))
(defn session (defn session
@ -45,9 +44,9 @@
([session url opts] ([session url opts]
(let [^HttpRequest req (.getAbs ^WebClientSession session url) (let [^HttpRequest req (.getAbs ^WebClientSession session url)
d (p/deferred)] d (p/deferred)]
(.send req (vu/deferred->handler d)) (.send req (impl/deferred->handler d))
(p/then d (fn [^HttpResponse res] (p/then d (fn [^HttpResponse res]
{:body (.bodyAsBuffer res) {:body (.bodyAsBuffer res)
:status (.statusCode res) :status (.statusCode res)
:headers (vh/->headers (.headers res))}))))) :headers (http/->headers (.headers res))})))))

View file

@ -11,9 +11,9 @@
[clojure.string :as str] [clojure.string :as str]
[promesa.core :as p] [promesa.core :as p]
[reitit.core :as r] [reitit.core :as r]
[vertx.http :as vh] [vertx.http :as http]
[vertx.web :as vw] [vertx.web :as web]
[vertx.util :as vu] [vertx.util :as util]
[sieppari.context :as spx] [sieppari.context :as spx]
[sieppari.core :as sp]) [sieppari.core :as sp])
(:import (:import
@ -45,16 +45,16 @@
[] []
{:enter {:enter
(fn [data] (fn [data]
(let [^HttpServerRequest req (get-in data [:request ::vh/request]) (let [^HttpServerRequest req (get-in data [:request ::http/request])
parse-cookie (fn [^Cookie item] [(.getName item) (.getValue item)]) parse-cookie (fn [^Cookie item] [(.getName item) (.getValue item)])
cookies (into {} (map parse-cookie) (vals (.cookieMap req)))] cookies (into {} (map parse-cookie) (vals (.cookieMap req)))]
(update data :request assoc :cookies cookies))) (update data :request assoc :cookies cookies)))
:leave :leave
(fn [data] (fn [data]
(let [cookies (get-in data [:response :cookies]) (let [cookies (get-in data [:response :cookies])
^HttpServerResponse res (get-in data [:request ::vh/response])] ^HttpServerResponse res (get-in data [:request ::http/response])]
(when (map? cookies) (when (map? cookies)
(vu/doseq [[key val] cookies] (util/doseq [[key val] cookies]
(if (nil? val) (if (nil? val)
(.removeCookie res key) (.removeCookie res key)
(.addCookie res (build-cookie key val))))) (.addCookie res (build-cookie key val)))))
@ -87,7 +87,7 @@
([] (params nil)) ([] (params nil))
([{:keys [attr] :or {attr :params}}] ([{:keys [attr] :or {attr :params}}]
{:enter (fn [data] {:enter (fn [data]
(let [request (get-in data [:request ::vh/request]) (let [request (get-in data [:request ::http/request])
params (parse-params request)] params (parse-params request)]
(update data :request assoc attr params)))})) (update data :request assoc attr params)))}))
@ -97,7 +97,7 @@
([] (uploads nil)) ([] (uploads nil))
([{:keys [attr] :or {attr :uploads}}] ([{:keys [attr] :or {attr :uploads}}]
{:enter (fn [data] {:enter (fn [data]
(let [context (get-in data [:request ::vw/routing-context]) (let [context (get-in data [:request ::web/routing-context])
uploads (reduce (fn [acc ^FileUpload upload] uploads (reduce (fn [acc ^FileUpload upload]
(assoc! acc (assoc! acc
(keyword (.name upload)) (keyword (.name upload))