♻️ Make the worker abstraction more scalable

Start using redis for dispatcher to worker communication
and add the ability to start multiple threads to worker
for increase the concurrency.
This commit is contained in:
Andrey Antukh 2022-11-22 18:06:24 +01:00
parent 13a092b192
commit 0600b2abe4
16 changed files with 827 additions and 625 deletions

View file

@ -21,13 +21,19 @@
[promesa.core :as p])
(:import
clojure.lang.IDeref
clojure.lang.MapEntry
io.lettuce.core.KeyValue
io.lettuce.core.RedisClient
io.lettuce.core.RedisCommandInterruptedException
io.lettuce.core.RedisCommandTimeoutException
io.lettuce.core.RedisException
io.lettuce.core.RedisURI
io.lettuce.core.ScriptOutputType
io.lettuce.core.api.StatefulConnection
io.lettuce.core.api.StatefulRedisConnection
io.lettuce.core.api.async.RedisAsyncCommands
io.lettuce.core.api.async.RedisScriptingAsyncCommands
io.lettuce.core.api.sync.RedisCommands
io.lettuce.core.codec.ByteArrayCodec
io.lettuce.core.codec.RedisCodec
io.lettuce.core.codec.StringCodec
@ -45,8 +51,7 @@
(declare initialize-resources)
(declare shutdown-resources)
(declare connect)
(declare close!)
(declare connect*)
(s/def ::timer
#(instance? Timer %))
@ -82,32 +87,37 @@
(s/def ::connect? ::us/boolean)
(s/def ::io-threads ::us/integer)
(s/def ::worker-threads ::us/integer)
(s/def ::cache #(instance? clojure.lang.Atom %))
(s/def ::redis
(s/keys :req [::resources ::redis-uri ::timer ::mtx/metrics]
:opt [::connection]))
(defmethod ig/pre-init-spec ::redis [_]
(s/keys :req-un [::uri ::mtx/metrics]
:opt-un [::timeout
::connect?
::io-threads
::worker-threads]))
(s/keys :req [::resources
::redis-uri
::timer
::mtx/metrics]
:opt [::connection
::cache]))
(defmethod ig/prep-key ::redis
[_ cfg]
(let [runtime (Runtime/getRuntime)
cpus (.availableProcessors ^Runtime runtime)]
(merge {:timeout (dt/duration 5000)
:io-threads (max 3 cpus)
:worker-threads (max 3 cpus)}
(merge {::timeout (dt/duration "10s")
::io-threads (max 3 cpus)
::worker-threads (max 3 cpus)}
(d/without-nils cfg))))
(defmethod ig/pre-init-spec ::redis [_]
(s/keys :req [::uri ::mtx/metrics]
:opt [::timeout
::connect?
::io-threads
::worker-threads]))
(defmethod ig/init-key ::redis
[_ {:keys [connect?] :as cfg}]
(let [cfg (initialize-resources cfg)]
(cond-> cfg
connect? (assoc ::connection (connect cfg)))))
[_ {:keys [::connect?] :as cfg}]
(let [state (initialize-resources cfg)]
(cond-> state
connect? (assoc ::connection (connect* cfg {})))))
(defmethod ig/halt-key! ::redis
[_ state]
@ -121,7 +131,7 @@
(defn- initialize-resources
"Initialize redis connection resources"
[{:keys [uri io-threads worker-threads connect? metrics] :as cfg}]
[{:keys [::uri ::io-threads ::worker-threads ::connect?] :as cfg}]
(l/info :hint "initialize redis resources"
:uri uri
:io-threads io-threads
@ -138,53 +148,60 @@
redis-uri (RedisURI/create ^String uri)]
(-> cfg
(assoc ::mtx/metrics metrics)
(assoc ::cache (atom {}))
(assoc ::resources resources)
(assoc ::timer timer)
(assoc ::redis-uri redis-uri)
(assoc ::resources resources))))
(assoc ::cache (atom {}))
(assoc ::redis-uri redis-uri))))
(defn- shutdown-resources
[{:keys [::resources ::cache ::timer]}]
(run! close! (vals @cache))
(run! d/close! (vals @cache))
(when resources
(.shutdown ^ClientResources resources))
(when timer
(.stop ^Timer timer)))
(defn connect
(defn connect*
[{:keys [::resources ::redis-uri] :as state}
& {:keys [timeout codec type]
:or {codec default-codec type :default}}]
{:keys [timeout codec type]
:or {codec default-codec type :default}}]
(us/assert! ::resources resources)
(let [client (RedisClient/create ^ClientResources resources ^RedisURI redis-uri)
timeout (or timeout (:timeout state))
timeout (or timeout (::timeout state))
conn (case type
:default (.connect ^RedisClient client ^RedisCodec codec)
:pubsub (.connectPubSub ^RedisClient client ^RedisCodec codec))]
(.setTimeout ^StatefulConnection conn ^Duration timeout)
(assoc state ::connection
(reify
IDeref
(deref [_] conn)
(reify
IDeref
(deref [_] conn)
AutoCloseable
(close [_]
(.close ^StatefulConnection conn)
(.shutdown ^RedisClient client))))))
AutoCloseable
(close [_]
(.close ^StatefulConnection conn)
(.shutdown ^RedisClient client)))))
(defn connect
[state & {:as opts}]
(let [connection (connect* state opts)]
(-> state
(assoc ::connection connection)
(dissoc ::cache)
(vary-meta assoc `d/close! (fn [_] (d/close! connection))))))
(defn get-or-connect
[{:keys [::cache] :as state} key options]
(assoc state ::connection
(or (get @cache key)
(-> (swap! cache (fn [cache]
(when-let [prev (get cache key)]
(close! prev))
(assoc cache key (connect state options))))
(get key)))))
(-> state
(assoc ::connection
(or (get @cache key)
(-> (swap! cache (fn [cache]
(when-let [prev (get cache key)]
(d/close! prev))
(assoc cache key (connect* state options))))
(get key))))
(dissoc ::cache)))
(defn add-listener!
[{:keys [::connection] :as conn} listener]
@ -210,18 +227,63 @@
[{:keys [::connection] :as conn} & topics]
(us/assert! ::connection-holder conn)
(us/assert! ::pubsub-connection connection)
(let [topics (into-array String (map str topics))
cmd (.sync ^StatefulRedisPubSubConnection @connection)]
(.subscribe ^RedisPubSubCommands cmd topics)))
(try
(let [topics (into-array String (map str topics))
cmd (.sync ^StatefulRedisPubSubConnection @connection)]
(.subscribe ^RedisPubSubCommands cmd topics))
(catch RedisCommandInterruptedException cause
(throw (InterruptedException. (ex-message cause))))))
(defn unsubscribe!
"Blocking operation, intended to be used on a thread/agent thread."
[{:keys [::connection] :as conn} & topics]
(us/assert! ::connection-holder conn)
(us/assert! ::pubsub-connection connection)
(let [topics (into-array String (map str topics))
cmd (.sync ^StatefulRedisPubSubConnection @connection)]
(.unsubscribe ^RedisPubSubCommands cmd topics)))
(try
(let [topics (into-array String (map str topics))
cmd (.sync ^StatefulRedisPubSubConnection @connection)]
(.unsubscribe ^RedisPubSubCommands cmd topics))
(catch RedisCommandInterruptedException cause
(throw (InterruptedException. (ex-message cause))))))
(defn rpush!
[{:keys [::connection] :as conn} key payload]
(us/assert! ::connection-holder conn)
(us/assert! (or (and (vector? payload)
(every? bytes? payload))
(bytes? payload)))
(try
(let [cmd (.sync ^StatefulRedisConnection @connection)
data (if (vector? payload) payload [payload])
vals (make-array (. Class (forName "[B")) (count data))]
(loop [i 0 xs (seq data)]
(when xs
(aset ^"[[B" vals i ^bytes (first xs))
(recur (inc i) (next xs))))
(.rpush ^RedisCommands cmd
^String key
^"[[B" vals))
(catch RedisCommandInterruptedException cause
(throw (InterruptedException. (ex-message cause))))))
(defn blpop!
[{:keys [::connection] :as conn} timeout & keys]
(us/assert! ::connection-holder conn)
(try
(let [keys (into-array Object (map str keys))
cmd (.sync ^StatefulRedisConnection @connection)
timeout (/ (double (inst-ms timeout)) 1000.0)]
(when-let [res (.blpop ^RedisCommands cmd
^double timeout
^"[Ljava.lang.String;" keys)]
(MapEntry/create
(.getKey ^KeyValue res)
(.getValue ^KeyValue res))))
(catch RedisCommandInterruptedException cause
(throw (InterruptedException. (ex-message cause))))))
(defn open?
[{:keys [::connection] :as conn}]
@ -256,12 +318,6 @@
(when on-unsubscribe
(on-unsubscribe nil topic count)))))
(defn close!
[{:keys [::connection] :as conn}]
(us/assert! ::connection-holder conn)
(us/assert! ::connection connection)
(.close ^AutoCloseable connection))
(def ^:private scripts-cache (atom {}))
(def noop-fn (constantly nil))
@ -332,3 +388,11 @@
(eval-script sha)
(->> (load-script)
(p/mapcat eval-script))))))
(defn timeout-exception?
[cause]
(instance? RedisCommandTimeoutException cause))
(defn exception?
[cause]
(instance? RedisException cause))