diff --git a/backend/src/app/util/websocket.clj b/backend/src/app/util/websocket.clj index 78f1fcdd0..b804b7fac 100644 --- a/backend/src/app/util/websocket.clj +++ b/backend/src/app/util/websocket.clj @@ -48,8 +48,8 @@ ::output-buff-size ::idle-timeout ::metrics] - :or {input-buff-size 32 - output-buff-size 32 + :or {input-buff-size 64 + output-buff-size 64 idle-timeout 30000} :as options}] (fn [_] @@ -67,10 +67,7 @@ created-at (dt/now) on-terminate - (fn [& [_ error]] - (when (ex/exception? error) - (l/warn :hint (ex-message error) :cause error)) - + (fn [& args] (when (compare-and-set! terminated false true) (call-mtx metrics :connections {:cmd :dec :by 1}) (call-mtx metrics :sessions {:val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)}) @@ -80,6 +77,13 @@ (a/close! output-ch) (a/close! input-ch))) + on-error + (fn [_ error] + (on-terminate) + (when-not (or (instance? org.eclipse.jetty.websocket.api.exceptions.WebSocketTimeoutException error) + (instance? java.nio.channels.ClosedChannelException error)) + (l/error :hint (ex-message error) :cause error))) + on-connect (fn [conn] (call-mtx metrics :connections {:cmd :inc :by 1}) @@ -106,16 +110,19 @@ on-message (fn [_ message] (call-mtx metrics :messages {:labels ["recv"]}) - (let [message (t/decode-str message)] - (when-not (a/offer! input-ch message) - (l/warn :hint "drop messages")))) + (try + (let [message (t/decode-str message)] + (a/offer! input-ch message)) + (catch Throwable e + (l/warn :hint "error on decoding incoming message from websocket" + :cause e)))) on-pong (fn [_ buffer] (a/>!! pong-ch buffer))] {:on-connect on-connect - :on-error on-terminate + :on-error on-error :on-close on-terminate :on-text on-message :on-pong on-pong})))) @@ -174,7 +181,7 @@ [{:keys [::conn ::close-ch ::on-close ::pong-ch ::heartbeat-interval ::max-missed-heartbeats] :or {heartbeat-interval 2000 - max-missed-heartbeats 8}}] + max-missed-heartbeats 4}}] (let [beats (atom #{})] (a/go-loop [i 0] (let [[_ port] (a/alts! [close-ch (a/timeout heartbeat-interval)])]