Ensure valid messages on zmq listener.

This commit is contained in:
Andrey Antukh 2021-12-22 14:28:09 +01:00
parent 0bf883d5b2
commit 634ec1b113

View file

@ -7,6 +7,7 @@
(ns app.loggers.zmq (ns app.loggers.zmq
"A generic ZMQ listener." "A generic ZMQ listener."
(:require (:require
[app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.spec :as us] [app.common.spec :as us]
[app.util.json :as json] [app.util.json :as json]
@ -14,7 +15,8 @@
[clojure.core.async :as a] [clojure.core.async :as a]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[integrant.core :as ig]) [integrant.core :as ig]
[jsonista.core :as j])
(:import (:import
org.zeromq.SocketType org.zeromq.SocketType
org.zeromq.ZMQ$Socket org.zeromq.ZMQ$Socket
@ -33,7 +35,7 @@
(l/info :msg "intializing ZMQ receiver" :bind endpoint) (l/info :msg "intializing ZMQ receiver" :bind endpoint)
(let [buffer (a/chan 1) (let [buffer (a/chan 1)
output (a/chan 1 (comp (filter map?) output (a/chan 1 (comp (filter map?)
(map prepare))) (keep prepare)))
mult (a/mult output)] mult (a/mult output)]
(when endpoint (when endpoint
(a/thread (start-rcv-loop {:out buffer :endpoint endpoint}))) (a/thread (start-rcv-loop {:out buffer :endpoint endpoint})))
@ -52,6 +54,11 @@
[_ f] [_ f]
(a/close! (::buffer (meta f)))) (a/close! (::buffer (meta f))))
(def json-mapper
(j/object-mapper
{:encode-key-fn str/camel
:decode-key-fn (comp keyword str/kebab)}))
(defn- start-rcv-loop (defn- start-rcv-loop
([] (start-rcv-loop nil)) ([] (start-rcv-loop nil))
([{:keys [out endpoint] :or {endpoint "tcp://localhost:5556"}}] ([{:keys [out endpoint] :or {endpoint "tcp://localhost:5556"}}]
@ -63,7 +70,7 @@
(.. socket (setReceiveTimeOut 5000)) (.. socket (setReceiveTimeOut 5000))
(loop [] (loop []
(let [msg (.recv ^ZMQ$Socket socket) (let [msg (.recv ^ZMQ$Socket socket)
msg (json/decode msg) msg (ex/ignoring (j/read-value msg json-mapper))
msg (if (nil? msg) :empty msg)] msg (if (nil? msg) :empty msg)]
(if (a/>!! out msg) (if (a/>!! out msg)
(recur) (recur)
@ -71,18 +78,35 @@
(.close ^java.lang.AutoCloseable socket) (.close ^java.lang.AutoCloseable socket)
(.close ^java.lang.AutoCloseable zctx)))))))) (.close ^java.lang.AutoCloseable zctx))))))))
(s/def ::logger-name string?)
(s/def ::level string?)
(s/def ::thread string?)
(s/def ::time-millis integer?)
(s/def ::message string?)
(s/def ::context-map map?)
(s/def ::throw map?)
(s/def ::log4j-event
(s/keys :req-un [::logger-name ::level ::thread ::time-millis ::message]
:opt-un [::context-map ::thrown]))
(defn- prepare (defn- prepare
[event] [event]
(if (s/valid? ::log4j-event event)
(merge (merge
{:logger (:loggerName event) {:logger (:logger-name event)
:level (str/lower (:level event)) :level (str/lower (:level event))
:thread (:thread event) :thread (:thread event)
:created-at (dt/instant (:timeMillis event)) :created-at (dt/instant (:time-millis event))
:message (:message event)} :message (:message event)}
(when-let [ctx (:contextMap event)] (when-let [ctx (:context-map event)]
{:context ctx}) {:context ctx})
(when-let [thrown (:thrown event)] (when-let [thrown (:thrown event)]
{:error {:error
{:class (:name thrown) {:class (:name thrown)
:message (:message thrown) :message (:message thrown)
:trace (:extendedStackTrace thrown)}}))) :trace (:extended-stack-trace thrown)}}))
(do
(l/warn :hint "invalid event" :event event)
nil)))