diff --git a/backend/src/uxbox/http/ws.clj b/backend/src/uxbox/http/ws.clj index 29351d6ce..4ad32c1c0 100644 --- a/backend/src/uxbox/http/ws.clj +++ b/backend/src/uxbox/http/ws.clj @@ -126,7 +126,7 @@ (p/recur)))))))) (defn- log-error - [err] + [^Throwable err] (log/error "Unexpected exception on websocket handler:\n" (with-out-str (.printStackTrace err (java.io.PrintWriter. *out*))))) @@ -143,6 +143,7 @@ (defn handler [{:keys [user] :as req}] - (ws/websocket :handler (partial websocket-handler req) - ;; :on-error on-error - )) + (ws/websocket + {:handler (partial websocket-handler req) + :input-buffer-size 64 + :output-buffer-size 64})) diff --git a/backend/vendor/vertx/src/vertx/web/websockets.clj b/backend/vendor/vertx/src/vertx/web/websockets.clj index 6cb4ad198..d138c30bb 100644 --- a/backend/vendor/vertx/src/vertx/web/websockets.clj +++ b/backend/vendor/vertx/src/vertx/web/websockets.clj @@ -17,7 +17,6 @@ [vertx.eventbus :as ve]) (:import java.lang.AutoCloseable - io.vertx.core.Future io.vertx.core.Promise io.vertx.core.Handler io.vertx.core.Vertx @@ -51,56 +50,60 @@ d)) (defn- default-on-error - [ws err] + [^Throwable err] (log/error "Unexpected exception on websocket handler:\n" (with-out-str - (.printStackTrace err (java.io.PrintWriter. *out*)))) - (.close ^AutoCloseable ws)) - -(defrecord WebSocketResponse [handler on-error] - vh/IAsyncResponse - (-handle-response [it request] - (let [^HttpServerRequest req (::vh/request request) - ^ServerWebSocket conn (.upgrade req) - - inp-s (vs/stream 64) - out-s (vs/stream 64) - - ctx (vu/current-context) - ws (->WebSocket conn inp-s out-s) - - impl-on-error - (fn [e] (on-error ws e)) - - impl-on-close - (fn [_] - (vs/close! inp-s) - (vs/close! out-s)) - - impl-on-message - (fn [message] - (when-not (vs/offer! inp-s message) - (.pause conn) - (prn "BUFF") - (-> (vs/put! inp-s message) - (p/then' (fn [res] - (when-not (false? res) - (.resume conn)))))))] - - (.exceptionHandler conn (vi/fn->handler impl-on-error)) - (.textMessageHandler conn (vi/fn->handler impl-on-message)) - (.closeHandler conn (vi/fn->handler impl-on-close)) - - (vs/loop [] - (p/let [msg (vs/take! out-s)] - (when-not (nil? msg) - (p/do! - (write-to-websocket conn msg) - (p/recur))))) - - (vu/run-on-context! ctx #(handler ws))))) + (.printStackTrace err (java.io.PrintWriter. *out*))))) (defn websocket - [& {:keys [handler on-error] - :or {on-error default-on-error}}] - (->WebSocketResponse handler on-error)) + [{:keys [handler on-error + input-buffer-size + output-buffer-size] + :or {on-error default-on-error + input-buffer-size 64 + output-buffer-size 64}}] + (reify + vh/IAsyncResponse + (-handle-response [it request] + (let [^HttpServerRequest req (::vh/request request) + ^ServerWebSocket conn (.upgrade req) + + inp-s (vs/stream input-buffer-size) + out-s (vs/stream output-buffer-size) + + ctx (vu/current-context) + ws (->WebSocket conn inp-s out-s) + + impl-on-error + (fn [err] + (.close ^AutoCloseable ws) + (on-error err)) + + impl-on-close + (fn [_] + (vs/close! inp-s) + (vs/close! out-s)) + + impl-on-message + (fn [message] + (when-not (vs/offer! inp-s message) + (.pause conn) + (-> (vs/put! inp-s message) + (p/then' (fn [res] + (when-not (false? res) + (.resume conn)))))))] + + (.exceptionHandler conn ^Handler (vi/fn->handler impl-on-error)) + (.textMessageHandler conn ^Handler (vi/fn->handler impl-on-message)) + (.closeHandler conn ^Handler (vi/fn->handler impl-on-close)) + + (vs/loop [] + (p/let [msg (vs/take! out-s)] + (when-not (nil? msg) + (-> (write-to-websocket conn msg) + (p/then' (fn [_] (p/recur))) + (p/catch' (fn [err] + (on-error err) + (p/recur))))))) + + (vu/run-on-context! ctx #(handler ws))))))