♻️ Refactor websockets subsystem.

This commit is contained in:
Andrey Antukh 2020-02-07 12:15:38 +01:00
parent f37c9a5adb
commit e805515767
2 changed files with 140 additions and 67 deletions

View file

@ -9,32 +9,30 @@
(:require (:require
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[promesa.core :as p] [promesa.core :as p]
[uxbox.common.exceptions :as ex]
[uxbox.emails :as emails] [uxbox.emails :as emails]
[uxbox.http.session :as session] [uxbox.http.session :as session]
[uxbox.services.init] [uxbox.services.init]
[uxbox.services.mutations :as sm] [uxbox.services.mutations :as sm]
[uxbox.services.queries :as sq] [uxbox.services.queries :as sq]
[uxbox.util.uuid :as uuid]
[uxbox.util.transit :as t]
[uxbox.util.blob :as blob] [uxbox.util.blob :as blob]
[uxbox.util.transit :as t]
[uxbox.util.uuid :as uuid]
[vertx.eventbus :as ve]
[vertx.http :as vh] [vertx.http :as vh]
[vertx.web :as vw]
[vertx.impl :as vi] [vertx.impl :as vi]
[vertx.util :as vu] [vertx.util :as vu]
[vertx.eventbus :as ve]) [vertx.web :as vw]
[vertx.web.websockets :as ws])
(:import (:import
io.vertx.core.Future
io.vertx.core.Promise
io.vertx.core.Handler io.vertx.core.Handler
io.vertx.core.Promise
io.vertx.core.Vertx io.vertx.core.Vertx
io.vertx.core.buffer.Buffer io.vertx.core.buffer.Buffer
io.vertx.core.http.HttpServerRequest io.vertx.core.http.HttpServerRequest
io.vertx.core.http.HttpServerResponse io.vertx.core.http.HttpServerResponse
io.vertx.core.http.ServerWebSocket)) io.vertx.core.http.ServerWebSocket))
(declare ws-websocket)
(declare ws-send!)
;; --- State Management ;; --- State Management
(defonce state (defonce state
@ -42,7 +40,7 @@
(defn send! (defn send!
[ws message] [ws message]
(ws-send! ws (-> (t/encode message) (ws/send! ws (-> (t/encode message)
(t/bytes->str)))) (t/bytes->str))))
(defmulti handle-message (defmulti handle-message
@ -85,63 +83,35 @@
;; --- Handler ;; --- Handler
(defn- on-init
[req ws]
(let [ctx (vu/current-context)
file-id (get-in req [:path-params :file-id])
user-id (:user req)
ws (assoc ws
:user-id user-id
:file-id file-id)
sem (start-eventbus-consumer! ctx ws file-id)]
(handle-message ws {:type :connect})
(assoc ws ::sem sem)))
(defn- on-text-message
[ws message]
(->> (t/str->bytes message)
(t/decode)
(handle-message ws)))
(defn- on-close
[ws]
(let [file-id (:file-id ws)]
(handle-message ws {:type :disconnect
:file-id file-id})
(.unregister (::sem ws))))
(defn handler (defn handler
[{:keys [user] :as req}] [{:keys [user] :as req}]
(letfn [(on-init [ws] (ws/websocket :on-init (partial on-init req)
(let [ctx (vu/current-context) :on-text-message on-text-message
fid (get-in req [:path-params :file-id]) ;; :on-error on-error
ws (assoc ws :on-close on-close))
:user-id user
:file-id fid)
sem (start-eventbus-consumer! ctx ws fid)]
(handle-message ws {:type :connect})
(assoc ws ::sem sem)))
(on-message [ws message]
(try
(->> (t/str->bytes message)
(t/decode)
(handle-message ws))
(catch Throwable err
(log/error "Unexpected exception:\n"
(with-out-str
(.printStackTrace err (java.io.PrintWriter. *out*)))))))
(on-close [ws]
(let [fid (get-in req [:path-params :file-id])]
(handle-message ws {:type :disconnect :file-id fid})
(.unregister (::sem ws))))]
(-> (ws-websocket)
(assoc :on-init on-init
:on-message on-message
:on-close on-close))))
;; --- Internal (vertx api) (experimental)
(defrecord WebSocket [on-init on-message on-close]
vh/IAsyncResponse
(-handle-response [this ctx]
(let [^HttpServerRequest req (::vh/request ctx)
^ServerWebSocket ws (.upgrade req)
local (volatile! (assoc this :ws ws))]
(-> (p/do! (on-init @local))
(p/then (fn [data]
(vreset! local data)
(.textMessageHandler ws (vi/fn->handler
(fn [msg]
(-> (p/do! (on-message @local msg))
(p/then (fn [data]
(when (instance? WebSocket data)
(vreset! local data))
(.fetch ws 1)))))))
(.closeHandler ws (vi/fn->handler (fn [& args] (on-close @local))))))))))
(defn ws-websocket
[]
(->WebSocket nil nil nil))
(defn ws-send!
[ws msg]
(.writeTextMessage ^ServerWebSocket (:ws ws)
^String msg))

View file

@ -0,0 +1,103 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019-2020 Andrey Antukh <niwi@niwi.nz>
(ns vertx.web.websockets
"Web Sockets."
(:require
[clojure.tools.logging :as log]
[promesa.core :as p]
[vertx.http :as vh]
[vertx.web :as vw]
[vertx.impl :as vi]
[vertx.util :as vu]
[vertx.eventbus :as ve])
(:import
java.lang.AutoCloseable
io.vertx.core.Future
io.vertx.core.Promise
io.vertx.core.Handler
io.vertx.core.Vertx
io.vertx.core.buffer.Buffer
io.vertx.core.http.HttpServerRequest
io.vertx.core.http.HttpServerResponse
io.vertx.core.http.ServerWebSocket))
(defprotocol IWebSocket
(send! [it message]))
(defrecord WebSocket [conn]
AutoCloseable
(close [it]
(.close ^ServerWebSocket conn))
IWebSocket
(send! [it message]
(let [d (p/deferred)]
(cond
(string? message)
(.writeTextMessage ^ServerWebSocket conn
^String message
^Handler (vi/deferred->handler d))
(instance? Buffer message)
(.writeBinaryMessage ^ServerWebSocket conn
^Buffer message
^Handler (vi/deferred->handler d))
:else
(p/reject! (ex-info "invalid message type" {:message message})))
d)))
(defn- default-on-error
[ws err]
(log/error "Unexpected exception on websocket handler:\n"
(with-out-str
(.printStackTrace err (java.io.PrintWriter. *out*))))
(.close ^AutoCloseable ws))
(defrecord WebSocketResponse [on-init on-text-message on-error on-close]
vh/IAsyncResponse
(-handle-response [it ctx]
(let [^HttpServerRequest req (::vh/request ctx)
^ServerWebSocket conn (.upgrade req)
wsref (volatile! (->WebSocket conn))
impl-on-error (fn [e] (on-error @wsref e))
impl-on-close (fn [_] (on-close @wsref))
impl-on-message
(fn [message]
(-> (p/do! (on-text-message @wsref message))
(p/finally (fn [res err]
(if err
(impl-on-error err)
(do
(.fetch conn 1)
(when (instance? WebSocket res)
(vreset! wsref res))))))))]
(-> (p/do! (on-init @wsref))
(p/finally (fn [data error]
(cond
(not (nil? error))
(impl-on-error error)
(instance? WebSocket data)
(do
(vreset! wsref data)
(.exceptionHandler conn (vi/fn->handler impl-on-error))
(.textMessageHandler conn (vi/fn->handler impl-on-message))
(.closeHandler conn (vi/fn->handler impl-on-close)))
:else
(.reject conn)))))
nil)))
(defn websocket
[& {:keys [on-init on-text-message on-error on-close]
:or {on-error default-on-error}}]
(->WebSocketResponse on-init on-text-message on-error on-close))