From 7cf120e2e17bb5f704b2a869322d81946c9920db Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Sun, 9 May 2021 14:00:23 +0200 Subject: [PATCH] :sparkles: Move events batching to a util/async ns. --- backend/src/app/http/session.clj | 35 +++----------------------------- backend/src/app/util/async.clj | 31 ++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 32 deletions(-) diff --git a/backend/src/app/http/session.clj b/backend/src/app/http/session.clj index af0fd6d8f..d1b0170bb 100644 --- a/backend/src/app/http/session.clj +++ b/backend/src/app/http/session.clj @@ -106,7 +106,6 @@ ;; --- STATE INIT: SESSION UPDATER -(declare batch-events) (declare update-sessions) (s/def ::session map?) @@ -129,7 +128,9 @@ (l/info :action "initialize session updater" :max-batch-age (str (:max-batch-age cfg)) :max-batch-size (str (:max-batch-size cfg))) - (let [input (batch-events cfg (::events-ch session)) + (let [input (aa/batch (::events-ch session) + {:max-batch-size (:max-batch-size cfg) + :max-batch-age (inst-ms (:max-batch-age cfg))}) mcnt (mtx/create {:name "http_session_update_total" :help "A counter of session update batch events." @@ -149,36 +150,6 @@ :count result)) (recur)))))) -(defn- timeout-chan - [cfg] - (a/timeout (inst-ms (:max-batch-age cfg)))) - -(defn- batch-events - [cfg in] - (let [out (a/chan)] - (a/go-loop [tch (timeout-chan cfg) - buf #{}] - (let [[val port] (a/alts! [tch in])] - (cond - (identical? port tch) - (if (empty? buf) - (recur (timeout-chan cfg) buf) - (do - (a/>! out [:timeout buf]) - (recur (timeout-chan cfg) #{}))) - - (nil? val) - (a/close! out) - - (identical? port in) - (let [buf (conj buf val)] - (if (>= (count buf) (:max-batch-size cfg)) - (do - (a/>! out [:size buf]) - (recur (timeout-chan cfg) #{})) - (recur tch buf)))))) - out)) - (defn- update-sessions [{:keys [pool executor]} ids] (aa/with-thread executor diff --git a/backend/src/app/util/async.clj b/backend/src/app/util/async.clj index fb17e6a7e..a0492e086 100644 --- a/backend/src/app/util/async.clj +++ b/backend/src/app/util/async.clj @@ -60,3 +60,34 @@ (if (= executor ::default) `(a/thread-call (^:once fn* [] (try ~@body (catch Exception e# e#)))) `(thread-call ~executor (^:once fn* [] ~@body)))) + +(defn batch + [in {:keys [max-batch-size + max-batch-age + init] + :or {max-batch-size 200 + max-batch-age (* 30 1000) + init #{}} + :as opts}] + (let [out (a/chan)] + (a/go-loop [tch (a/timeout max-batch-age) buf init] + (let [[val port] (a/alts! [tch in])] + (cond + (identical? port tch) + (if (empty? buf) + (recur (a/timeout max-batch-age) buf) + (do + (a/>! out [:timeout buf]) + (recur (a/timeout max-batch-age) init))) + + (nil? val) + (a/close! out) + + (identical? port in) + (let [buf (conj buf val)] + (if (>= (count buf) max-batch-size) + (do + (a/>! out [:size buf]) + (recur (a/timeout max-batch-age) init)) + (recur tch buf)))))) + out))