🎉 Add more reliable presence mechanism.

This commit is contained in:
Andrey Antukh 2020-05-14 23:41:25 +02:00 committed by Alonso Torres
parent 4b31a147a9
commit 817c22dc3c
8 changed files with 105 additions and 49 deletions

View file

@ -24,8 +24,6 @@ CREATE TRIGGER task__modified_at__tgr
BEFORE UPDATE ON task BEFORE UPDATE ON task
FOR EACH ROW EXECUTE PROCEDURE update_modified_at(); FOR EACH ROW EXECUTE PROCEDURE update_modified_at();
CREATE TABLE scheduled_task ( CREATE TABLE scheduled_task (
id text PRIMARY KEY, id text PRIMARY KEY,

View file

@ -0,0 +1,9 @@
CREATE TABLE presence (
file_id uuid NOT NULL REFERENCES file(id) ON DELETE CASCADE,
profile_id uuid NOT NULL REFERENCES profile(id) ON DELETE CASCADE,
session_id uuid NOT NULL,
updated_at timestamptz NOT NULL DEFAULT clock_timestamp(),
PRIMARY KEY (file_id, session_id, profile_id)
);

View file

@ -29,7 +29,10 @@
:fn (mg/resource "migrations/0004.tasks.sql")} :fn (mg/resource "migrations/0004.tasks.sql")}
{:desc "Initial libraries tables" {:desc "Initial libraries tables"
:name "0005-libraries" :name "0005-libraries"
:fn (mg/resource "migrations/0005.libraries.sql")}]}) :fn (mg/resource "migrations/0005.libraries.sql")}
{:desc "Initial presence tables"
:name "0006-presence"
:fn (mg/resource "migrations/0006.presence.sql")}]})
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Entry point ;; Entry point

View file

@ -14,7 +14,8 @@
[uxbox.common.exceptions :as ex] [uxbox.common.exceptions :as ex]
[uxbox.common.uuid :as uuid] [uxbox.common.uuid :as uuid]
[uxbox.redis :as redis] [uxbox.redis :as redis]
[ring.util.codec :as codec] [uxbox.db :as db]
[uxbox.util.time :as dt]
[uxbox.util.transit :as t])) [uxbox.util.transit :as t]))
(defmacro go-try (defmacro go-try
@ -31,47 +32,55 @@
(throw r#) (throw r#)
r#))) r#)))
(defn- decode-message (defmacro thread-try
[message] [& body]
(->> (t/str->bytes message) `(a/thread
(t/decode))) (try
~@body
(defn- encode-message (catch Throwable e#
[message] e#))))
(->> (t/encode message)
(t/bytes->str)))
;; --- Redis Interactions ;; --- Redis Interactions
(defn- publish (defn- publish
[channel message] [channel message]
(go-try (go-try
(let [message (encode-message message)] (let [message (t/encode-str message)]
(<? (redis/run :publish {:channel (str channel) (<? (redis/run :publish {:channel (str channel)
:message message}))))) :message message})))))
(def ^:private
sql:retrieve-presence
"select * from presence
where file_id=?
and (clock_timestamp() - updated_at) < '5 min'::interval")
(defn- retrieve-presence (defn- retrieve-presence
[key] [file-id]
(go-try (thread-try
(let [data (<? (redis/run :hgetall {:key key}))] (let [rows (db/exec! db/pool [sql:retrieve-presence file-id])]
(into [] (map (fn [[k v]] [(uuid/uuid k) (uuid/uuid v)])) data)))) (mapv (juxt :session-id :profile-id) rows))))
(defn- join-room (def ^:private
[file-id session-id profile-id] sql:update-presence
(let [key (str file-id) "insert into presence (file_id, session_id, profile_id, updated_at)
field (str session-id) values (?, ?, ?, clock_timestamp())
value (str profile-id)] on conflict (file_id, session_id, profile_id)
(go-try do update set updated_at=clock_timestamp()")
(<? (redis/run :hset {:key key :field field :value value}))
(<? (retrieve-presence key)))))
(defn- leave-room (defn- update-presence
[file-id session-id profile-id] [file-id session-id profile-id]
(let [key (str file-id) (thread-try
field (str session-id)] (let [now (dt/now)
(go-try sql [sql:update-presence file-id session-id profile-id]]
(<? (redis/run :hdel {:key key :field field})) (db/exec-one! db/pool sql))))
(<? (retrieve-presence key)))))
(defn- delete-presence
[file-id session-id profile-id]
(thread-try
(db/delete! db/pool :presence {:file-id file-id
:profile-id profile-id
:session-id session-id})))
;; --- WebSocket Messages Handling ;; --- WebSocket Messages Handling
@ -85,29 +94,34 @@
[{:keys [file-id profile-id session-id output] :as ws} message] [{:keys [file-id profile-id session-id output] :as ws} message]
(log/info (str "profile " profile-id " is connected to " file-id)) (log/info (str "profile " profile-id " is connected to " file-id))
(go-try (go-try
(let [members (<? (join-room file-id session-id profile-id))] (<? (update-presence file-id session-id profile-id))
(<? (publish file-id {:type :presence :sessions members}))))) (let [members (<? (retrieve-presence file-id))]
(<? (publish file-id {:type :presence :sessions members})))))
(defmethod handle-message :disconnect (defmethod handle-message :disconnect
[{:keys [profile-id file-id session-id] :as ws} message] [{:keys [profile-id file-id session-id] :as ws} message]
(log/info (str "profile " profile-id " is disconnected from " file-id)) (log/info (str "profile " profile-id " is disconnected from " file-id))
(go-try (go-try
(let [members (<? (leave-room file-id session-id profile-id))] (<? (delete-presence file-id session-id profile-id))
(let [members (<? (retrieve-presence file-id))]
(<? (publish file-id {:type :presence :sessions members}))))) (<? (publish file-id {:type :presence :sessions members})))))
(defmethod handle-message :keepalive
[{:keys [profile-id file-id session-id] :as ws} message]
(update-presence file-id session-id profile-id))
(defmethod handle-message :pointer-update
[{:keys [profile-id file-id session-id] :as ws} message]
(let [message (assoc message
:profile-id profile-id
:session-id session-id)]
(publish file-id message)))
(defmethod handle-message :default (defmethod handle-message :default
[ws message] [ws message]
(a/go (a/go
(log/warn (str "received unexpected message: " message)))) (log/warn (str "received unexpected message: " message))))
(defmethod handle-message :pointer-update
[{:keys [profile-id file-id session-id] :as ws} message]
(go-try
(let [message (assoc message
:profile-id profile-id
:session-id session-id)]
(<? (publish file-id message)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; WebSocket Handler ;; WebSocket Handler
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -176,7 +190,7 @@
(a/go-loop [] (a/go-loop []
(let [val (a/<! out)] (let [val (a/<! out)]
(when-not (nil? val) (when-not (nil? val)
(jetty/send! conn (encode-message val)) (jetty/send! conn (t/encode-str val))
(recur))))) (recur)))))
(defn websocket (defn websocket
@ -184,7 +198,7 @@
(let [in (a/chan 32) (let [in (a/chan 32)
out (a/chan 32)] out (a/chan 32)]
{:on-connect (fn [conn] {:on-connect (fn [conn]
(let [xf (map decode-message) (let [xf (map t/decode-str)
sub (redis/subscribe (str file-id) xf) sub (redis/subscribe (str file-id) xf)
ws (WebSocket. conn in out sub nil params)] ws (WebSocket. conn in out sub nil params)]
(start-rcv-loop! ws) (start-rcv-loop! ws)
@ -203,7 +217,7 @@
(a/close! in)) (a/close! in))
:on-text (fn [ws message] :on-text (fn [ws message]
(let [message (decode-message message)] (let [message (t/decode-str message)]
;; (prn "websocket" :on-text message) ;; (prn "websocket" :on-text message)
(a/>!! in message))) (a/>!! in message)))

View file

@ -72,7 +72,7 @@
(defn migrate! (defn migrate!
"Main entry point for apply a migration." "Main entry point for apply a migration."
([conn migrations] ([conn migrations]
(migrate! conn migrations nil)) (impl-migrate conn migrations nil))
([conn migrations options] ([conn migrations options]
(impl-migrate conn migrations options))) (impl-migrate conn migrations options)))

View file

@ -54,9 +54,16 @@
(integer? ms-or-obj) (integer? ms-or-obj)
(Duration/ofMillis ms-or-obj) (Duration/ofMillis ms-or-obj)
(string? ms-or-obj)
(Duration/parse ms-or-obj)
:else :else
(obj->duration ms-or-obj))) (obj->duration ms-or-obj)))
(defn duration-between
[t1 t2]
(Duration/between t1 t2))
(defn parse-duration (defn parse-duration
[s] [s]
(assert (string? s)) (assert (string? s))

View file

@ -52,6 +52,9 @@
;; --- High-Level Api ;; --- High-Level Api
(declare str->bytes)
(declare bytes->str)
(defn decode (defn decode
([data] ([data]
(decode data nil)) (decode data nil))
@ -68,6 +71,16 @@
(write! w data) (write! w data)
(.toByteArray out))))) (.toByteArray out)))))
(defn decode-str
[message]
(->> (str->bytes message)
(decode)))
(defn encode-str
[message]
(->> (encode message)
(bytes->str)))
;; --- Helpers ;; --- Helpers
(defn str->bytes (defn str->bytes
@ -83,4 +96,3 @@
(bytes->str data "UTF-8")) (bytes->str data "UTF-8"))
([^bytes data, ^String encoding] ([^bytes data, ^String encoding]
(String. data encoding))) (String. data encoding)))

View file

@ -28,6 +28,7 @@
(declare handle-pointer-update) (declare handle-pointer-update)
(declare handle-page-change) (declare handle-page-change)
(declare handle-pointer-send) (declare handle-pointer-send)
(declare send-keepalive)
(s/def ::type keyword?) (s/def ::type keyword?)
(s/def ::message (s/def ::message
@ -46,8 +47,11 @@
ptk/WatchEvent ptk/WatchEvent
(watch [_ state stream] (watch [_ state stream]
(let [wsession (get-in state [:ws file-id]) (let [wsession (get-in state [:ws file-id])
stoper (rx/filter #(= ::finalize %) stream)] stoper (rx/filter #(= ::finalize %) stream)
interval (* 1000 60)]
(->> (rx/merge (->> (rx/merge
(->> (rx/timer interval interval)
(rx/map #(send-keepalive file-id)))
(->> (ws/-stream wsession) (->> (ws/-stream wsession)
(rx/filter #(= :message (:type %))) (rx/filter #(= :message (:type %)))
(rx/map (comp t/decode :payload)) (rx/map (comp t/decode :payload))
@ -66,6 +70,15 @@
(rx/take-until stoper)))))) (rx/take-until stoper))))))
(defn send-keepalive
[file-id]
(ptk/reify ::send-keepalive
ptk/EffectEvent
(effect [_ state stream]
(prn "send-keepalive" file-id)
(when-let [ws (get-in state [:ws file-id])]
(ws/-send ws (t/encode {:type :keepalive}))))))
;; --- Finalize Websocket ;; --- Finalize Websocket
(defn finalize (defn finalize