🚧 Initial work on websocket communication.

This commit is contained in:
Andrey Antukh 2019-12-18 18:15:25 +01:00
parent 9c1c613c90
commit 9d93b0d3fb
9 changed files with 219 additions and 75 deletions

View file

@ -7,7 +7,7 @@
(ns uxbox.http.interceptors
(:require
[vertx.web :as vw]
[uxbox.util.blob :as blob]
[uxbox.util.transit :as t]
[uxbox.util.exceptions :as ex])
(:import
io.vertx.ext.web.RoutingContext
@ -20,7 +20,7 @@
mtype (get-in request [:headers "content-type"])]
(if (= "application/transit+json" mtype)
(try
(let [params (blob/decode-from-json body)]
(let [params (t/decode (t/buffer->bytes body))]
(update data :request assoc :body-params params))
(catch Exception e
(ex/raise :type :parse
@ -35,7 +35,7 @@
(coll? body)
(-> data
(assoc-in [:response :body]
(blob/encode-with-json body true))
(t/bytes->buffer (t/encode body)))
(update-in [:response :headers]
assoc "content-type" "application/transit+json"))

View file

@ -7,6 +7,7 @@
(ns uxbox.http.ws
"Web Socket handlers"
(:require
[clojure.tools.logging :as log]
[promesa.core :as p]
[uxbox.emails :as emails]
[uxbox.http.session :as session]
@ -14,6 +15,7 @@
[uxbox.services.mutations :as sm]
[uxbox.services.queries :as sq]
[uxbox.util.uuid :as uuid]
[uxbox.util.transit :as t]
[uxbox.util.blob :as blob]
[vertx.http :as vh]
[vertx.web :as vw]
@ -31,41 +33,69 @@
(declare ws-websocket)
(declare ws-send!)
(declare ws-on-message!)
(declare ws-on-close!)
;; --- Public API
;; --- State Management
(declare on-message)
(declare on-close)
(declare on-eventbus-message)
(defonce state
(atom {}))
(def state (atom {}))
(defn send!
[ws message]
(ws-send! ws (-> (t/encode message)
(t/bytes->str))))
(defmulti handle-message
(fn [ws message] (:type message)))
(defmethod handle-message :connect
[ws {:keys [file-id user-id] :as message}]
(let [local (swap! state assoc-in [file-id user-id] ws)
sessions (get local file-id)
message {:type :who :users (set (keys sessions))}]
(run! #(send! % message) (vals sessions))))
(defmethod handle-message :disconnect
[{:keys [user-id] :as ws} {:keys [file-id] :as message}]
(swap! state update file-id dissoc user-id)
nil)
(defmethod handle-message :who
[{:keys [file-id] :as ws} message]
(let [users (keys (get @state file-id))]
(send! ws {:type :who :users (set users)})))
;; --- Handler
(declare start-eventbus-consumer!)
(defn handler
[{:keys [user] :as req}]
(letfn [(on-init [ws]
(let [vsm (::vw/execution-context req)
tpc "test.foobar"
pid (get-in req [:path-params :page-id])
sem (ve/consumer vsm tpc #(on-eventbus-message ws %2))]
(swap! state update pid (fnil conj #{}) user)
(assoc ws ::sem sem)))
fid (get-in req [:path-params :file-id])
sem (start-eventbus-consumer! vsm ws fid)]
(handle-message ws {:type :connect :file-id fid :user-id user})
(assoc ws
::sem sem
:user-id user
:file-id fid)))
(on-message [ws message]
(let [pid (get-in req [:path-params :page-id])]
(ws-send! ws (str (::counter ws 0)))
(update ws ::counter (fnil inc 0))))
(try
(->> (t/str->bytes message)
(t/decode)
(handle-message ws))
(catch Throwable err
(log/error "Unexpected exception:\n"
(with-out-str
(.printStackTrace err (java.io.PrintWriter. *out*)))))))
(on-close [ws]
(let [pid (get-in req [:path-params :page-id])]
(swap! state update pid disj user)
(let [fid (get-in req [:path-params :file-id])]
(handle-message ws {:type :disconnect :file-id fid})
(.unregister (::sem ws))))]
;; (ws-websocket :on-init on-init
;; :on-message on-message
;; :on-close on-close)))
(-> (ws-websocket)
(assoc :on-init on-init
:on-message on-message
@ -73,8 +103,14 @@
(defn- on-eventbus-message
[ws {:keys [body] :as message}]
;; TODO
(ws-send! ws body))
(defn- start-eventbus-consumer!
[vsm ws fid]
(let [topic (str "internal.uxbox.file." fid)]
(ve/consumer vsm topic #(on-eventbus-message ws %2))))
;; --- Internal (vertx api) (experimental)
(defrecord WebSocket [on-init on-message on-close]