Minor improvements on websocket impl.

This commit is contained in:
Andrey Antukh 2020-02-08 21:26:01 +01:00
parent 0cc5c7f7bb
commit a63aff560f
2 changed files with 58 additions and 54 deletions

View file

@ -126,7 +126,7 @@
(p/recur)))))))) (p/recur))))))))
(defn- log-error (defn- log-error
[err] [^Throwable err]
(log/error "Unexpected exception on websocket handler:\n" (log/error "Unexpected exception on websocket handler:\n"
(with-out-str (with-out-str
(.printStackTrace err (java.io.PrintWriter. *out*))))) (.printStackTrace err (java.io.PrintWriter. *out*)))))
@ -143,6 +143,7 @@
(defn handler (defn handler
[{:keys [user] :as req}] [{:keys [user] :as req}]
(ws/websocket :handler (partial websocket-handler req) (ws/websocket
;; :on-error on-error {:handler (partial websocket-handler req)
)) :input-buffer-size 64
:output-buffer-size 64}))

View file

@ -17,7 +17,6 @@
[vertx.eventbus :as ve]) [vertx.eventbus :as ve])
(:import (:import
java.lang.AutoCloseable java.lang.AutoCloseable
io.vertx.core.Future
io.vertx.core.Promise io.vertx.core.Promise
io.vertx.core.Handler io.vertx.core.Handler
io.vertx.core.Vertx io.vertx.core.Vertx
@ -51,26 +50,34 @@
d)) d))
(defn- default-on-error (defn- default-on-error
[ws err] [^Throwable err]
(log/error "Unexpected exception on websocket handler:\n" (log/error "Unexpected exception on websocket handler:\n"
(with-out-str (with-out-str
(.printStackTrace err (java.io.PrintWriter. *out*)))) (.printStackTrace err (java.io.PrintWriter. *out*)))))
(.close ^AutoCloseable ws))
(defrecord WebSocketResponse [handler on-error] (defn websocket
[{:keys [handler on-error
input-buffer-size
output-buffer-size]
:or {on-error default-on-error
input-buffer-size 64
output-buffer-size 64}}]
(reify
vh/IAsyncResponse vh/IAsyncResponse
(-handle-response [it request] (-handle-response [it request]
(let [^HttpServerRequest req (::vh/request request) (let [^HttpServerRequest req (::vh/request request)
^ServerWebSocket conn (.upgrade req) ^ServerWebSocket conn (.upgrade req)
inp-s (vs/stream 64) inp-s (vs/stream input-buffer-size)
out-s (vs/stream 64) out-s (vs/stream output-buffer-size)
ctx (vu/current-context) ctx (vu/current-context)
ws (->WebSocket conn inp-s out-s) ws (->WebSocket conn inp-s out-s)
impl-on-error impl-on-error
(fn [e] (on-error ws e)) (fn [err]
(.close ^AutoCloseable ws)
(on-error err))
impl-on-close impl-on-close
(fn [_] (fn [_]
@ -81,26 +88,22 @@
(fn [message] (fn [message]
(when-not (vs/offer! inp-s message) (when-not (vs/offer! inp-s message)
(.pause conn) (.pause conn)
(prn "BUFF")
(-> (vs/put! inp-s message) (-> (vs/put! inp-s message)
(p/then' (fn [res] (p/then' (fn [res]
(when-not (false? res) (when-not (false? res)
(.resume conn)))))))] (.resume conn)))))))]
(.exceptionHandler conn (vi/fn->handler impl-on-error)) (.exceptionHandler conn ^Handler (vi/fn->handler impl-on-error))
(.textMessageHandler conn (vi/fn->handler impl-on-message)) (.textMessageHandler conn ^Handler (vi/fn->handler impl-on-message))
(.closeHandler conn (vi/fn->handler impl-on-close)) (.closeHandler conn ^Handler (vi/fn->handler impl-on-close))
(vs/loop [] (vs/loop []
(p/let [msg (vs/take! out-s)] (p/let [msg (vs/take! out-s)]
(when-not (nil? msg) (when-not (nil? msg)
(p/do! (-> (write-to-websocket conn msg)
(write-to-websocket conn msg) (p/then' (fn [_] (p/recur)))
(p/recur))))) (p/catch' (fn [err]
(on-error err)
(p/recur)))))))
(vu/run-on-context! ctx #(handler ws))))) (vu/run-on-context! ctx #(handler ws))))))
(defn websocket
[& {:keys [handler on-error]
:or {on-error default-on-error}}]
(->WebSocketResponse handler on-error))