♻️ Refactor cocurrency model on backend

Mainly the followin changes:

- Pass majority of code to the old and plain synchronous style
  and start using virtual threads for the RPC (and partially some
  HTTP server middlewares).
- Make some improvements on how CLIMIT is handled, simplifying code
- Improve considerably performance reducing the reflection and
  unnecesary funcion calls on the whole stack-trace of an RPC call.
- Improve efficiency reducing considerably the total threads number.
This commit is contained in:
Andrey Antukh 2023-03-02 16:57:28 +01:00
parent 2e717882f1
commit aafbf6bc15
47 changed files with 1409 additions and 1477 deletions

View file

@ -8,11 +8,13 @@
"The msgbus abstraction implemented using redis as underlying backend."
(:require
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.spec :as us]
[app.metrics :as mtx]
[app.redis.script :as-alias rscript]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[clojure.core :as c]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
@ -45,6 +47,10 @@
io.lettuce.core.resource.DefaultClientResources
io.netty.util.HashedWheelTimer
io.netty.util.Timer
java.util.function.Function
com.github.benmanes.caffeine.cache.Cache
com.github.benmanes.caffeine.cache.Caffeine
com.github.benmanes.caffeine.cache.RemovalListener
java.lang.AutoCloseable
java.time.Duration))
@ -88,7 +94,7 @@
(s/def ::connect? ::us/boolean)
(s/def ::io-threads ::us/integer)
(s/def ::worker-threads ::us/integer)
(s/def ::cache #(instance? clojure.lang.Atom %))
(s/def ::cache some?)
(s/def ::redis
(s/keys :req [::resources
@ -130,6 +136,20 @@
(def string-codec
(RedisCodec/of StringCodec/UTF8 StringCodec/UTF8))
(defn- create-cache
[{:keys [::wrk/executor] :as cfg}]
(let [listener (reify RemovalListener
(onRemoval [_ key cache cause]
(l/trace :hint "cache: remove" :key key :reason (str cause) :repr (pr-str cache))
(some-> cache d/close!)))
]
(.. (Caffeine/newBuilder)
(weakValues)
(executor executor)
(removalListener listener)
(build))))
(defn- initialize-resources
"Initialize redis connection resources"
[{:keys [::uri ::io-threads ::worker-threads ::connect?] :as cfg}]
@ -146,17 +166,18 @@
(timer ^Timer timer)
(build))
redis-uri (RedisURI/create ^String uri)]
redis-uri (RedisURI/create ^String uri)
cfg (-> cfg
(assoc ::resources resources)
(assoc ::timer timer)
(assoc ::redis-uri redis-uri))]
(-> cfg
(assoc ::resources resources)
(assoc ::timer timer)
(assoc ::cache (atom {}))
(assoc ::redis-uri redis-uri))))
(assoc cfg ::cache (create-cache cfg))))
(defn- shutdown-resources
[{:keys [::resources ::cache ::timer]}]
(run! d/close! (vals @cache))
(.invalidateAll ^Cache cache)
(when resources
(.shutdown ^ClientResources resources))
(when timer
@ -174,6 +195,7 @@
:default (.connect ^RedisClient client ^RedisCodec codec)
:pubsub (.connectPubSub ^RedisClient client ^RedisCodec codec))]
(l/trc :hint "connect" :hid (hash client))
(.setTimeout ^StatefulConnection conn ^Duration timeout)
(reify
IDeref
@ -181,8 +203,9 @@
AutoCloseable
(close [_]
(.close ^StatefulConnection conn)
(.shutdown ^RedisClient client)))))
(ex/ignoring (.close ^StatefulConnection conn))
(ex/ignoring (.shutdown ^RedisClient client))
(l/trc :hint "disconnect" :hid (hash client))))))
(defn connect
[state & {:as opts}]
@ -195,15 +218,16 @@
(defn get-or-connect
[{:keys [::cache] :as state} key options]
(us/assert! ::redis state)
(-> state
(assoc ::connection
(or (get @cache key)
(-> (swap! cache (fn [cache]
(when-let [prev (get cache key)]
(d/close! prev))
(assoc cache key (connect* state options))))
(get key))))
(dissoc ::cache)))
;; FIXME: the cache causes vthread pinning
(let [connection (.get ^Cache cache
^Object key
^Function (reify
Function
(apply [_ _key]
(connect* state options))))]
(-> state
(dissoc ::cache)
(assoc ::connection connection))))
(defn add-listener!
[{:keys [::connection] :as conn} listener]
@ -345,7 +369,7 @@
(do
(l/error :hint "no script found" :name sname :cause cause)
(->> (load-script)
(p/mapcat eval-script)))
(p/mcat eval-script)))
(if-let [on-error (::rscript/on-error script)]
(on-error cause)
(p/rejected cause))))
@ -376,15 +400,16 @@
(load-script []
(l/trace :hint "load script" :name sname)
(->> (.scriptLoad ^RedisScriptingAsyncCommands cmd
^String (read-script))
(p/map (fn [sha]
(swap! scripts-cache assoc sname sha)
sha))))]
^String (read-script))
(p/fmap (fn [sha]
(swap! scripts-cache assoc sname sha)
sha))))]
(if-let [sha (get @scripts-cache sname)]
(eval-script sha)
(->> (load-script)
(p/mapcat eval-script))))))
(p/await!
(if-let [sha (get @scripts-cache sname)]
(eval-script sha)
(->> (load-script)
(p/mapcat eval-script)))))))
(defn timeout-exception?
[cause]