diff --git a/backend/src/uxbox/http/ws.clj b/backend/src/uxbox/http/ws.clj index 64e17f43e..3577ea192 100644 --- a/backend/src/uxbox/http/ws.clj +++ b/backend/src/uxbox/http/ws.clj @@ -20,11 +20,12 @@ [uxbox.util.uuid :as uuid] [vertx.eventbus :as ve] [vertx.http :as vh] - [vertx.impl :as vi] [vertx.util :as vu] + [vertx.timers :as vt] [vertx.web :as vw] [vertx.web.websockets :as ws]) (:import + java.lang.AutoCloseable io.vertx.core.Handler io.vertx.core.Promise io.vertx.core.Vertx @@ -91,9 +92,11 @@ ws (assoc ws :user-id user-id :file-id file-id) - sem (start-eventbus-consumer! ctx ws file-id)] + send-ping #(send! ws {:type :ping}) + sem1 (start-eventbus-consumer! ctx ws file-id) + sem2 (vt/schedule-periodic! ctx 30000 send-ping)] (handle-message ws {:type :connect}) - (assoc ws ::sem sem))) + (assoc ws ::sem1 sem1 ::sem2 sem2))) (defn- on-text-message [ws message] @@ -106,7 +109,8 @@ (let [file-id (:file-id ws)] (handle-message ws {:type :disconnect :file-id file-id}) - (.unregister (::sem ws)))) + (.close ^AutoCloseable (::sem1 ws)) + (.close ^AutoCloseable (::sem2 ws)))) (defn handler [{:keys [user] :as req}] diff --git a/backend/vendor/vertx/src/vertx/eventbus.clj b/backend/vendor/vertx/src/vertx/eventbus.clj index 4415d276c..35238edb8 100644 --- a/backend/vendor/vertx/src/vertx/eventbus.clj +++ b/backend/vendor/vertx/src/vertx/eventbus.clj @@ -7,15 +7,16 @@ (ns vertx.eventbus (:require [promesa.core :as p] [vertx.impl :as impl]) - (:import io.vertx.core.Vertx - io.vertx.core.Handler - io.vertx.core.Context - io.vertx.core.eventbus.Message - io.vertx.core.eventbus.MessageConsumer - io.vertx.core.eventbus.DeliveryOptions - io.vertx.core.eventbus.EventBus - io.vertx.core.eventbus.MessageCodec - java.util.function.Supplier)) + (:import + io.vertx.core.Vertx + io.vertx.core.Handler + io.vertx.core.Context + io.vertx.core.eventbus.Message + io.vertx.core.eventbus.MessageConsumer + io.vertx.core.eventbus.DeliveryOptions + io.vertx.core.eventbus.EventBus + io.vertx.core.eventbus.MessageCodec + java.util.function.Supplier)) (declare opts->delivery-opts) (declare resolve-eventbus) @@ -36,7 +37,9 @@ (.resume consumer) (.reply msg (or res err) (opts->delivery-opts {})))))))) - consumer)) + (reify java.lang.AutoCloseable + (close [it] + (.unregister consumer))))) (defn publish! ([vsm topic msg] (publish! vsm topic msg {})) diff --git a/backend/vendor/vertx/src/vertx/timers.clj b/backend/vendor/vertx/src/vertx/timers.clj index 58ed526bd..ff345a4dd 100644 --- a/backend/vendor/vertx/src/vertx/timers.clj +++ b/backend/vendor/vertx/src/vertx/timers.clj @@ -26,7 +26,7 @@ (close [_] (.cancelTimer system timer-id))))) -(defn sechdule-periodic! +(defn schedule-periodic! [vsm ms f] (let [^Vertx system (impl/resolve-system vsm) ^Handler handler (impl/fn->handler (fn [v] (f)))