Adds a worker message buffer for selection queries

This commit is contained in:
alonso.torres 2021-03-30 14:31:57 +02:00 committed by Andrey Antukh
parent d83d241c39
commit 6891826c78
4 changed files with 128 additions and 32 deletions

View file

@ -96,7 +96,8 @@
(mf/deps page-id) (mf/deps page-id)
(fn [point] (fn [point]
(let [rect (gsh/center->rect point 8 8)] (let [rect (gsh/center->rect point 8 8)]
(uw/ask! {:cmd :selection/query (uw/ask-buffered!
{:cmd :selection/query
:page-id page-id :page-id page-id
:rect rect :rect rect
:include-frames? true})))) :include-frames? true}))))

View file

@ -25,3 +25,7 @@
(defn ask! (defn ask!
[message] [message]
(uw/ask! instance message)) (uw/ask! instance message))
(defn ask-buffered!
[message]
(uw/ask-buffered! instance message))

View file

@ -17,39 +17,58 @@
(declare handle-response) (declare handle-response)
(defrecord Worker [instance stream]) (defrecord Worker [instance stream])
(defn ask! (defn- send-message! [worker {sender-id :sender-id :as message}]
[w message] (let [data (t/encode message)
(let [sender-id (uuid/next) instance (:instance worker)]
data (t/encode {:payload message :sender-id sender-id})
instance (:instance w)]
(.postMessage instance data) (.postMessage instance data)
(->> (:stream w) (->> (:stream worker)
(rx/filter #(= (:reply-to %) sender-id)) (rx/filter #(= (:reply-to %) sender-id))
(rx/map handle-response) (rx/take 1)
(rx/first)))) (rx/map handle-response))))
(defn ask!
[worker message]
(send-message!
worker
{:sender-id (uuid/next)
:payload message}))
(defn ask-buffered!
[worker message]
(send-message!
worker
{:sender-id (uuid/next)
:payload message
:buffer? true}))
(defn init (defn init
"Return a initialized webworker instance." "Return a initialized webworker instance."
[path on-error] [path on-error]
(let [ins (js/Worker. path) (let [instance (js/Worker. path)
bus (rx/subject) bus (rx/subject)
wrk (Worker. ins bus)] worker (Worker. instance bus)
(.addEventListener ins "message"
handle-message
(fn [event] (fn [event]
(let [data (.-data event) (let [data (.-data event)
data (t/decode data)] data (t/decode data)]
(if (:error data) (if (:error data)
(on-error (:error data)) (on-error (:error data))
(rx/push! bus data))))) (rx/push! bus data))))
(.addEventListener ins "error"
(fn [error]
(on-error wrk (.-data error))))
wrk)) handle-error
(fn [error]
(on-error worker (.-data error)))]
(.addEventListener instance "message" handle-message)
(.addEventListener instance "error" handle-error)
worker))
(defn- handle-response (defn- handle-response
[{:keys [payload error] :as response}] [{:keys [payload error dropped] :as response}]
(when-not dropped
(if-let [{:keys [data message]} error] (if-let [{:keys [data message]} error]
(throw (ex-info message data)) (throw (ex-info message data))
payload)) payload)))

View file

@ -28,14 +28,23 @@
;; --- Messages Handling ;; --- Messages Handling
(s/def ::cmd keyword?) (s/def ::cmd keyword?)
(s/def ::payload (s/def ::payload
(s/keys :req-un [::cmd])) (s/keys :req-un [::cmd]))
(s/def ::sender-id uuid?) (s/def ::sender-id uuid?)
(s/def ::buffer? boolean?)
(s/def ::message (s/def ::message
(s/keys :req-un [::payload ::sender-id])) (s/keys
:req-opt [::buffer?]
:req-un [::payload ::sender-id]))
(def buffer (rx/subject))
(defn- handle-message (defn- handle-message
"Process the message and returns to the client"
[{:keys [sender-id payload] :as message}] [{:keys [sender-id payload] :as message}]
(us/assert ::message message) (us/assert ::message message)
(try (try
@ -68,20 +77,83 @@
:message (ex-message e)}}] :message (ex-message e)}}]
(.postMessage js/self (t/encode message)))))) (.postMessage js/self (t/encode message))))))
(defn- drop-message
"Sends to the client a notifiction that its messages have been dropped"
[{:keys [sender-id payload] :as message}]
(us/assert ::message message)
(.postMessage js/self (t/encode {:reply-to sender-id
:dropped true})))
(defn subscribe-buffer-messages
"Creates a subscription to process the buffer messages"
[]
(let [empty [{} [] ::clear]]
(->> buffer
;; We want async processing to not block the main loop
(rx/observe-on :async)
;; This scan will store the last message per type in `messages`
;; when a previous message is dropped is stored in `dropped`
;; we also store the last message processed in order to detect
;; posible infinite loops
(rx/scan
(fn [[messages dropped last] message]
(let [cmd (get-in message [:payload :cmd])
;; The previous message is dropped
dropped
(cond-> dropped
(contains? messages cmd)
(conj (get messages cmd)))
;; This is the new "head" for its type
messages
(assoc messages cmd message)]
;; When a "clear" message is detected we empty the buffer
(if (= message ::clear)
empty
[messages dropped message])))
empty)
;; 1ms debounce, after 1ms without messages will process the buffer
(rx/debounce 1)
(rx/subs (fn [[messages dropped last]]
;; Send back the dropped messages replies
(doseq [msg dropped]
(drop-message msg))
;; Process the message
(doseq [msg (vals messages)]
(handle-message msg))
;; After process the buffer we send a clear
(when-not (= last ::clear)
(rx/push! buffer ::clear)))))))
(defonce process-message-sub (subscribe-buffer-messages))
(defn- on-message (defn- on-message
[event] [event]
(when (nil? (.-source event)) (when (nil? (.-source event))
(let [message (.-data event) (let [message (.-data event)
message (t/decode message)] message (t/decode message)]
(handle-message message)))) (if (:buffer? message)
(rx/push! buffer message)
(handle-message message)))))
(.addEventListener js/self "message" on-message) (.addEventListener js/self "message" on-message)
(defn ^:dev/before-load stop [] (defn ^:dev/before-load stop []
(rx/-dispose process-message-sub)
(.removeEventListener js/self "message" on-message)) (.removeEventListener js/self "message" on-message))
(defn ^:dev/after-load start [] (defn ^:dev/after-load start []
[] []
(set! process-message-sub (subscribe-buffer-messages))
(.addEventListener js/self "message" on-message)) (.addEventListener js/self "message" on-message))