🚧 More work on collaborative edition (in real time).

This commit is contained in:
Andrey Antukh 2019-12-19 13:13:08 +01:00
parent 34291fc4b4
commit 758c70f7c3
8 changed files with 282 additions and 110 deletions

View file

@ -48,7 +48,7 @@
(fn [ws message] (:type message))) (fn [ws message] (:type message)))
(defmethod handle-message :connect (defmethod handle-message :connect
[ws {:keys [file-id user-id] :as message}] [{:keys [file-id user-id] :as ws} message]
(let [local (swap! state assoc-in [file-id user-id] ws) (let [local (swap! state assoc-in [file-id user-id] ws)
sessions (get local file-id) sessions (get local file-id)
message {:type :who :users (set (keys sessions))}] message {:type :who :users (set (keys sessions))}]
@ -66,22 +66,35 @@
(let [users (keys (get @state file-id))] (let [users (keys (get @state file-id))]
(send! ws {:type :who :users (set users)}))) (send! ws {:type :who :users (set users)})))
;; --- Handler (defmethod handle-message :pointer-update
[{:keys [user-id file-id] :as ws} message]
(let [sessions (->> (vals (get @state file-id))
(remove #(= user-id (:user-id %))))
message (assoc message :user-id user-id)]
(run! #(send! % message) sessions)))
(declare start-eventbus-consumer!) (defn- on-eventbus-message
[{:keys [file-id user-id] :as ws} {:keys [body] :as message}]
(send! ws body))
(defn- start-eventbus-consumer!
[vsm ws fid]
(let [topic (str "internal.uxbox.file." fid)]
(ve/consumer vsm topic #(on-eventbus-message ws %2))))
;; --- Handler
(defn handler (defn handler
[{:keys [user] :as req}] [{:keys [user] :as req}]
(letfn [(on-init [ws] (letfn [(on-init [ws]
(let [vsm (::vw/execution-context req) (let [vsm (::vw/execution-context req)
fid (get-in req [:path-params :file-id]) fid (get-in req [:path-params :file-id])
ws (assoc ws
:user-id user
:file-id fid)
sem (start-eventbus-consumer! vsm ws fid)] sem (start-eventbus-consumer! vsm ws fid)]
(handle-message ws {:type :connect})
(handle-message ws {:type :connect :file-id fid :user-id user}) (assoc ws ::sem sem)))
(assoc ws
::sem sem
:user-id user
:file-id fid)))
(on-message [ws message] (on-message [ws message]
(try (try
@ -103,16 +116,6 @@
:on-message on-message :on-message on-message
:on-close on-close)))) :on-close on-close))))
(defn- on-eventbus-message
[ws {:keys [body] :as message}]
;; TODO
(ws-send! ws body))
(defn- start-eventbus-consumer!
[vsm ws fid]
(let [topic (str "internal.uxbox.file." fid)]
(ve/consumer vsm topic #(on-eventbus-message ws %2))))
;; --- Internal (vertx api) (experimental) ;; --- Internal (vertx api) (experimental)
(defrecord WebSocket [on-init on-message on-close] (defrecord WebSocket [on-init on-message on-close]

View file

@ -8,17 +8,18 @@
(:require (:require
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[promesa.core :as p] [promesa.core :as p]
[uxbox.common.pages :as cp]
[uxbox.db :as db] [uxbox.db :as db]
[uxbox.services.mutations :as sm] [uxbox.services.mutations :as sm]
[uxbox.services.mutations.project-files :as files] [uxbox.services.mutations.project-files :as files]
[uxbox.services.queries.project-pages :refer [decode-row]] [uxbox.services.queries.project-pages :refer [decode-row]]
[uxbox.services.util :as su] [uxbox.services.util :as su]
[uxbox.common.pages :as cp]
[uxbox.util.exceptions :as ex]
[uxbox.util.blob :as blob] [uxbox.util.blob :as blob]
[uxbox.util.sql :as sql] [uxbox.util.exceptions :as ex]
[uxbox.util.spec :as us] [uxbox.util.spec :as us]
[uxbox.util.uuid :as uuid])) [uxbox.util.sql :as sql]
[uxbox.util.uuid :as uuid]
[vertx.eventbus :as ve]))
;; --- Helpers & Specs ;; --- Helpers & Specs
@ -100,7 +101,7 @@
[conn {:keys [user-id id version data operations]}] [conn {:keys [user-id id version data operations]}]
(let [sql "insert into project_page_snapshots (user_id, page_id, version, data, operations) (let [sql "insert into project_page_snapshots (user_id, page_id, version, data, operations)
values ($1, $2, $3, $4, $5) values ($1, $2, $3, $4, $5)
returning id, version, operations"] returning id, page_id, user_id, version, operations"]
(db/query-one conn [sql user-id id version data operations]))) (db/query-one conn [sql user-id id version data operations])))
;; --- Mutation: Rename Page ;; --- Mutation: Rename Page
@ -169,7 +170,14 @@
(-> (update-page-data conn page) (-> (update-page-data conn page)
(p/then (fn [_] (insert-page-snapshot conn page))) (p/then (fn [_] (insert-page-snapshot conn page)))
(p/then (fn [s] (retrieve-lagged-operations conn s params)))))) (p/then (fn [s]
(let [topic (str "internal.uxbox.file." (:file-id page))]
(p/do! (ve/publish! uxbox.core/system topic {:type :page-snapshot
:user-id (:user-id s)
:page-id (:page-id s)
:version (:version s)
:operations ops})
(retrieve-lagged-operations conn s params))))))))
(su/defstr sql:lagged-snapshots (su/defstr sql:lagged-snapshots
"select s.id, s.operations "select s.id, s.operations
@ -182,7 +190,7 @@
(let [sql sql:lagged-snapshots] (let [sql sql:lagged-snapshots]
(-> (db/query conn [sql (:id params) (:version params) #_(:id snapshot)]) (-> (db/query conn [sql (:id params) (:version params) #_(:id snapshot)])
(p/then (fn [rows] (p/then (fn [rows]
{:id (:id params) {:page-id (:id params)
:version (:version snapshot) :version (:version snapshot)
:operations (into [] (comp (map decode-row) :operations (into [] (comp (map decode-row)
(map :operations) (map :operations)

View file

@ -30,7 +30,9 @@
[uxbox.util.spec :as us] [uxbox.util.spec :as us]
[uxbox.util.transit :as t] [uxbox.util.transit :as t]
[uxbox.util.time :as dt] [uxbox.util.time :as dt]
[uxbox.util.uuid :as uuid])) [uxbox.util.uuid :as uuid]
[vendor.randomcolor]))
;; TODO: temporal workaround ;; TODO: temporal workaround
(def clear-ruler nil) (def clear-ruler nil)
@ -113,6 +115,128 @@
(defn interrupt? [e] (= e :interrupt)) (defn interrupt? [e] (= e :interrupt))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Websockets Events
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; --- Initialize WebSocket
(declare fetch-users)
(declare handle-who)
(declare handle-pointer-update)
(declare handle-page-snapshot)
(declare shapes-changes-commited)
(s/def ::type keyword?)
(s/def ::message
(s/keys :req-un [::type]))
(defn initialize-ws
[file-id]
(ptk/reify ::initialize
ptk/UpdateEvent
(update [_ state]
(let [uri (str "ws://localhost:6060/sub/" file-id)]
(assoc-in state [:ws file-id] (ws/open uri))))
ptk/WatchEvent
(watch [_ state stream]
(let [wsession (get-in state [:ws file-id])]
(->> (rx/merge
(rx/of (fetch-users file-id))
(->> (ws/-stream wsession)
(rx/filter #(= :message (:type %)))
(rx/map (comp t/decode :payload))
(rx/filter #(s/valid? ::message %))
(rx/map (fn [{:keys [type] :as msg}]
(case type
:who (handle-who msg)
:pointer-update (handle-pointer-update msg)
:page-snapshot (handle-page-snapshot msg)
::unknown)))))
(rx/take-until
(rx/filter #(= ::finalize %) stream)))))))
;; --- Finalize Websocket
(defn finalize-ws
[file-id]
(ptk/reify ::finalize
ptk/WatchEvent
(watch [_ state stream]
(ws/-close (get-in state [:ws file-id]))
(rx/of ::finalize))))
;; --- Fetch Workspace Users
(declare users-fetched)
(defn fetch-users
[file-id]
(ptk/reify ::fetch-users
ptk/WatchEvent
(watch [_ state stream]
(->> (rp/query :project-file-users {:file-id file-id})
(rx/map users-fetched)))))
(defn users-fetched
[users]
(ptk/reify ::users-fetched
ptk/UpdateEvent
(update [_ state]
(reduce (fn [state user]
(update-in state [:workspace-users :by-id (:id user)] merge user))
state
users))))
;; --- Handle: Who
;; TODO: assign color
(defn- assign-user-color
[state user-id]
(let [user (get-in state [:workspace-users :by-id user-id])
color (js/randomcolor)
user (if (string? (:color user))
user
(assoc user :color color))]
(prn "assign-user-color" user-id)
(assoc-in state [:workspace-users :by-id user-id] user)))
(defn handle-who
[{:keys [users] :as msg}]
(s/assert set? users)
(ptk/reify ::handle-who
ptk/UpdateEvent
(update [_ state]
(prn "handle-who" users)
(as-> state $$
(assoc-in $$ [:workspace-users :active] users)
(reduce assign-user-color $$ users)))))
(defn handle-pointer-update
[{:keys [user-id page-id x y] :as msg}]
(ptk/reify ::handle-pointer-update
ptk/UpdateEvent
(update [_ state]
(assoc-in state [:workspace-users :pointer user-id]
{:page-id page-id
:user-id user-id
:x x
:y y}))))
(defn handle-page-snapshot
[{:keys [user-id page-id version operations] :as msg}]
(ptk/reify ::handle-page-snapshot
ptk/WatchEvent
(watch [_ state stream]
(let [local (:workspace-local state)]
(when (= (:page-id local) page-id)
(prn "handle-page-snapshot" msg)
(rx/of (shapes-changes-commited msg)))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; General workspace events ;; General workspace events
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -132,7 +256,6 @@
:tooltip nil}) :tooltip nil})
(declare initialized) (declare initialized)
;; (declare watch-events)
(defn initialize (defn initialize
"Initialize the workspace state." "Initialize the workspace state."
@ -147,9 +270,6 @@
:page-id page-id)] :page-id page-id)]
(-> state (-> state
(assoc :workspace-layout default-layout) (assoc :workspace-layout default-layout)
;; (update :workspace-layout
;; (fn [data]
;; (if (nil? data) default-layout data)))
(assoc :workspace-local local)))) (assoc :workspace-local local))))
ptk/WatchEvent ptk/WatchEvent
@ -172,16 +292,18 @@
(rx/mapcat #(rx/of (initialized file-id page-id) (rx/mapcat #(rx/of (initialized file-id page-id)
#_(initialize-alignment page-id)))) #_(initialize-alignment page-id))))
;; When workspace is initialized, run the event watchers. (->> stream
(->> (rx/filter (ptk/type? ::initialized) stream) (rx/filter uxbox.main.ui.workspace.streams/pointer-event?)
(rx/take 1) (rx/sample 150)
(rx/ignore)))) (rx/tap (fn [{:keys [pt] :as event}]
(let [msg {:type :pointer-update
:page-id page-id
:x (:x pt)
:y (:y pt)}]
(ws/-send (get-in state [:ws file-id]) (t/encode msg)))))
(rx/ignore)
(rx/take-until (rx/filter #(= ::stop-watcher %) stream)))))))
ptk/EffectEvent
(effect [_ state stream]
;; Optimistic prefetch of projects if them are not already fetched
#_(when-not (seq (:projects state))
(st/emit! (dp/fetch-projects))))))
(defn- initialized (defn- initialized
[file-id page-id] [file-id page-id]
@ -212,7 +334,6 @@
(disj flags flag) (disj flags flag)
(conj flags flag))))))) (conj flags flag)))))))
;; --- Workspace Flags ;; --- Workspace Flags
(defn activate-flag (defn activate-flag
@ -235,7 +356,6 @@
(update [_ state] (update [_ state]
(update-in state [:workspace-local :flags] disj flag)))) (update-in state [:workspace-local :flags] disj flag))))
(defn toggle-flag (defn toggle-flag
[flag] [flag]
(s/assert keyword? flag) (s/assert keyword? flag)
@ -851,8 +971,6 @@
(rx/of (commit-shapes-changes changes) (rx/of (commit-shapes-changes changes)
#(dissoc state ::tmp-changes))))))) #(dissoc state ::tmp-changes)))))))
(declare shapes-changes-commited)
(defn commit-shapes-changes (defn commit-shapes-changes
[operations] [operations]
(s/assert ::cp/operations operations) (s/assert ::cp/operations operations)
@ -871,21 +989,23 @@
:version (:version page) :version (:version page)
:operations operations}] :operations operations}]
(->> (rp/mutation :update-project-page params) (->> (rp/mutation :update-project-page params)
(rx/tap #(prn "KAKAKAKA" %))
(rx/map shapes-changes-commited)))))) (rx/map shapes-changes-commited))))))
(s/def ::shapes-changes-commited (s/def ::shapes-changes-commited
(s/keys :req-un [::id ::version ::cp/operations])) (s/keys :req-un [::page-id ::version ::cp/operations]))
(defn shapes-changes-commited (defn shapes-changes-commited
[{:keys [id version operations] :as params}] [{:keys [page-id version operations] :as params}]
(prn "shapes-changes-commited" params)
(s/assert ::shapes-changes-commited params) (s/assert ::shapes-changes-commited params)
(ptk/reify ::shapes-changes-commited (ptk/reify ::shapes-changes-commited
ptk/UpdateEvent ptk/UpdateEvent
(update [_ state] (update [_ state]
(-> state (-> state
(assoc-in [:workspace-page :version] version) (assoc-in [:workspace-page :version] version)
(assoc-in [:pages id :version] version) (assoc-in [:pages page-id :version] version)
(update-in [:pages-data id] cp/process-ops operations) (update-in [:pages-data page-id] cp/process-ops operations)
(update :workspace-data cp/process-ops operations))))) (update :workspace-data cp/process-ops operations)))))
;; --- Start shape "edition mode" ;; --- Start shape "edition mode"

View file

@ -24,6 +24,8 @@
(declare fetch-users) (declare fetch-users)
(declare handle-who) (declare handle-who)
(declare handle-pointer-update)
(declare handle-page-snapshot)
(s/def ::type keyword?) (s/def ::type keyword?)
(s/def ::message (s/def ::message
@ -35,29 +37,37 @@
ptk/UpdateEvent ptk/UpdateEvent
(update [_ state] (update [_ state]
(let [uri (str "ws://localhost:6060/sub/" file-id)] (let [uri (str "ws://localhost:6060/sub/" file-id)]
(assoc-in state [::ws file-id] (ws/open uri)))) (assoc-in state [:ws file-id] (ws/open uri))))
ptk/WatchEvent ptk/WatchEvent
(watch [_ state stream] (watch [_ state stream]
(rx/merge (let [wsession (get-in state [:ws file-id])]
(rx/of (fetch-users file-id)) (->> (rx/merge
(->> (ws/-stream (get-in state [::ws file-id])) (rx/of (fetch-users file-id))
(rx/filter #(= :message (:type %))) (->> (ws/-stream wsession)
(rx/map (comp t/decode :payload)) (rx/filter #(= :message (:type %)))
(rx/filter #(s/valid? ::message %)) (rx/map (comp t/decode :payload))
(rx/map (fn [{:keys [type] :as msg}] (rx/filter #(s/valid? ::message %))
(case type (rx/map (fn [{:keys [type] :as msg}]
:who (handle-who msg) (case type
::unknown)))))))) :who (handle-who msg)
:pointer-update (handle-pointer-update msg)
:page-snapshot (handle-page-snapshot msg)
::unknown)))))
(rx/take-until
(rx/filter #(= ::finalize %) stream)))))))
;; --- Finalize Websocket ;; --- Finalize Websocket
(defn finalize (defn finalize
[file-id] [file-id]
(ptk/reify ::finalize (ptk/reify ::finalize
ptk/EffectEvent ptk/WatchEvent
(effect [_ state stream] (watch [_ state stream]
(ws/-close (get-in state [::ws file-id]))))) (ws/-close (get-in state [:ws file-id]))
(rx/of ::finalize))))
;; --- Fetch Workspace Users ;; --- Fetch Workspace Users
@ -93,3 +103,25 @@
ptk/UpdateEvent ptk/UpdateEvent
(update [_ state] (update [_ state]
(assoc-in state [:workspace-users :active] users)))) (assoc-in state [:workspace-users :active] users))))
(defn handle-pointer-update
[{:keys [user-id page-id x y] :as msg}]
(ptk/reify ::handle-pointer-update
ptk/UpdateEvent
(update [_ state]
(assoc-in state [:workspace-users :pointer user-id]
{:page-id page-id
:user-id user-id
:x x
:y y}))))
(defn handle-page-snapshot
[{:keys [user-id page-id version operations :as msg]}]
(ptk/reify ::handle-page-snapshot
ptk/UpdateEvent
(update [_ state]
(-> state
(assoc-in [:workspace-page :version] version)
(assoc-in [:pages page-id :version] version)
(update-in [:pages-data page-id] cp/process-ops operations)
(update :workspace-data cp/process-ops operations)))))

View file

@ -96,6 +96,12 @@
(mf/defc workspace (mf/defc workspace
[{:keys [file-id page-id] :as props}] [{:keys [file-id page-id] :as props}]
(mf/use-effect
{:deps #js [(str file-id)]
:fn (fn []
(st/emit! (dw/initialize-ws file-id))
#(st/emit! (dw/finalize-ws file-id)))})
(mf/use-effect (mf/use-effect
{:deps #js [(str file-id) {:deps #js [(str file-id)
(str page-id)] (str page-id)]
@ -104,12 +110,6 @@
(st/emit! (dw/initialize file-id page-id)) (st/emit! (dw/initialize file-id page-id))
#(rx/cancel! sub)))}) #(rx/cancel! sub)))})
(mf/use-effect
{:deps #js [(str file-id)]
:fn (fn []
(st/emit! (dws/initialize file-id))
#(st/emit! (dws/finalize file-id)))})
(let [layout (mf/deref refs/workspace-layout) (let [layout (mf/deref refs/workspace-layout)
file (mf/deref refs/workspace-file) file (mf/deref refs/workspace-file)
page (mf/deref refs/workspace-page) page (mf/deref refs/workspace-page)

View file

@ -41,24 +41,25 @@
;; --- Header Users ;; --- Header Users
(mf/defc user-item (mf/defc user-widget
[{:keys [user self?] :as props}] [{:keys [user self?] :as props}]
[:li.tooltip.tooltip-bottom [:li.tooltip.tooltip-bottom
{:alt (:fullname user) {:alt (:fullname user)
:on-click (when self? :on-click (when self?
#(st/emit! (rt/navigate :settings/profile)))} #(st/emit! (rt/navigate :settings/profile)))}
[:img {:src "/images/avatar.jpg"}]]) [:img {:style {:border-color (:color user)}
:src "/images/avatar.jpg"}]])
(mf/defc users-list (mf/defc active-users
[props] [props]
(let [profile (mf/deref refs/profile) (let [profile (mf/deref refs/profile)
users (mf/deref refs/workspace-users)] users (mf/deref refs/workspace-users)]
[:ul.user-multi [:ul.user-multi
[:& user-item {:user profile :self? true}] [:& user-widget {:user profile :self? true}]
(for [id (->> (:active users) (for [id (->> (:active users)
(remove #(= % (:id profile))))] (remove #(= % (:id profile))))]
[:& user-item {:user (get-in users [:by-id id]) [:& user-widget {:user (get-in users [:by-id id])
:key id}])])) :key id}])]))
;; --- Header Component ;; --- Header Component
@ -80,7 +81,7 @@
:on-click #(st/emit! (dw/toggle-layout-flag :sitemap))} :on-click #(st/emit! (dw/toggle-layout-flag :sitemap))}
[:span (:project-name file) " / " (:name file)]] [:span (:project-name file) " / " (:name file)]]
[:& users-list] [:& active-users]
[:div.workspace-options [:div.workspace-options
[:ul.options-btn [:ul.options-btn

View file

@ -143,7 +143,7 @@
;; --- Viewport ;; --- Viewport
(declare remote-user-cursor) (declare remote-user-cursors)
(mf/defc canvas-and-shapes (mf/defc canvas-and-shapes
{:wrap [mf/wrap-memo]} {:wrap [mf/wrap-memo]}
@ -295,42 +295,51 @@
;; -- METER CURSOR MULTIUSUARIO ;; -- METER CURSOR MULTIUSUARIO
;;[:& remote-user-cursor] [:& remote-user-cursors {:page page}]
[:& selrect {:data (:selrect local)}]]]))) [:& selrect {:data (:selrect local)}]]])))
(mf/defc remote-user-cursor (mf/defc remote-user-cursor
[props] [{:keys [pointer user] :as props}]
[:g.multiuser-cursor #_{:transform "translate(100, 100) scale(2)"} [:g.multiuser-cursor {:key (:user-id pointer)
[:svg {:x "100" :transform (str "translate(" (:x pointer) "," (:y pointer) ") scale(4)")}
:y "100" [:path {:fill (:color user)
:style {:fill "#000"} :d "M5.292 4.027L1.524.26l-.05-.01L0 0l.258 1.524 3.769 3.768zm-.45 0l-.313.314L1.139.95l.314-.314zm-.5.5l-.315.316-3.39-3.39.315-.315 3.39 3.39zM1.192.526l-.668.667L.431.646.64.43l.552.094z"
:width "106.824" :font-family "sans-serif"}]
:height "20.176" [:g {:transform "translate(0 -291.708)"}
:viewBox "0 0 28.264 5.338"} [:rect {:width "21.415"
[:path {:d "M5.292 4.027L1.524.26l-.05-.01L0 0l.258 1.524 3.769 3.768zm-.45 0l-.313.314L1.139.95l.314-.314zm-.5.5l-.315.316-3.39-3.39.315-.315 3.39 3.39zM1.192.526l-.668.667L.431.646.64.43l.552.094z" :height "5.292"
:font-family "sans-serif"}] :x "6.849"
[:g {:transform "translate(0 -291.708)"} :y "291.755"
[:rect {:width "21.415" :fill (:color user)
:height "5.292" :fill-opacity ".893"
:x "6.849" :paint-order "stroke fill markers"
:y "291.755" :rx ".794"
:fill-opacity ".893" :ry ".794"}]
:paint-order "stroke fill markers" [:text {:x "9.811"
:rx ".794" :y "295.216"
:ry ".794"}] :fill "#fff"
[:text {:x "9.811" :stroke-width ".265"
:y "295.216" :font-family "Open Sans"
:fill "#fff" :font-size"2.91"
:stroke-width ".265" :font-weight "400"
:font-family "Open Sans" :letter-spacing"0"
:font-size"2.91" :style {:line-height "1.25"}
:font-weight "400" :word-spacing "0"
:letter-spacing"0" ;; :style="line-height:1
:line-height "1.25" }
:word-spacing "0" (:fullname user)]]])
;; :style="line-height:1
} (mf/defc remote-user-cursors
"User 2"]]]]) [{:keys [page] :as props}]
(let [users (mf/deref refs/workspace-users)
pointers (->> (vals (:pointer users))
(remove #(not= (:id page) (:page-id %)))
(filter #((:active users) (:user-id %))))]
(for [pointer pointers]
(let [user (get-in users [:by-id (:user-id pointer)])]
[:& remote-user-cursor {:pointer pointer
:user user
:key (:user-id pointer)}]))))

View file

@ -48,4 +48,3 @@
(ev/unlistenByKey lk1) (ev/unlistenByKey lk1)
(ev/unlistenByKey lk2) (ev/unlistenByKey lk2)
(ev/unlistenByKey lk3))))) (ev/unlistenByKey lk3)))))