🎉 Add websockets abstraction initial example code.

This commit is contained in:
Andrey Antukh 2019-12-17 22:09:49 +01:00
parent 9ba9e8dbae
commit 0a9ef0d345
6 changed files with 128 additions and 7 deletions

View file

@ -12,7 +12,9 @@
[uxbox.services.init]
[uxbox.services.mutations :as sm]
[uxbox.services.queries :as sq]
[uxbox.util.uuid :as uuid]))
[uxbox.util.uuid :as uuid]
[vertx.web :as vw]
[vertx.eventbus :as ve]))
(defn query-handler
[req]
@ -45,7 +47,7 @@
(p/then #(session/create (:id %) user-agent))
(p/then' (fn [token]
{:status 204
:cookies {"auth-token" {:value token}}
:cookies {"auth-token" {:value token :path "/"}}
:body ""})))))
(defn logout-handler

View file

@ -56,7 +56,7 @@
(spx/terminate (assoc data ::unauthorized true)))))
(vc/handle-on-context))))
:leave (fn [data]
(if (::unauthorized data)
(if (and (::unauthorized data) (:response data))
(update data :response
assoc :status 403 :body {:type :authentication
:code :unauthorized})

View file

@ -0,0 +1,105 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns uxbox.http.ws
"Web Socket handlers"
(:require
[promesa.core :as p]
[uxbox.emails :as emails]
[uxbox.http.session :as session]
[uxbox.services.init]
[uxbox.services.mutations :as sm]
[uxbox.services.queries :as sq]
[uxbox.util.uuid :as uuid]
[uxbox.util.blob :as blob]
[vertx.http :as vh]
[vertx.web :as vw]
[vertx.util :as vu]
[vertx.eventbus :as ve])
(:import
io.vertx.core.Future
io.vertx.core.Promise
io.vertx.core.Handler
io.vertx.core.Vertx
io.vertx.core.buffer.Buffer
io.vertx.core.http.HttpServerRequest
io.vertx.core.http.HttpServerResponse
io.vertx.core.http.ServerWebSocket))
(declare ws-websocket)
(declare ws-send!)
(declare ws-on-message!)
(declare ws-on-close!)
;; --- Public API
(declare on-message)
(declare on-close)
(declare on-eventbus-message)
(def state (atom {}))
(defn handler
[{:keys [user] :as req}]
(letfn [(on-init [ws]
(let [vsm (::vw/execution-context req)
tpc "test.foobar"
pid (get-in req [:path-params :page-id])
sem (ve/consumer vsm tpc #(on-eventbus-message ws %2))]
(swap! state update pid (fnil conj #{}) user)
(assoc ws ::sem sem)))
(on-message [ws message]
(let [pid (get-in req [:path-params :page-id])]
(ws-send! ws (str (::counter ws 0)))
(update ws ::counter (fnil inc 0))))
(on-close [ws]
(let [pid (get-in req [:path-params :page-id])]
(swap! state update pid disj user)
(.unregister (::sem ws))))]
;; (ws-websocket :on-init on-init
;; :on-message on-message
;; :on-close on-close)))
(-> (ws-websocket)
(assoc :on-init on-init
:on-message on-message
:on-close on-close))))
(defn- on-eventbus-message
[ws {:keys [body] :as message}]
(ws-send! ws body))
;; --- Internal (vertx api) (experimental)
(defrecord WebSocket [on-init on-message on-close]
vh/IAsyncResponse
(-handle-response [this ctx]
(let [^HttpServerRequest req (::vh/request ctx)
^ServerWebSocket ws (.upgrade req)
local (volatile! (assoc this :ws ws))]
(-> (p/do! (on-init @local))
(p/then (fn [data]
(vreset! local data)
(.textMessageHandler ws (vu/fn->handler
(fn [msg]
(-> (p/do! (on-message @local msg))
(p/then (fn [data]
(when (instance? WebSocket data)
(vreset! local data))
(.fetch ws 1)))))))
(.closeHandler ws (vu/fn->handler (fn [& args] (on-close @local))))))))))
(defn ws-websocket
[]
(->WebSocket nil nil nil))
(defn ws-send!
[ws msg]
(.writeTextMessage ^ServerWebSocket (:ws ws)
^String msg))