From 10bf6c5e56c6594b82b3088c7dcdcd27c8226c19 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 22 Nov 2022 13:04:33 +0100 Subject: [PATCH] :recycle: Normalize redis api and its usage in msgbus module --- backend/src/app/msgbus.clj | 55 +++++++++--------- backend/src/app/redis.clj | 113 +++++++++++++++++++++---------------- common/deps.edn | 2 +- 3 files changed, 94 insertions(+), 76 deletions(-) diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj index 7d3959931..371af7de0 100644 --- a/backend/src/app/msgbus.clj +++ b/backend/src/app/msgbus.clj @@ -20,7 +20,8 @@ [clojure.core.async :as a] [clojure.spec.alpha :as s] [integrant.core :as ig] - [promesa.core :as p])) + [promesa.core :as p] + [promesa.exec :as px])) (set! *warn-on-reflection* true) @@ -52,8 +53,8 @@ (s/def ::rcv-ch ::aa/channel) (s/def ::pub-ch ::aa/channel) (s/def ::state ::us/agent) -(s/def ::pconn ::redis/connection) -(s/def ::sconn ::redis/connection) +(s/def ::pconn ::redis/connection-holder) +(s/def ::sconn ::redis/connection-holder) (s/def ::msgbus (s/keys :req [::cmd-ch ::rcv-ch ::pub-ch ::state ::pconn ::sconn ::wrk/executor])) @@ -205,31 +206,33 @@ (when-let [closed (a/> (vals state) + (mapcat identity) + (filter some?) + (run! a/close!)) + nil))) - (a/go-loop [] - (let [[val port] (a/alts! [pub-ch rcv-ch])] - (cond - (nil? val) - (do - (l/trace :hint "stopping io-loop, nil received") - (send-via executor state (fn [state] - (->> (vals state) - (mapcat identity) - (filter some?) - (run! a/close!)) - nil))) + (= port rcv-ch) + (do + (a/ state ::connection deref) - cmd (.async ^StatefulRedisConnection rconn) + (let [cmd (.async ^StatefulRedisConnection @connection) keys (into-array String (map str (::rscript/keys script))) vals (into-array String (map str (::rscript/vals script))) sname (::rscript/name script)] @@ -276,20 +291,20 @@ (if (instance? io.lettuce.core.RedisNoScriptException cause) (do (l/error :hint "no script found" :name sname :cause cause) - (-> (load-script) - (p/then eval-script))) + (->> (load-script) + (p/mapcat eval-script))) (if-let [on-error (::rscript/on-error script)] (on-error cause) (p/rejected cause)))) (eval-script [sha] (let [tpoint (dt/tpoint)] - (-> (.evalsha ^RedisScriptingAsyncCommands cmd - ^String sha - ^ScriptOutputType ScriptOutputType/MULTI - ^"[Ljava.lang.String;" keys - ^"[Ljava.lang.String;" vals) - (p/then (fn [result] + (->> (.evalsha ^RedisScriptingAsyncCommands cmd + ^String sha + ^ScriptOutputType ScriptOutputType/MULTI + ^"[Ljava.lang.String;" keys + ^"[Ljava.lang.String;" vals) + (p/map (fn [result] (let [elapsed (tpoint)] (mtx/run! metrics {:id :redis-eval-timing :labels [(name sname)] @@ -300,20 +315,20 @@ :params (str/join "," (::rscript/vals script)) :elapsed (dt/format-duration elapsed)) result))) - (p/catch on-error)))) + (p/error on-error)))) (read-script [] (-> script ::rscript/path io/resource slurp)) (load-script [] (l/trace :hint "load script" :name sname) - (-> (.scriptLoad ^RedisScriptingAsyncCommands cmd + (->> (.scriptLoad ^RedisScriptingAsyncCommands cmd ^String (read-script)) - (p/then (fn [sha] + (p/map (fn [sha] (swap! scripts-cache assoc sname sha) sha))))] (if-let [sha (get @scripts-cache sname)] (eval-script sha) - (-> (load-script) - (p/then eval-script)))))) + (->> (load-script) + (p/mapcat eval-script)))))) diff --git a/common/deps.edn b/common/deps.edn index 2ffa93000..1e3652f86 100644 --- a/common/deps.edn +++ b/common/deps.edn @@ -23,7 +23,7 @@ com.cognitect/transit-cljs {:mvn/version "0.8.280"} java-http-clj/java-http-clj {:mvn/version "0.4.3"} - funcool/promesa {:mvn/version "9.0.507"} + funcool/promesa {:mvn/version "9.1.539"} funcool/cuerdas {:mvn/version "2022.06.16-403"} lambdaisland/uri {:mvn/version "1.13.95"