diff --git a/backend/src/app/loggers/zmq.clj b/backend/src/app/loggers/zmq.clj index 09d3abc1f..3f43cab99 100644 --- a/backend/src/app/loggers/zmq.clj +++ b/backend/src/app/loggers/zmq.clj @@ -7,6 +7,7 @@ (ns app.loggers.zmq "A generic ZMQ listener." (:require + [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] [app.util.json :as json] @@ -14,7 +15,8 @@ [clojure.core.async :as a] [clojure.spec.alpha :as s] [cuerdas.core :as str] - [integrant.core :as ig]) + [integrant.core :as ig] + [jsonista.core :as j]) (:import org.zeromq.SocketType org.zeromq.ZMQ$Socket @@ -33,7 +35,7 @@ (l/info :msg "intializing ZMQ receiver" :bind endpoint) (let [buffer (a/chan 1) output (a/chan 1 (comp (filter map?) - (map prepare))) + (keep prepare))) mult (a/mult output)] (when endpoint (a/thread (start-rcv-loop {:out buffer :endpoint endpoint}))) @@ -52,6 +54,11 @@ [_ f] (a/close! (::buffer (meta f)))) +(def json-mapper + (j/object-mapper + {:encode-key-fn str/camel + :decode-key-fn (comp keyword str/kebab)})) + (defn- start-rcv-loop ([] (start-rcv-loop nil)) ([{:keys [out endpoint] :or {endpoint "tcp://localhost:5556"}}] @@ -63,7 +70,7 @@ (.. socket (setReceiveTimeOut 5000)) (loop [] (let [msg (.recv ^ZMQ$Socket socket) - msg (json/decode msg) + msg (ex/ignoring (j/read-value msg json-mapper)) msg (if (nil? msg) :empty msg)] (if (a/>!! out msg) (recur) @@ -71,18 +78,35 @@ (.close ^java.lang.AutoCloseable socket) (.close ^java.lang.AutoCloseable zctx)))))))) + +(s/def ::logger-name string?) +(s/def ::level string?) +(s/def ::thread string?) +(s/def ::time-millis integer?) +(s/def ::message string?) +(s/def ::context-map map?) +(s/def ::throw map?) + +(s/def ::log4j-event + (s/keys :req-un [::logger-name ::level ::thread ::time-millis ::message] + :opt-un [::context-map ::thrown])) + (defn- prepare [event] - (merge - {:logger (:loggerName event) - :level (str/lower (:level event)) - :thread (:thread event) - :created-at (dt/instant (:timeMillis event)) - :message (:message event)} - (when-let [ctx (:contextMap event)] - {:context ctx}) - (when-let [thrown (:thrown event)] - {:error - {:class (:name thrown) - :message (:message thrown) - :trace (:extendedStackTrace thrown)}}))) + (if (s/valid? ::log4j-event event) + (merge + {:logger (:logger-name event) + :level (str/lower (:level event)) + :thread (:thread event) + :created-at (dt/instant (:time-millis event)) + :message (:message event)} + (when-let [ctx (:context-map event)] + {:context ctx}) + (when-let [thrown (:thrown event)] + {:error + {:class (:name thrown) + :message (:message thrown) + :trace (:extended-stack-trace thrown)}})) + (do + (l/warn :hint "invalid event" :event event) + nil)))