🎉 Add ping message on websocket communication.

Every 30seconds (for reset the default nginx proxy timeout).
This commit is contained in:
Andrey Antukh 2020-02-07 13:12:05 +01:00
parent e805515767
commit 044f8487e2
3 changed files with 22 additions and 15 deletions

View file

@ -20,11 +20,12 @@
[uxbox.util.uuid :as uuid] [uxbox.util.uuid :as uuid]
[vertx.eventbus :as ve] [vertx.eventbus :as ve]
[vertx.http :as vh] [vertx.http :as vh]
[vertx.impl :as vi]
[vertx.util :as vu] [vertx.util :as vu]
[vertx.timers :as vt]
[vertx.web :as vw] [vertx.web :as vw]
[vertx.web.websockets :as ws]) [vertx.web.websockets :as ws])
(:import (:import
java.lang.AutoCloseable
io.vertx.core.Handler io.vertx.core.Handler
io.vertx.core.Promise io.vertx.core.Promise
io.vertx.core.Vertx io.vertx.core.Vertx
@ -91,9 +92,11 @@
ws (assoc ws ws (assoc ws
:user-id user-id :user-id user-id
:file-id file-id) :file-id file-id)
sem (start-eventbus-consumer! ctx ws file-id)] send-ping #(send! ws {:type :ping})
sem1 (start-eventbus-consumer! ctx ws file-id)
sem2 (vt/schedule-periodic! ctx 30000 send-ping)]
(handle-message ws {:type :connect}) (handle-message ws {:type :connect})
(assoc ws ::sem sem))) (assoc ws ::sem1 sem1 ::sem2 sem2)))
(defn- on-text-message (defn- on-text-message
[ws message] [ws message]
@ -106,7 +109,8 @@
(let [file-id (:file-id ws)] (let [file-id (:file-id ws)]
(handle-message ws {:type :disconnect (handle-message ws {:type :disconnect
:file-id file-id}) :file-id file-id})
(.unregister (::sem ws)))) (.close ^AutoCloseable (::sem1 ws))
(.close ^AutoCloseable (::sem2 ws))))
(defn handler (defn handler
[{:keys [user] :as req}] [{:keys [user] :as req}]

View file

@ -7,15 +7,16 @@
(ns vertx.eventbus (ns vertx.eventbus
(:require [promesa.core :as p] (:require [promesa.core :as p]
[vertx.impl :as impl]) [vertx.impl :as impl])
(:import io.vertx.core.Vertx (:import
io.vertx.core.Handler io.vertx.core.Vertx
io.vertx.core.Context io.vertx.core.Handler
io.vertx.core.eventbus.Message io.vertx.core.Context
io.vertx.core.eventbus.MessageConsumer io.vertx.core.eventbus.Message
io.vertx.core.eventbus.DeliveryOptions io.vertx.core.eventbus.MessageConsumer
io.vertx.core.eventbus.EventBus io.vertx.core.eventbus.DeliveryOptions
io.vertx.core.eventbus.MessageCodec io.vertx.core.eventbus.EventBus
java.util.function.Supplier)) io.vertx.core.eventbus.MessageCodec
java.util.function.Supplier))
(declare opts->delivery-opts) (declare opts->delivery-opts)
(declare resolve-eventbus) (declare resolve-eventbus)
@ -36,7 +37,9 @@
(.resume consumer) (.resume consumer)
(.reply msg (or res err) (.reply msg (or res err)
(opts->delivery-opts {})))))))) (opts->delivery-opts {}))))))))
consumer)) (reify java.lang.AutoCloseable
(close [it]
(.unregister consumer)))))
(defn publish! (defn publish!
([vsm topic msg] (publish! vsm topic msg {})) ([vsm topic msg] (publish! vsm topic msg {}))

View file

@ -26,7 +26,7 @@
(close [_] (close [_]
(.cancelTimer system timer-id))))) (.cancelTimer system timer-id)))))
(defn sechdule-periodic! (defn schedule-periodic!
[vsm ms f] [vsm ms f]
(let [^Vertx system (impl/resolve-system vsm) (let [^Vertx system (impl/resolve-system vsm)
^Handler handler (impl/fn->handler (fn [v] (f))) ^Handler handler (impl/fn->handler (fn [v] (f)))