♻️ Refactor presence and realtime cursors handling.

This commit is contained in:
Andrey Antukh 2020-04-27 08:54:43 +02:00 committed by Alonso Torres
parent 1c3664921d
commit 285735e35f
15 changed files with 519 additions and 318 deletions

View file

@ -30,12 +30,13 @@
:allow-methods #{:post :get :patch :head :options :put}
:allow-headers #{:x-requested-with :content-type :cookie}}
routes [["/sub/:file-id" {:middleware [[vwm/cookies]
[vwm/cors cors-opts]
[middleware/format-response-body]
[session/auth]]
:handler ws/handler
:method :get}]
routes [["/notifications/:file-id/:session-id"
{:middleware [[vwm/cookies]
[vwm/cors cors-opts]
[middleware/format-response-body]
[session/auth]]
:handler ws/handler
:method :get}]
["/api" {:middleware [[vwm/cookies]
[vwm/params]

View file

@ -2,147 +2,17 @@
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
;; Copyright (c) 2020 UXBOX Labs SL
(ns uxbox.http.ws
"Web Socket handlers"
(:require
[clojure.tools.logging :as log]
[promesa.core :as p]
[uxbox.common.exceptions :as ex]
[uxbox.emails :as emails]
[uxbox.http.session :as session]
[uxbox.services.init]
[uxbox.services.mutations :as sm]
[uxbox.services.queries :as sq]
[uxbox.util.blob :as blob]
[uxbox.util.transit :as t]
[uxbox.common.uuid :as uuid]
[vertx.eventbus :as ve]
[vertx.http :as vh]
[vertx.util :as vu]
[vertx.timers :as vt]
[vertx.web :as vw]
[vertx.stream :as vs]
[vertx.web.websockets :as ws])
(:import
java.lang.AutoCloseable
io.vertx.core.Handler
io.vertx.core.Promise
io.vertx.core.Vertx
io.vertx.core.buffer.Buffer
io.vertx.core.http.HttpServerRequest
io.vertx.core.http.HttpServerResponse
io.vertx.core.http.ServerWebSocket))
;; --- State Management
(def state (atom {}))
(defn send!
[{:keys [output] :as ws} message]
(let [msg (-> (t/encode message)
(t/bytes->str))]
(vs/put! output msg)))
(defmulti handle-message
(fn [ws message] (:type message)))
(defmethod handle-message :connect
[{:keys [file-id profile-id] :as ws} message]
(let [local (swap! state assoc-in [file-id profile-id] ws)
sessions (get local file-id)
message {:type :who :users (set (keys sessions))}]
(p/run! #(send! % message) (vals sessions))))
(defmethod handle-message :disconnect
[{:keys [profile-id] :as ws} {:keys [file-id] :as message}]
(let [local (swap! state update file-id dissoc profile-id)
sessions (get local file-id)
message {:type :who :users (set (keys sessions))}]
(p/run! #(send! % message) (vals sessions))))
(defmethod handle-message :who
[{:keys [file-id] :as ws} message]
(let [users (keys (get @state file-id))]
(send! ws {:type :who :users (set users)})))
(defmethod handle-message :pointer-update
[{:keys [profile-id file-id] :as ws} message]
(let [sessions (->> (vals (get @state file-id))
(remove #(= profile-id (:profile-id %))))
message (assoc message :profile-id profile-id)]
(p/run! #(send! % message) sessions)))
(defn- on-eventbus-message
[{:keys [file-id profile-id] :as ws} {:keys [body] :as message}]
(send! ws body))
(defn- start-eventbus-consumer!
[vsm ws fid]
(let [topic (str "internal.uxbox.file." fid)]
(ve/consumer vsm topic #(on-eventbus-message ws %2))))
;; --- Handler
(defn- on-init
[ws req]
(let [ctx (vu/current-context)
file-id (get-in req [:path-params :file-id])
profile-id (:profile-id req)
ws (assoc ws
:profile-id profile-id
:file-id file-id)
send-ping #(send! ws {:type :ping})
sem1 (start-eventbus-consumer! ctx ws file-id)
sem2 (vt/schedule-periodic! ctx 5000 send-ping)]
(handle-message ws {:type :connect})
(p/resolved (assoc ws ::sem1 sem1 ::sem2 sem2))))
(defn- on-message
[ws message]
(->> (t/str->bytes message)
(t/decode)
(handle-message ws)))
(defn- on-close
[ws]
(let [file-id (:file-id ws)]
(handle-message ws {:type :disconnect
:file-id file-id})
(when-let [sem1 (::sem1 ws)]
(.close ^AutoCloseable sem1))
(when-let [sem2 (::sem2 ws)]
(.close ^AutoCloseable sem2))))
(defn- rcv-loop
[{:keys [input] :as ws}]
(vs/loop []
(-> (vs/take! input)
(p/then (fn [message]
(when message
(p/do! (on-message ws message)
(p/recur))))))))
(defn- log-error
[^Throwable err]
(log/error "Unexpected exception on websocket handler:\n"
(with-out-str
(.printStackTrace err (java.io.PrintWriter. *out*)))))
(defn websocket-handler
[req ws]
(p/let [ws (on-init ws req)]
(-> (rcv-loop ws)
(p/finally (fn [_ error]
(.close ^AutoCloseable ws)
(on-close ws)
(when error
(log-error error)))))))
[uxbox.services.notifications :as nf]
[vertx.web.websockets :as ws]))
(defn handler
[{:keys [user] :as req}]
(ws/websocket
{:handler (partial websocket-handler req)
{:handler #(nf/websocket req %)
:input-buffer-size 64
:output-buffer-size 64}))

View file

@ -33,17 +33,21 @@
:stop (.close ^AutoCloseable client))
(defstate conn
:start (redis/connect client)
:start @(redis/connect client)
:stop (.close ^AutoCloseable conn))
;; --- API FORWARD
(defmacro with-conn
[& args]
`(redis/with-conn ~@args))
(defn subscribe
[topic]
(redis/subscribe client topic))
(defn run!
[conn cmd params]
[cmd params]
(let [ctx (vu/get-or-create-context system)]
(-> (redis/run! conn cmd params)
(vu/handle-on-context ctx))))
(defn run
[cmd params]
(redis/run conn cmd params))

View file

@ -0,0 +1,176 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2020 UXBOX Labs SL
(ns uxbox.services.notifications
"A websocket based notifications mechanism."
(:require
[clojure.tools.logging :as log]
[clojure.core.async :as a :refer [>! <!]]
[promesa.core :as p]
[uxbox.common.exceptions :as ex]
[uxbox.util.transit :as t]
[uxbox.redis :as redis]
[uxbox.common.uuid :as uuid]
[vertx.util :as vu :refer [<?]]))
(defn- decode-message
[message]
(->> (t/str->bytes message)
(t/decode)))
(defn- encode-message
[message]
(->> (t/encode message)
(t/bytes->str)))
;; --- Redis Interactions
(defn- publish
[channel message]
(vu/go-try
(let [message (encode-message message)]
(<? (redis/run :publish {:channel (str channel)
:message message})))))
(defn- retrieve-presence
[key]
(vu/go-try
(let [data (<? (redis/run :hgetall {:key key}))]
(into [] (map (fn [[k v]] [(uuid/uuid k) (uuid/uuid v)])) data))))
(defn- join-room
[file-id session-id profile-id]
(let [key (str file-id)
field (str session-id)
value (str profile-id)]
(vu/go-try
(<? (redis/run :hset {:key key :field field :value value}))
(<? (retrieve-presence key)))))
(defn- leave-room
[file-id session-id profile-id]
(let [key (str file-id)
field (str session-id)]
(vu/go-try
(<? (redis/run :hdel {:key key :field field}))
(<? (retrieve-presence key)))))
;; --- WebSocket Messages Handling
(defmulti handle-message
(fn [ws message] (:type message)))
;; TODO: check permissions for join a file-id channel (probably using
;; single use token for avoid explicit database query).
(defmethod handle-message :connect
[{:keys [file-id profile-id session-id output] :as ws} message]
(log/info (str "profile " profile-id " is connected to " file-id))
(vu/go-try
(let [members (<? (join-room file-id session-id profile-id))]
(<? (publish file-id {:type :presence :sessions members})))))
(defmethod handle-message :disconnect
[{:keys [profile-id file-id session-id] :as ws} message]
(log/info (str "profile " profile-id " is disconnected from " file-id))
(vu/go-try
(let [members (<? (leave-room file-id session-id profile-id))]
(<? (publish file-id {:type :presence :sessions members})))))
(defmethod handle-message :default
[ws message]
(a/go
(log/warn (str "received unexpected message: " message))))
(defmethod handle-message :pointer-update
[{:keys [profile-id file-id session-id] :as ws} message]
(vu/go-try
(let [message (assoc message
:profile-id profile-id
:session-id session-id)]
(<? (publish file-id message)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; WebSocket Handler
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- process-message
[ws message]
(vu/go-try
(let [message (decode-message message)]
(<? (handle-message ws message)))))
(defn- forward-message
[{:keys [output session-id profile-id] :as ws} message]
(vu/go-try
(let [message' (decode-message message)]
(when-not (= (:session-id message') session-id)
(>! output message)))))
(defn- close-all!
[{:keys [sch] :as ws}]
(a/close! sch)
(.close ^java.lang.AutoCloseable ws))
(defn start-loop!
[{:keys [input output sch on-error] :as ws}]
(vu/go-try
(loop []
(let [timeout (a/timeout 30000)
[val port] (a/alts! [input sch timeout])]
;; (prn "alts" val "from" (cond (= port input) "input"
;; (= port sch) "redis"
;; :else "timeout"))
(cond
;; Process message coming from connected client
(and (= port input) (not (nil? val)))
(do
(<? (process-message ws val))
(recur))
;; Forward message to the websocket
(and (= port sch) (not (nil? val)))
(do
(<? (forward-message ws val))
(recur))
;; Timeout channel signaling
(= port timeout)
(do
(>! output (encode-message {:type :ping}))
(recur))
:else
nil)))))
(defn- on-subscribed
[{:keys [on-error] :as ws} sch]
(let [ws (assoc ws :sch sch)]
(a/go
(try
(<? (handle-message ws {:type :connect}))
(<? (start-loop! ws))
(<? (handle-message ws {:type :disconnect}))
(close-all! ws)
(catch Throwable e
(on-error e)
(close-all! ws))))))
(defn websocket
[req {:keys [input on-error] :as ws}]
(let [fid (uuid/uuid (get-in req [:path-params :file-id]))
sid (uuid/uuid (get-in req [:path-params :session-id]))
pid (:profile-id req)
ws (assoc ws
:profile-id pid
:file-id fid
:session-id sid)]
(-> (redis/subscribe (str fid))
(p/finally (fn [sch error]
(if error
(on-error error)
(on-subscribed ws sch)))))))

View file

@ -207,18 +207,19 @@
(defn retrieve-file-users
[conn id]
(db/query conn [sql:file-users id]))
(-> (db/query conn [sql:file-users id])
(p/then (fn [rows]
(mapv #(images/resolve-media-uris % [:photo :photo-uri]) rows)))))
(s/def ::file-with-users
(s/def ::file-users
(s/keys :req-un [::profile-id ::id]))
(sq/defquery ::file-with-users
(sq/defquery ::file-users
[{:keys [profile-id id] :as params}]
(db/with-atomic [conn db/pool]
(check-edition-permissions! conn profile-id id)
(p/let [file (retrieve-file conn id)
users (retrieve-file-users conn id)]
(assoc file :users users))))
(retrieve-file-users conn id)))
(s/def ::file
(s/keys :req-un [::profile-id ::id]))

View file

@ -6,26 +6,32 @@
(ns uxbox.util.redis
"Asynchronous posgresql client."
(:refer-clojure :exclude [get set run!])
(:refer-clojure :exclude [run!])
(:require
[promesa.core :as p])
[promesa.core :as p]
[clojure.core.async :as a])
(:import
io.lettuce.core.RedisClient
io.lettuce.core.RedisURI
io.lettuce.core.codec.StringCodec
io.lettuce.core.api.async.RedisAsyncCommands
io.lettuce.core.api.StatefulRedisConnection
io.lettuce.core.pubsub.RedisPubSubListener
io.lettuce.core.pubsub.StatefulRedisPubSubConnection
io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands
io.lettuce.core.pubsub.api.sync.RedisPubSubCommands
))
(defrecord Client [conn uri]
(defrecord Client [client uri]
java.lang.AutoCloseable
(close [_]
(.shutdown ^RedisClient conn)))
(.shutdown ^RedisClient client)))
(defrecord Connection [cmd conn]
(defrecord Connection [^RedisAsyncCommands cmd]
java.lang.AutoCloseable
(close [_]
(.close ^StatefulRedisConnection conn)))
(let [conn (.getStatefulConnection cmd)]
(.close ^StatefulRedisConnection conn))))
(defn client
[uri]
@ -34,30 +40,51 @@
(defn connect
[client]
(let [^RedisURI uri (:uri client)
^RedisClient conn (:conn client)
^StatefulRedisConnection conn' (.connect conn StringCodec/UTF8 uri)]
(->Connection (.async conn') conn')))
^RedisClient client (:client client)]
(-> (.connectAsync client StringCodec/UTF8 uri)
(p/then' (fn [^StatefulRedisConnection conn]
(->Connection (.async conn)))))))
(declare impl-with-conn)
(defn- impl-subscribe
[^String topic ^StatefulRedisPubSubConnection conn]
(let [cmd (.async conn)
output (a/chan 1 (filter string?))
buffer (a/chan (a/sliding-buffer 64))
listener (reify RedisPubSubListener
(message [it pattern channel message])
(message [it channel message]
;; There are no back pressure, so we use a
;; slidding buffer for cases when the pubsub
;; broker sends more messages that we can
;; process.
(a/put! buffer message))
(psubscribed [it pattern count]
#_(prn "psubscribed" pattern count))
(punsubscribed [it pattern count]
#_(prn "punsubscribed" pattern count))
(subscribed [it channel count]
#_(prn "subscribed" channel count))
(unsubscribed [it channel count]
#_(prn "unsubscribed" channel count)))]
(.addListener conn listener)
(a/go-loop []
(let [[val port] (a/alts! [buffer (a/timeout 5000)])
message (if (= port buffer) val ::keepalive)]
(if (a/>! output message)
(recur)
(do
(a/close! buffer)
(when (.isOpen conn)
(.close conn))))))
(-> (.subscribe ^RedisPubSubAsyncCommands cmd (into-array String [topic]))
(p/then' (constantly output)))))
(defmacro with-conn
[[csym sym] & body]
`(impl-with-conn ~sym (fn [~csym] ~@body)))
(defn impl-with-conn
[client f]
(defn subscribe
[client topic]
(let [^RedisURI uri (:uri client)
^RedisClient conn (:conn client)]
(-> (.connectAsync conn StringCodec/UTF8 uri)
(p/then (fn [^StatefulRedisConnection conn]
(let [cmd (.async conn)
conn (->Connection cmd conn)]
(-> (p/do! (f conn))
(p/handle (fn [v e]
(.close conn)
(if e
(throw e)
v))))))))))
^RedisClient client (:client client)]
(-> (.connectPubSubAsync client StringCodec/UTF8 uri)
(p/then (partial impl-subscribe topic)))))
(defn- resolve-to-bool
[v]
@ -72,6 +99,18 @@
(let [^RedisAsyncCommands conn (:cmd conn)]
(impl-run conn cmd params)))
(defn run
[conn cmd params]
(let [res (a/chan 1)]
(if (instance? Connection conn)
(-> (run! conn cmd params)
(p/finally (fn [v e]
(if e
(a/offer! res e)
(a/offer! res v)))))
(a/close! res))
res))
(defmethod impl-run :get
[conn _ {:keys [key]}]
(.get ^RedisAsyncCommands conn ^String key))
@ -97,3 +136,21 @@
(-> (.srem ^RedisAsyncCommands conn ^String key ^"[S;" keys)
(p/then resolve-to-bool))))
(defmethod impl-run :publish
[conn _ {:keys [channel message]}]
(-> (.publish ^RedisAsyncCommands conn ^String channel ^String message)
(p/then resolve-to-bool)))
(defmethod impl-run :hset
[^RedisAsyncCommands conn _ {:keys [key field value]}]
(.hset conn key field value))
(defmethod impl-run :hgetall
[^RedisAsyncCommands conn _ {:keys [key]}]
(.hgetall conn key))
(defmethod impl-run :hdel
[^RedisAsyncCommands conn _ {:keys [key field]}]
(let [fields (into-array String [field])]
(.hdel conn key fields)))