Improve events/sse internal API

For make code cleaner and more evident for a quick view
This commit is contained in:
Andrey Antukh 2025-05-15 09:07:56 +02:00
parent 6524e75770
commit cf274099c4
2 changed files with 29 additions and 30 deletions

View file

@ -9,7 +9,6 @@
(:refer-clojure :exclude [tap]) (:refer-clojure :exclude [tap])
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.transit :as t] [app.common.transit :as t]
[app.http.errors :as errors] [app.http.errors :as errors]
@ -54,16 +53,20 @@
::yres/status 200 ::yres/status 200
::yres/body (yres/stream-body ::yres/body (yres/stream-body
(fn [_ output] (fn [_ output]
(binding [events/*channel* (sp/chan :buf buf :xf (keep encode))] (let [channel (sp/chan :buf buf :xf (keep encode))
(let [listener (events/start-listener listener (events/start-listener
(partial write! output) channel
(partial pu/close! output))] (partial write! output)
(try (partial pu/close! output))]
(try
(binding [events/*channel* channel]
(let [result (handler)] (let [result (handler)]
(events/tap :end result)) (events/tap :end result)))
(catch Throwable cause
(let [result (errors/handle' cause request)] (catch Throwable cause
(events/tap :error result))) (let [result (errors/handle' cause request)]
(finally (events/tap channel :error result)))
(sp/close! events/*channel*)
(px/await! listener)))))))})) (finally
(sp/close! channel)
(px/await! listener))))))}))

View file

@ -10,7 +10,6 @@
to them. Mainly used in http.sse for progress reporting." to them. Mainly used in http.sse for progress reporting."
(:refer-clojure :exclude [tap run!]) (:refer-clojure :exclude [tap run!])
(:require (:require
[app.common.data.macros :as dm]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[promesa.exec :as px] [promesa.exec :as px]
@ -18,33 +17,30 @@
(def ^:dynamic *channel* nil) (def ^:dynamic *channel* nil)
(defn channel
[]
(sp/chan :buf 32))
(defn tap (defn tap
[type data] ([type data]
(when-let [channel *channel*] (when-let [channel *channel*]
(sp/put! channel [type data]) (sp/put! channel [type data])
nil)) nil))
([channel type data]
(when channel
(sp/put! channel [type data])
nil)))
(defn start-listener (defn start-listener
[on-event on-close] [channel on-event on-close]
(assert (sp/chan? channel) "expected active events channel")
(dm/assert!
"expected active events channel"
(sp/chan? *channel*))
(px/thread (px/thread
{:virtual true} {:virtual true}
(try (try
(loop [] (loop []
(when-let [event (sp/take! *channel*)] (when-let [event (sp/take! channel)]
(let [result (ex/try! (on-event event))] (let [result (ex/try! (on-event event))]
(if (ex/exception? result) (if (ex/exception? result)
(do (do
(l/wrn :hint "unexpected exception" :cause result) (l/wrn :hint "unexpected exception" :cause result)
(sp/close! *channel*)) (sp/close! channel))
(recur))))) (recur)))))
(finally (finally
(on-close))))) (on-close)))))
@ -55,7 +51,7 @@
[f on-event] [f on-event]
(binding [*channel* (sp/chan :buf 32)] (binding [*channel* (sp/chan :buf 32)]
(let [listener (start-listener on-event (constantly nil))] (let [listener (start-listener *channel* on-event (constantly nil))]
(try (try
(f) (f)
(finally (finally