♻️ Reimplement workspace presence state.

Remove the use of the database for presence state.
This commit is contained in:
Andrey Antukh 2021-03-18 09:11:53 +01:00 committed by Andrés Moya
parent a16f4393b9
commit e15d93e8a4
4 changed files with 91 additions and 116 deletions

View file

@ -194,6 +194,7 @@
;; --- CONNECTION INIT ;; --- CONNECTION INIT
(declare send-presence)
(declare handle-message) (declare handle-message)
(declare start-loop!) (declare start-loop!)
@ -211,7 +212,7 @@
(.disconnect session))))))) (.disconnect session)))))))
(defn- start-loop! (defn- start-loop!
[{:keys [rcv-ch out-ch sub-ch session-id] :as cfg}] [{:keys [rcv-ch out-ch sub-ch session-id profile-id] :as cfg}]
(aa/go-try (aa/go-try
(loop [] (loop []
(let [timeout (a/timeout 30000) (let [timeout (a/timeout 30000)
@ -224,11 +225,17 @@
(aa/<? (handle-message cfg val)) (aa/<? (handle-message cfg val))
(recur)) (recur))
;; If message comes from subscription channel; we just need
;; to foreward it to the output channel. ;; Process message coming from pubsub.
(and (= port sub-ch) (some? val)) (and (= port sub-ch) (some? val))
(do (do
(when-not (= (:session-id val) session-id) (when-not (= (:session-id val) session-id)
;; If we receive a connect message of other user, we need
;; to send an update presence to all participants.
(when (= :connect (:type val))
(a/<! (send-presence cfg)))
;; Then, just forward the message
(a/>! out-ch val)) (a/>! out-ch val))
(recur)) (recur))
@ -243,51 +250,14 @@
:else :else
nil))))) nil)))))
;; --- PRESENCE HANDLING API (defn send-presence
([cfg] (send-presence cfg :presence))
(def ^:private ([{:keys [msgbus session-id profile-id file-id]} type]
sql:retrieve-presence (a/go
"select * from presence (a/<! (msgbus :pub {:topic file-id
where file_id=? :message {:type type
and (clock_timestamp() - updated_at) < '5 min'::interval") :session-id session-id
:profile-id profile-id}})))))
(def ^:private
sql:update-presence
"insert into presence (file_id, session_id, profile_id, updated_at)
values (?, ?, ?, clock_timestamp())
on conflict (file_id, session_id, profile_id)
do update set updated_at=clock_timestamp()")
(defn- retrieve-presence
[{:keys [pool file-id] :as cfg}]
(let [rows (db/exec! pool [sql:retrieve-presence file-id])]
(mapv (juxt :session-id :profile-id) rows)))
(defn- retrieve-presence*
[{:keys [executor] :as cfg}]
(aa/with-thread executor
(retrieve-presence cfg)))
(defn- update-presence
[{:keys [pool file-id session-id profile-id] :as cfg}]
(let [sql [sql:update-presence file-id session-id profile-id]]
(db/exec-one! pool sql)))
(defn- update-presence*
[{:keys [executor] :as cfg}]
(aa/with-thread executor
(update-presence cfg)))
(defn- delete-presence
[{:keys [pool file-id session-id profile-id] :as cfg}]
(db/delete! pool :presence {:file-id file-id
:profile-id profile-id
:session-id session-id}))
(defn- delete-presence*
[{:keys [executor] :as cfg}]
(aa/with-thread executor
(delete-presence cfg)))
;; --- INCOMING MSG PROCESSING ;; --- INCOMING MSG PROCESSING
@ -297,24 +267,16 @@
(defmethod handle-message :connect (defmethod handle-message :connect
[{:keys [file-id msgbus] :as cfg} _message] [{:keys [file-id msgbus] :as cfg} _message]
;; (log/debugf "profile '%s' is connected to file '%s'" profile-id file-id) ;; (log/debugf "profile '%s' is connected to file '%s'" profile-id file-id)
(aa/go-try (send-presence cfg :connect))
(aa/<? (update-presence* cfg))
(let [members (aa/<? (retrieve-presence* cfg))
val {:topic file-id :message {:type :presence :sessions members}}]
(a/<! (msgbus :pub val)))))
(defmethod handle-message :disconnect (defmethod handle-message :disconnect
[{:keys [file-id msgbus] :as cfg} _message] [{:keys [file-id msgbus] :as cfg} _message]
;; (log/debugf "profile '%s' is disconnected from '%s'" profile-id file-id) ;; (log/debugf "profile '%s' is disconnected from '%s'" profile-id file-id)
(aa/go-try (send-presence cfg :disconnect))
(aa/<? (delete-presence* cfg))
(let [members (aa/<? (retrieve-presence* cfg))
val {:topic file-id :message {:type :presence :sessions members}}]
(a/<! (msgbus :pub val)))))
(defmethod handle-message :keepalive (defmethod handle-message :keepalive
[cfg _message] [cfg _message]
(update-presence* cfg)) (a/go (do :nothing)))
(defmethod handle-message :pointer-update (defmethod handle-message :pointer-update
[{:keys [profile-id file-id session-id msgbus] :as cfg} message] [{:keys [profile-id file-id session-id msgbus] :as cfg} message]

View file

@ -50,12 +50,13 @@
;; --- Profile Fetched ;; --- Profile Fetched
(defn profile-fetched (defn profile-fetched
[{:keys [fullname] :as data}] [{:keys [fullname id] :as data}]
(us/verify ::profile data) (us/verify ::profile data)
(ptk/reify ::profile-fetched (ptk/reify ::profile-fetched
ptk/UpdateEvent ptk/UpdateEvent
(update [_ state] (update [_ state]
(-> state (-> state
(assoc :profile-id id)
(assoc :profile data) (assoc :profile data)
;; Safeguard if the profile is loaded after teams ;; Safeguard if the profile is loaded after teams
(assoc-in [:profile :teams] (get-in state [:profile :teams])))) (assoc-in [:profile :teams] (get-in state [:profile :teams]))))

View file

@ -70,6 +70,10 @@
(rx/filter #(s/valid? ::message %)) (rx/filter #(s/valid? ::message %))
(rx/map process-message)) (rx/map process-message))
(rx/of (handle-presence {:type :connect
:session-id (:session-id state)
:profile-id (:profile-id state)}))
;; Send back to backend all pointer messages. ;; Send back to backend all pointer messages.
(->> stream (->> stream
(rx/filter ms/pointer-event?) (rx/filter ms/pointer-event?)
@ -80,9 +84,11 @@
(defn- process-message (defn- process-message
[{:keys [type] :as msg}] [{:keys [type] :as msg}]
(case type (case type
:presence (handle-presence msg) :connect (handle-presence msg)
:presence (handle-presence msg)
:disconnect (handle-presence msg)
:pointer-update (handle-pointer-update msg) :pointer-update (handle-pointer-update msg)
:file-change (handle-file-change msg) :file-change (handle-file-change msg)
:library-change (handle-library-change msg) :library-change (handle-library-change msg)
::unknown)) ::unknown))
@ -136,41 +142,41 @@
}) })
(defn handle-presence (defn handle-presence
[{:keys [sessions] :as message}] [{:keys [type session-id profile-id] :as message}]
(letfn [(assign-color [sessions session] (letfn [(get-next-color [presence]
(if (string? (:color session)) (let [xfm (comp (map second)
session (map :color)
(let [used (into #{} (remove nil?))
(comp (map second) used (into #{} xfm presence)
(map :color) avail (set/difference presence-palette used)]
(remove nil?)) (or (first avail) "#000000")))
sessions)
avail (set/difference presence-palette used)
color (or (first avail) "#000000")]
(assoc session :color color))))
(assign-session [sessions {:keys [id profile]}] (update-color [color presence]
(let [session {:id id (if (some? color)
:fullname (:fullname profile) color
:updated-at (dt/now) (get-next-color presence)))
:photo-uri (cfg/resolve-profile-photo-url profile)}
session (assign-color sessions session)]
(assoc sessions id session)))
(update-sessions [previous profiles] (update-sesion [session presence]
(let [previous (select-keys previous (map first sessions)) ; Initial clearing (-> session
pending (->> sessions (assoc :id session-id)
(filter #(not (contains? previous (first %)))) (assoc :profile-id profile-id)
(map (fn [[session-id profile-id]] (assoc :updated-at (dt/now))
{:id session-id (update :color update-color presence)))
:profile (get profiles profile-id)})))]
(reduce assign-session previous pending)))] (update-presence [presence]
(-> presence
(update session-id update-sesion presence)
(d/without-nils)))
]
(ptk/reify ::handle-presence (ptk/reify ::handle-presence
ptk/UpdateEvent ptk/UpdateEvent
(update [_ state] (update [_ state]
(let [profiles (:users state)] ;; (let [profiles (:users state)]
(update state :workspace-presence update-sessions profiles)))))) (if (= :disconnect type)
(update state :workspace-presence dissoc session-id)
(update state :workspace-presence update-presence))))))
(defn handle-pointer-update (defn handle-pointer-update
[{:keys [page-id profile-id session-id x y] :as msg}] [{:keys [page-id profile-id session-id x y] :as msg}]

View file

@ -5,18 +5,19 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0. ;; defined by the Mozilla Public License, v. 2.0.
;; ;;
;; Copyright (c) 2020 UXBOX Labs SL ;; Copyright (c) UXBOX Labs SL
(ns app.main.ui.workspace.presence (ns app.main.ui.workspace.presence
(:require (:require
[rumext.alpha :as mf] [app.config :as cfg]
[cuerdas.core :as str]
[beicon.core :as rx]
[app.main.refs :as refs] [app.main.refs :as refs]
[app.main.store :as st] [app.main.store :as st]
[app.util.router :as rt]
[app.util.time :as dt] [app.util.time :as dt]
[app.util.timers :as tm] [app.util.timers :as ts]
[app.util.router :as rt])) [beicon.core :as rx]
[cuerdas.core :as str]
[rumext.alpha :as mf]))
(def pointer-icon-path (def pointer-icon-path
(str "M5.292 4.027L1.524.26l-.05-.01L0 0l.258 1.524 3.769 3.768zm-.45 " (str "M5.292 4.027L1.524.26l-.05-.01L0 0l.258 1.524 3.769 3.768zm-.45 "
@ -24,7 +25,7 @@
"3.39 3.39zM1.192.526l-.668.667L.431.646.64.43l.552.094z")) "3.39 3.39zM1.192.526l-.668.667L.431.646.64.43l.552.094z"))
(mf/defc session-cursor (mf/defc session-cursor
[{:keys [session] :as props}] [{:keys [session profile] :as props}]
(let [zoom (mf/deref refs/selected-zoom) (let [zoom (mf/deref refs/selected-zoom)
point (:point session) point (:point session)
color (:color session "#000000") color (:color session "#000000")
@ -32,7 +33,7 @@
[:g.multiuser-cursor {:transform transform} [:g.multiuser-cursor {:transform transform}
[:path {:fill color [:path {:fill color
:d pointer-icon-path :d pointer-icon-path
:font-family "sans-serif"}] }]
[:g {:transform "translate(0 -291.708)"} [:g {:transform "translate(0 -291.708)"}
[:rect {:width 25 [:rect {:width 25
:height 5 :height 5
@ -50,53 +51,58 @@
:overflow "hidden" :overflow "hidden"
:fill "#fff" :fill "#fff"
:stroke-width 1 :stroke-width 1
:font-family "Open Sans" :font-family "Works Sans"
:font-size 3 :font-size 3
:font-weight 400 :font-weight 400
:letter-spacing 0 :letter-spacing 0
:style { :line-height 1.25 } :style { :line-height 1.25 }
:word-spacing 0} :word-spacing 0}
(str (str/slice (:fullname session) 0 14) (str (str/slice (:fullname profile) 0 14)
(when (> (count (:fullname session)) 14) "..."))]]])) (when (> (count (:fullname profile)) 14) "..."))]]]))
(mf/defc active-cursors (mf/defc active-cursors
{::mf/wrap [mf/memo]} {::mf/wrap [mf/memo]}
[{:keys [page-id] :as props}] [{:keys [page-id] :as props}]
(let [counter (mf/use-state 0) (let [counter (mf/use-state 0)
users (mf/deref refs/users)
sessions (mf/deref refs/workspace-presence) sessions (mf/deref refs/workspace-presence)
sessions (->> (vals sessions) sessions (->> (vals sessions)
(filter #(= page-id (:page-id %))) (filter #(= page-id (:page-id %)))
(filter #(>= 3000 (- (inst-ms (dt/now)) (inst-ms (:updated-at %))))))] (filter #(>= 5000 (- (inst-ms (dt/now)) (inst-ms (:updated-at %))))))]
(mf/use-effect (mf/use-effect
nil nil
(fn [] (fn []
(let [sem (tm/schedule 1000 #(swap! counter inc))] (let [sem (ts/schedule 1000 #(swap! counter inc))]
(fn [] (rx/dispose! sem))))) (fn [] (rx/dispose! sem)))))
(for [session sessions] (for [session sessions]
(when (:point session) (when (:point session)
[:& session-cursor {:session session :key (:id session)}])))) [:& session-cursor {:session session
:profile (get users (:profile-id session))
:key (:id session)}]))))
;; --- SESSION WIDGET
(mf/defc session-widget (mf/defc session-widget
[{:keys [session self?] :as props}] [{:keys [session self? profile] :as props}]
(let [photo (:photo-uri session "/images/avatar.jpg")] [:li.tooltip.tooltip-bottom
[:li.tooltip.tooltip-bottom {:alt (:fullname profile)
{:alt (:fullname session) :on-click (when self? (st/emitf (rt/navigate :settings/profile)))}
:on-click (when self? [:img {:style {:border-color (:color session)}
#(st/emit! (rt/navigate :settings/profile)))} :src (cfg/resolve-profile-photo-url profile)}]])
[:img {:style {:border-color (:color session)}
:src photo}]]))
(mf/defc active-sessions (mf/defc active-sessions
{::mf/wrap [mf/memo]} {::mf/wrap [mf/memo]}
[] []
(let [profile (mf/deref refs/profile) (let [profile (mf/deref refs/profile)
sessions (mf/deref refs/workspace-presence)] users (mf/deref refs/users)
presence (mf/deref refs/workspace-presence)]
[:ul.active-users [:ul.active-users
(for [session (vals sessions)] (for [session (vals presence)]
[:& session-widget [:& session-widget
{:session session {:session session
:self? (= (:id session) (:id profile)) :profile (get users (:profile-id session))
:self? (= (:profile-id session) (:id profile))
:key (:id session)}])])) :key (:id session)}])]))