From f60d8c6c9666b82a6074d2335a8f609f320341f6 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 18 Mar 2022 12:36:42 +0100 Subject: [PATCH] :recycle: Refactor websockets subsystem (on backend) - Refactor msgbus subsystem, simplifying many parts. - Enable persistent websocket connection for the all session duration. --- backend/deps.edn | 2 +- backend/src/app/http/websocket.clj | 215 ++++++++---- backend/src/app/main.clj | 1 + backend/src/app/msgbus.clj | 433 ++++++++++++------------ backend/src/app/rpc/mutations/files.clj | 40 +-- backend/src/app/util/async.clj | 7 + backend/src/app/util/websocket.clj | 78 +++-- common/deps.edn | 10 +- common/src/app/common/data/macros.cljc | 5 +- common/src/app/common/logging.cljc | 20 +- common/src/app/common/spec.cljc | 11 + common/src/app/common/uri.cljc | 22 +- 12 files changed, 482 insertions(+), 362 deletions(-) diff --git a/backend/deps.edn b/backend/deps.edn index 438f53356..2f96d95ba 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -19,7 +19,7 @@ io.lettuce/lettuce-core {:mvn/version "6.1.6.RELEASE"} java-http-clj/java-http-clj {:mvn/version "0.4.3"} - funcool/yetti {:git/tag "v6.0" :git/sha "4c8690e" + funcool/yetti {:git/tag "v8.0" :git/sha "ea7162d" :git/url "https://github.com/funcool/yetti.git" :exclusions [org.slf4j/slf4j-api]} diff --git a/backend/src/app/http/websocket.clj b/backend/src/app/http/websocket.clj index 147e16fef..4b896e22c 100644 --- a/backend/src/app/http/websocket.clj +++ b/backend/src/app/http/websocket.clj @@ -22,51 +22,163 @@ ;; WEBSOCKET HANDLER ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(declare send-presence!) - (defmulti handle-message - (fn [_wsp message] (:type message))) + (fn [_ message] + (:type message))) (defmethod handle-message :connect [wsp _] - (let [{:keys [msgbus file-id team-id session-id ::ws/output-ch]} @wsp - sub-ch (a/chan (a/dropping-buffer 32))] + (l/trace :fn "handle-message" :event :connect) - (swap! wsp assoc :sub-ch sub-ch) + (let [msgbus-fn (:msgbus @wsp) + profile-id (::profile-id @wsp) + session-id (::session-id @wsp) + output-ch (::ws/output-ch @wsp) - ;; Start a subscription forwarding goroutine - (a/go-loop [] - (when-let [val (a/! output-ch val)) - (recur))) - - (a/go - (a/! output-ch message) + (recur))) + + (let [state {:file-id file-id :channel channel :topic file-id}] + (swap! wsp update ::subscriptions assoc subs-id state)) + + (a/go + ;; Subscribe to file topic + (a/ message + (dissoc :subs-id) + (assoc :profile-id profile-id) + (assoc :session-id session-id))] + + (a/ (merge cfg params) - (assoc :profile-id profile-id) - (assoc :team-id (:team-id file)))] + (let [{:keys [session-id]} (us/conform ::handler-params params) + cfg (-> cfg + (assoc ::profile-id profile-id) + (assoc ::session-id session-id))] + (l/trace :hint "http request to websocket" :profile-id profile-id :session-id session-id) (cond (not profile-id) (raise (ex/error :type :authentication :hint "Authentication required.")) - (not file) - (raise (ex/error :type :not-found - :code :object-not-found)) - - (not (yws/upgrade-request? req)) (raise (ex/error :type :validation :code :websocket-request-expected @@ -129,16 +223,3 @@ (->> (ws/handler handle-message cfg) (yws/upgrade req) (respond)))))) - -(def ^:private - sql:retrieve-file - "select f.id as id, - p.team_id as team_id - from file as f - join project as p on (p.id = f.project_id) - where f.id = ?") - -(defn- retrieve-file - [conn id] - (db/exec-one! conn [sql:retrieve-file id])) - diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index ec0808a8c..4d785de3d 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -65,6 +65,7 @@ :app.msgbus/msgbus {:backend (cf/get :msgbus-backend :redis) + :executor (ig/ref [::default :app.worker/executor]) :redis-uri (cf/get :redis-uri)} :app.tokens/tokens diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index 7d621a352..4bae83abd 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -7,12 +7,15 @@ (ns app.msgbus "The msgbus abstraction implemented using redis as underlying backend." (:require + [app.common.data :as d] [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] + [app.common.transit :as t] [app.config :as cfg] - [app.util.blob :as blob] + [app.util.async :as aa] [app.util.time :as dt] + [app.worker :as wrk] [clojure.core.async :as a] [clojure.spec.alpha :as s] [integrant.core :as ig] @@ -28,120 +31,83 @@ io.lettuce.core.codec.StringCodec io.lettuce.core.pubsub.RedisPubSubListener io.lettuce.core.pubsub.StatefulRedisPubSubConnection - io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands + io.lettuce.core.pubsub.api.sync.RedisPubSubCommands io.lettuce.core.resource.ClientResources io.lettuce.core.resource.DefaultClientResources java.time.Duration)) +(set! *warn-on-reflection* true) + (def ^:private prefix (cfg/get :tenant)) (defn- prefix-topic [topic] (str prefix "." topic)) -(def xform-prefix (map prefix-topic)) -(def xform-topics (map (fn [m] (update m :topics #(into #{} xform-prefix %))))) -(def xform-topic (map (fn [m] (update m :topic prefix-topic)))) +(def ^:private xform-prefix-topic + (map (fn [obj] (update obj :topic prefix-topic)))) -(s/def ::redis-uri ::us/string) -(s/def ::buffer-size ::us/integer) - -(defmulti init-backend :backend) -(defmulti stop-backend :backend) -(defmulti init-pub-loop :backend) -(defmulti init-sub-loop :backend) - -(defmethod ig/pre-init-spec ::msgbus [_] - (s/keys :opt-un [::buffer-size ::redis-uri])) +(declare ^:private redis-connect) +(declare ^:private redis-disconnect) +(declare ^:private start-io-loop) +(declare ^:private subscribe) +(declare ^:private purge) +(declare ^:private redis-pub) +(declare ^:private redis-sub) +(declare ^:private redis-unsub) (defmethod ig/prep-key ::msgbus [_ cfg] - (merge {:buffer-size 128} cfg)) + (merge {:buffer-size 128 + :timeout (dt/duration {:seconds 30})} + (d/without-nils cfg))) + +(s/def ::timeout ::dt/duration) +(s/def ::redis-uri ::us/string) +(s/def ::buffer-size ::us/integer) + +(defmethod ig/pre-init-spec ::msgbus [_] + (s/keys :req-un [::buffer-size ::redis-uri ::timeout ::wrk/executor])) (defmethod ig/init-key ::msgbus - [_ {:keys [backend buffer-size] :as cfg}] - (l/debug :action "initialize msgbus" - :backend (name backend)) - (let [cfg (init-backend cfg) + [_ {:keys [buffer-size redis-uri] :as cfg}] + (l/info :hint "initialize msgbus" + :buffer-size buffer-size + :redis-uri redis-uri) + (let [cmd-ch (a/chan buffer-size) + rcv-ch (a/chan (a/dropping-buffer buffer-size)) + pub-ch (a/chan (a/dropping-buffer buffer-size) xform-prefix-topic) + state (agent {} :error-handler #(l/error :cause % :hint "unexpected error on agent" ::l/async false)) + cfg (-> (redis-connect cfg) + (assoc ::cmd-ch cmd-ch) + (assoc ::rcv-ch rcv-ch) + (assoc ::pub-ch pub-ch) + (assoc ::state state))] - ;; Channel used for receive publications from the application. - pub-ch (-> (a/dropping-buffer buffer-size) - (a/chan xform-topic)) - - ;; Channel used for receive subscription requests. - sub-ch (a/chan 1 xform-topics) - - cfg (-> cfg - (assoc ::pub-ch pub-ch) - (assoc ::sub-ch sub-ch))] - - (init-pub-loop cfg) - (init-sub-loop cfg) + (start-io-loop cfg) (with-meta - (fn run - ([command] (run command nil)) - ([command params] - (a/go - (case command - :pub (a/>! pub-ch params) - :sub (a/>! sub-ch params))))) + (fn [& {:keys [cmd] :as params}] + (a/go + (case cmd + :pub (a/>! pub-ch params) + :sub (a/! c message) - (recur state (rest chans)) - (recur (update state topic disj c) - (rest chans))) - state))] - (recur state)) - - :else - (->> (vals state) - (mapcat identity) - (run! a/close!)))))) - - -;; Add a unique listener to connection - -;; --- REDIS BACKEND IMPL - -(declare impl-redis-open?) -(declare impl-redis-pub) -(declare impl-redis-sub) -(declare impl-redis-unsub) - - -(defmethod init-backend :redis - [{:keys [redis-uri] :as cfg}] - (let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE) +(defn- redis-connect + [{:keys [redis-uri timeout] :as cfg}] + (let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE) resources (.. (DefaultClientResources/builder) (ioThreadPoolSize 4) @@ -151,162 +117,181 @@ uri (RedisURI/create redis-uri) rclient (RedisClient/create ^ClientResources resources ^RedisURI uri) - pub-conn (.connect ^RedisClient rclient ^RedisCodec codec) - sub-conn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)] + pconn (.connect ^RedisClient rclient ^RedisCodec codec) + sconn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)] - (.setTimeout ^StatefulRedisConnection pub-conn ^Duration (dt/duration {:seconds 10})) - (.setTimeout ^StatefulRedisPubSubConnection sub-conn ^Duration (dt/duration {:seconds 10})) + (.setTimeout ^StatefulRedisConnection pconn ^Duration timeout) + (.setTimeout ^StatefulRedisPubSubConnection sconn ^Duration timeout) (-> cfg (assoc ::resources resources) - (assoc ::pub-conn pub-conn) - (assoc ::sub-conn sub-conn)))) + (assoc ::pconn pconn) + (assoc ::sconn sconn)))) -(defmethod stop-backend :redis - [{:keys [::pub-conn ::sub-conn ::resources] :as cfg}] - (.close ^StatefulRedisConnection pub-conn) - (.close ^StatefulRedisPubSubConnection sub-conn) +(defn- redis-disconnect + [{:keys [::pconn ::sconn ::resources] :as cfg}] + (.. ^StatefulConnection pconn close) + (.. ^StatefulConnection sconn close) (.shutdown ^ClientResources resources)) -(defmethod init-pub-loop :redis - [{:keys [::pub-conn ::pub-ch]}] - (let [rac (.async ^StatefulRedisConnection pub-conn)] - (a/go-loop [] - (when-let [val (a/! ch message) - (recur (rest chans) pending) - (recur (rest chans) (conj pending ch))) - pending))] - (some->> (seq pending) - (send-off chans unsubscribe-channels)) +(defn start-io-loop + [{:keys [::sconn ::rcv-ch ::pub-ch ::state executor] :as cfg}] - (recur)) + ;; Add a single listener to the pubsub connection + (.addListener ^StatefulRedisPubSubConnection sconn + ^RedisPubSubListener (create-listener rcv-ch)) - ;; Stop condition; close all underlying subscriptions and - ;; exit. The close operation is performed asynchronously. - (send-off chans (fn [state] - (->> (vals state) - (mapcat identity) - (filter some?) - (run! a/close!))))))))) + (letfn [(send-to-topic [topic message] + (a/go-loop [chans (seq (get-in @state [:topics topic])) + closed #{}] + (if-let [ch (first chans)] + (if (a/>! ch message) + (recur (rest chans) closed) + (recur (rest chans) (conj closed ch))) + (seq closed)))) + (process-incoming [{:keys [topic message]}] + (a/go + (when-let [closed (a/> (vals state) + (mapcat identity) + (filter some?) + (run! a/close!)) + nil))) -(defn- impl-redis-pub - [^RedisAsyncCommands rac {:keys [topic message]}] - (let [message (blob/encode message) - res (a/chan 1)] - (-> (.publish rac ^String topic ^bytes message) - (p/finally (fn [_ e] - (when e (a/>!! res e)) + (= port rcv-ch) + (do + (a/ (.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message) + (p/finally (fn [_ cause] + (when (and cause (.isOpen ^StatefulConnection pconn)) + (a/offer! res cause)) (a/close! res)))) res)) -(defn impl-redis-sub - [^RedisPubSubAsyncCommands rac topic] - (let [res (a/chan 1)] - (-> (.subscribe rac (into-array String [topic])) - (p/finally (fn [_ e] - (when e (a/>!! res e)) - (a/close! res)))) - res)) +(defn redis-sub + "Create redis subscription. Blocking operation, intended to be used + inside an agent." + [{:keys [::sconn] :as cfg} topic] + (let [topic (into-array String [topic]) + scomm (.sync ^StatefulRedisPubSubConnection sconn)] + (.subscribe ^RedisPubSubCommands scomm topic))) -(defn impl-redis-unsub - [rac topic] - (let [res (a/chan 1)] - (-> (.unsubscribe rac (into-array String [topic])) - (p/finally (fn [_ e] - (when e (a/>!! res e)) - (a/close! res)))) - res)) +(defn redis-unsub + "Removes redis subscription. Blocking operation, intended to be used + inside an agent." + [{:keys [::sconn] :as cfg} topic] + (let [topic (into-array String [topic]) + scomm (.sync ^StatefulRedisPubSubConnection sconn)] + (.unsubscribe ^RedisPubSubCommands scomm topic))) diff --git a/backend/src/app/rpc/mutations/files.clj b/backend/src/app/rpc/mutations/files.clj index 176f55570..d3c605534 100644 --- a/backend/src/app/rpc/mutations/files.clj +++ b/backend/src/app/rpc/mutations/files.clj @@ -386,31 +386,33 @@ (assoc :changes [])))))))) (defn- send-notifications - [{:keys [msgbus conn] :as cfg} {:keys [file changes session-id] :as params}] - (let [lchanges (filter library-change? changes)] + [{:keys [conn] :as cfg} {:keys [file changes session-id] :as params}] + (let [lchanges (filter library-change? changes) + msgbus-fn (:msgbus cfg)] + ;; Asynchronously publish message to the msgbus - (msgbus :pub {:topic (:id file) - :message - {:type :file-change - :profile-id (:profile-id params) - :file-id (:id file) - :session-id (:session-id params) - :revn (:revn file) - :changes changes}}) + (msgbus-fn :cmd :pub + :topic (:id file) + :message {:type :file-change + :profile-id (:profile-id params) + :file-id (:id file) + :session-id (:session-id params) + :revn (:revn file) + :changes changes}) (when (and (:is-shared file) (seq lchanges)) (let [team-id (retrieve-team-id conn (:project-id file))] ;; Asynchronously publish message to the msgbus - (msgbus :pub {:topic team-id - :message - {:type :library-change - :profile-id (:profile-id params) - :file-id (:id file) - :session-id session-id - :revn (:revn file) - :modified-at (dt/now) - :changes lchanges}}))))) + (msgbus-fn :cmd :pub + :topic team-id + :message {:type :library-change + :profile-id (:profile-id params) + :file-id (:id file) + :session-id session-id + :revn (:revn file) + :modified-at (dt/now) + :changes lchanges}))))) (defn- retrieve-team-id [conn project-id] diff --git a/backend/src/app/util/async.clj b/backend/src/app/util/async.clj index 0973683e7..96ee32af1 100644 --- a/backend/src/app/util/async.clj +++ b/backend/src/app/util/async.clj @@ -38,6 +38,13 @@ (throw r#) r#))) +(defmacro with-closing + [ch & body] + `(try + ~@body + (finally + (some-> ~ch a/close!)))) + (defn thread-call [^Executor executor f] (let [c (a/chan 1)] diff --git a/backend/src/app/util/websocket.clj b/backend/src/app/util/websocket.clj index 045cd6a7e..e4f8a12fe 100644 --- a/backend/src/app/util/websocket.clj +++ b/backend/src/app/util/websocket.clj @@ -54,16 +54,22 @@ pong-ch (a/chan (a/sliding-buffer 6)) close-ch (a/chan) - options (-> options - (assoc ::input-ch input-ch) - (assoc ::output-ch output-ch) - (assoc ::close-ch close-ch) - (assoc ::channel channel) - (dissoc ::metrics)) + options (atom + (-> options + (assoc ::input-ch input-ch) + (assoc ::output-ch output-ch) + (assoc ::close-ch close-ch) + (assoc ::channel channel) + (dissoc ::metrics))) terminated (atom false) created-at (dt/now) + on-open + (fn [channel] + (mtx/run! metrics {:id :websocket-active-connections :inc 1}) + (yws/idle-timeout! channel (dt/duration idle-timeout))) + on-terminate (fn [& _args] (when (compare-and-set! terminated false true) @@ -79,7 +85,8 @@ (fn [_ error] (on-terminate) ;; TODO: properly log timeout exceptions - (when-not (instance? java.nio.channels.ClosedChannelException error) + (when-not (or (instance? java.nio.channels.ClosedChannelException error) + (instance? java.net.SocketException error)) (l/error :hint (ex-message error) :cause error))) on-message @@ -98,31 +105,28 @@ (fn [_ buffers] (a/>!! pong-ch (yu/copy-many buffers)))] - (mtx/run! metrics {:id :websocket-active-connections :inc 1}) + ;; launch heartbeat process + (-> @options + (assoc ::pong-ch pong-ch) + (assoc ::on-close on-terminate) + (process-heartbeat)) - (let [wsp (atom options)] - ;; Handle heartbeat - (yws/idle-timeout! channel (dt/duration idle-timeout)) - (-> @wsp - (assoc ::pong-ch pong-ch) - (assoc ::on-close on-terminate) - (process-heartbeat)) + ;; Forward all messages from output-ch to the websocket + ;; connection + (a/go-loop [] + (when-let [val (a/! output-ch {:type :error :error {:message (ex-message val)}}) (map? val) - (a/>! output-ch (cond-> val (:request-id request) (assoc :request-id (:request-id request))))) - + (a/>! output-ch (cond-> val (:request-id message) (assoc :request-id (:request-id message))))) (recur)))))) (a/> (re-seq #"([^\%]+)*(\%(\d+)?)?" s) - (remove (fn [[_ seg]] (nil? seg)))) + (remove (fn [[full seg]] (and (nil? seg) (not full))))) result [] index 0] (if-let [[_ segment var? sidx] (first items)] @@ -156,7 +156,8 @@ (recur (rest items) (conj result segment) (inc index))) - result))) + + (remove nil? result)))) (defmacro fmt "String interpolation helper. Can only be used with strings known at diff --git a/common/src/app/common/logging.cljc b/common/src/app/common/logging.cljc index 44cadc35d..ba2598853 100644 --- a/common/src/app/common/logging.cljc +++ b/common/src/app/common/logging.cljc @@ -181,16 +181,18 @@ ~level-sym (get-level ~level)] (when (enabled? ~logger-sym ~level-sym) ~(if async - `(send-off logging-agent - (fn [_#] - (with-context (merge {:id (uuid/next)} - (get-error-context ~cause) - ~context) - (->> (or ~raw (build-map-message ~props)) - (write-log! ~logger-sym ~level-sym ~cause))))) - + `(do + (send-off logging-agent + (fn [_#] + (with-context (into {:id (uuid/next)} + (get-error-context ~cause) + ~context) + (->> (or ~raw (build-map-message ~props)) + (write-log! ~logger-sym ~level-sym ~cause))))) + nil) `(let [message# (or ~raw (build-map-message ~props))] - (write-log! ~logger-sym ~level-sym ~cause message#)))))))) + (write-log! ~logger-sym ~level-sym ~cause message#) + nil))))))) (defmacro info [& params] diff --git a/common/src/app/common/spec.cljc b/common/src/app/common/spec.cljc index 03f3ddc37..5fc4b8d9f 100644 --- a/common/src/app/common/spec.cljc +++ b/common/src/app/common/spec.cljc @@ -16,6 +16,7 @@ ;; because of some strange interaction with cljs.spec.alpha and ;; modules splitting. [app.common.exceptions :as ex] + [app.common.uri :as u] [app.common.uuid :as uuid] [cuerdas.core :as str] [expound.alpha :as expound])) @@ -96,6 +97,7 @@ :else ::s/invalid)) + ;; --- Default Specs (s/def ::keyword (s/conformer keyword-conformer name)) @@ -192,6 +194,15 @@ (fn [v] (str/join " " v)))) +(s/def ::uri + (s/conformer + (fn [s] + (cond + (u/uri? s) s + (string? s) (u/uri s) + :else ::s/invalid)) + str)) + ;; --- SPEC: set-of-str (s/def ::set-of-str diff --git a/common/src/app/common/uri.cljc b/common/src/app/common/uri.cljc index 8560c2a5e..627c0b5d3 100644 --- a/common/src/app/common/uri.cljc +++ b/common/src/app/common/uri.cljc @@ -9,7 +9,9 @@ (:require [app.common.data.macros :as dm] [lambdaisland.uri :as u] - [lambdaisland.uri.normalize :as un])) + [lambdaisland.uri.normalize :as un]) + #?(:clj + (:import lambdaisland.uri.URI))) (dm/export u/uri) (dm/export u/join) @@ -25,6 +27,11 @@ [v] (if (keyword? v) (name v) v)) +(defn get-domain + [{:keys [host port] :as uri}] + (cond-> host + port (str ":" port))) + (defn map->query-string ([params] (map->query-string params nil)) ([params {:keys [value-fn key-fn] @@ -35,3 +42,16 @@ (remove #(nil? (second %))) (map (fn [[k v]] [(key-fn k) (value-fn v)])))) (u/map->query-string)))) + +#?(:clj + (defmethod print-method lambdaisland.uri.URI [^URI this ^java.io.Writer writer] + (.write writer "#") + (.write writer (str u/edn-tag)) + (.write writer " ") + (.write writer (pr-str (.toString this)))) + + :cljs + (extend-type u/URI + IPrintWithWriter + (-pr-writer [this writer _opts] + (write-all writer "#" (str u/edn-tag) " " (pr-str (.toString this))))))