🎉 Add optional rate limit support for RPC calls

This commit is contained in:
Andrey Antukh 2022-08-30 14:26:54 +02:00
parent 47b745592b
commit ec3651d85b
37 changed files with 1003 additions and 333 deletions

View file

@ -13,28 +13,14 @@
[app.common.spec :as us]
[app.common.transit :as t]
[app.config :as cfg]
[app.redis :as redis]
[app.util.async :as aa]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.core :as p])
(:import
io.lettuce.core.RedisClient
io.lettuce.core.RedisURI
io.lettuce.core.api.StatefulConnection
io.lettuce.core.api.StatefulRedisConnection
io.lettuce.core.api.async.RedisAsyncCommands
io.lettuce.core.codec.ByteArrayCodec
io.lettuce.core.codec.RedisCodec
io.lettuce.core.codec.StringCodec
io.lettuce.core.pubsub.RedisPubSubListener
io.lettuce.core.pubsub.StatefulRedisPubSubConnection
io.lettuce.core.pubsub.api.sync.RedisPubSubCommands
io.lettuce.core.resource.ClientResources
io.lettuce.core.resource.DefaultClientResources
java.time.Duration))
[promesa.core :as p]))
(set! *warn-on-reflection* true)
@ -62,18 +48,14 @@
:timeout (dt/duration {:seconds 30})}
(d/without-nils cfg)))
(s/def ::timeout ::dt/duration)
(s/def ::redis-uri ::us/string)
(s/def ::buffer-size ::us/integer)
(defmethod ig/pre-init-spec ::msgbus [_]
(s/keys :req-un [::buffer-size ::redis-uri ::timeout ::wrk/executor]))
(s/keys :req-un [::buffer-size ::redis/timeout ::redis/redis ::wrk/executor]))
(defmethod ig/init-key ::msgbus
[_ {:keys [buffer-size redis-uri] :as cfg}]
(l/info :hint "initialize msgbus"
:buffer-size buffer-size
:redis-uri redis-uri)
[_ {:keys [buffer-size] :as cfg}]
(l/info :hint "initialize msgbus" :buffer-size buffer-size)
(let [cmd-ch (a/chan buffer-size)
rcv-ch (a/chan (a/dropping-buffer buffer-size))
pub-ch (a/chan (a/dropping-buffer buffer-size) xform-prefix-topic)
@ -106,33 +88,17 @@
;; --- IMPL
(defn- redis-connect
[{:keys [redis-uri timeout] :as cfg}]
(let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE)
resources (.. (DefaultClientResources/builder)
(ioThreadPoolSize 4)
(computationThreadPoolSize 4)
(build))
uri (RedisURI/create redis-uri)
rclient (RedisClient/create ^ClientResources resources ^RedisURI uri)
pconn (.connect ^RedisClient rclient ^RedisCodec codec)
sconn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)]
(.setTimeout ^StatefulRedisConnection pconn ^Duration timeout)
(.setTimeout ^StatefulRedisPubSubConnection sconn ^Duration timeout)
[{:keys [timeout redis] :as cfg}]
(let [pconn (redis/connect redis :timeout timeout)
sconn (redis/connect redis :type :pubsub :timeout timeout)]
(-> cfg
(assoc ::resources resources)
(assoc ::pconn pconn)
(assoc ::sconn sconn))))
(defn- redis-disconnect
[{:keys [::pconn ::sconn ::resources] :as cfg}]
(.. ^StatefulConnection pconn close)
(.. ^StatefulConnection sconn close)
(.shutdown ^ClientResources resources))
[{:keys [::pconn ::sconn] :as cfg}]
(redis/close! pconn)
(redis/close! sconn))
(defn- conj-subscription
"A low level function that is responsible to create on-demand
@ -204,27 +170,18 @@
(defn- create-listener
[rcv-ch]
(reify RedisPubSubListener
(message [_ _pattern _topic _message])
(message [_ topic message]
;; There are no back pressure, so we use a slidding
;; buffer for cases when the pubsub broker sends
;; more messages that we can process.
(let [val {:topic topic :message (t/decode message)}]
(when-not (a/offer! rcv-ch val)
(l/warn :msg "dropping message on subscription loop"))))
(psubscribed [_ _pattern _count])
(punsubscribed [_ _pattern _count])
(subscribed [_ _topic _count])
(unsubscribed [_ _topic _count])))
(redis/pubsub-listener
:on-message (fn [_ topic message]
;; There are no back pressure, so we use a slidding
;; buffer for cases when the pubsub broker sends
;; more messages that we can process.
(let [val {:topic topic :message (t/decode message)}]
(when-not (a/offer! rcv-ch val)
(l/warn :msg "dropping message on subscription loop"))))))
(defn start-io-loop
[{:keys [::sconn ::rcv-ch ::pub-ch ::state executor] :as cfg}]
;; Add a single listener to the pubsub connection
(.addListener ^StatefulRedisPubSubConnection sconn
^RedisPubSubListener (create-listener rcv-ch))
(redis/add-listener! sconn (create-listener rcv-ch))
(letfn [(send-to-topic [topic message]
(a/go-loop [chans (seq (get-in @state [:topics topic]))
closed #{}]
@ -270,11 +227,10 @@
intended to be used in core.async go blocks."
[{:keys [::pconn] :as cfg} {:keys [topic message]}]
(let [message (t/encode message)
res (a/chan 1)
pcomm (.async ^StatefulRedisConnection pconn)]
(-> (.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message)
res (a/chan 1)]
(-> (redis/publish! pconn topic message)
(p/finally (fn [_ cause]
(when (and cause (.isOpen ^StatefulConnection pconn))
(when (and cause (redis/open? pconn))
(a/offer! res cause))
(a/close! res))))
res))
@ -283,14 +239,10 @@
"Create redis subscription. Blocking operation, intended to be used
inside an agent."
[{:keys [::sconn] :as cfg} topic]
(let [topic (into-array String [topic])
scomm (.sync ^StatefulRedisPubSubConnection sconn)]
(.subscribe ^RedisPubSubCommands scomm topic)))
(redis/subscribe! sconn topic))
(defn redis-unsub
"Removes redis subscription. Blocking operation, intended to be used
inside an agent."
[{:keys [::sconn] :as cfg} topic]
(let [topic (into-array String [topic])
scomm (.sync ^StatefulRedisPubSubConnection sconn)]
(.unsubscribe ^RedisPubSubCommands scomm topic)))
(redis/unsubscribe! sconn topic))