🎉 Add optional loki integration.

And refactor internal error reporting.
This commit is contained in:
Andrey Antukh 2021-02-15 13:14:05 +01:00 committed by Andrés Moya
parent 90d7efe3a9
commit c1476d0397
15 changed files with 331 additions and 98 deletions

View file

@ -21,7 +21,8 @@
(def defaults
{:http-server-port 6060
:host "devenv"
:tenant "dev"
:database-uri "postgresql://127.0.0.1/penpot"
:database-username "penpot"
:database-password "penpot"
@ -87,11 +88,17 @@
})
(s/def ::http-server-port ::us/integer)
(s/def ::host ::us/string)
(s/def ::tenant ::us/string)
(s/def ::database-username (s/nilable ::us/string))
(s/def ::database-password (s/nilable ::us/string))
(s/def ::database-uri ::us/string)
(s/def ::redis-uri ::us/string)
(s/def ::loggers-loki-uri ::us/string)
(s/def ::loggers-zmq-uri ::us/string)
(s/def ::storage-backend ::us/keyword)
(s/def ::storage-fs-directory ::us/string)
@ -185,6 +192,7 @@
::google-client-id
::google-client-secret
::http-server-port
::host
::ldap-auth-avatar-attribute
::ldap-auth-base-dn
::ldap-auth-email-attribute
@ -221,6 +229,8 @@
::srepl-host
::srepl-port
::local-assets-uri
::loggers-loki-uri
::loggers-zmq-uri
::storage-s3-bucket
::storage-s3-region
::telemetry-enabled
@ -228,6 +238,7 @@
::telemetry-server-enabled
::telemetry-server-port
::telemetry-uri
::tenant
::initial-data-file
::initial-data-project-name]))

View file

@ -11,7 +11,6 @@
"A errors handling for the http server."
(:require
[app.common.uuid :as uuid]
[app.config :as cfg]
[app.util.log4j :refer [update-thread-context!]]
[clojure.tools.logging :as log]
[cuerdas.core :as str]
@ -30,16 +29,10 @@
:path (:uri request)
:method (:request-method request)
:params (:params request)
:version (:full cfg/version)
:host (:public-uri cfg/config)
:class (.getCanonicalName ^java.lang.Class (class error))
:hint (ex-message error)
:data edata}
(let [headers (:headers request)]
{:user-agent (get headers "user-agent")
:frontend-version (get headers "x-frontend-version" "unknown")})
(when (and (map? edata) (:data edata))
{:explain (explain-error edata)}))))

View file

@ -0,0 +1,92 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020-2021 UXBOX Labs SL
(ns app.loggers.loki
"A Loki integration."
(:require
[app.common.spec :as us]
[app.config :as cfg]
[app.util.async :as aa]
[app.util.http :as http]
[app.util.json :as json]
[app.worker :as wrk]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig]))
(declare handle-event)
(s/def ::uri ::us/string)
(s/def ::receiver fn?)
(defmethod ig/pre-init-spec ::reporter [_]
(s/keys :req-un [::wrk/executor ::receiver]
:opt-un [::uri]))
(defmethod ig/init-key ::reporter
[_ {:keys [receiver uri] :as cfg}]
(when uri
(log/info "Intializing loki reporter.")
(let [output (a/chan (a/sliding-buffer 1024))]
(receiver :sub output)
(a/go-loop []
(let [msg (a/<! output)]
(if (nil? msg)
(log/info "Stoping error reporting loop.")
(do
(a/<! (handle-event cfg msg))
(recur)))))
output)))
(defmethod ig/halt-key! ::reporter
[_ output]
(when output
(a/close! output)))
(defn- prepare-payload
[event]
(let [labels {:host (cfg/get :host)
:tenant (cfg/get :tenant)
:version (:full cfg/version)
:logger (:logger event)
:level (:level event)}]
{:streams
[{:stream labels
:values [[(str (* (inst-ms (:created-at event)) 1000000))
(str (:message event)
(when-let [error (:error event)]
(str "\n" (:trace error))))]]}]}))
(defn- send-log
[uri payload i]
(try
(let [response (http/send! {:uri uri
:timeout 6000
:method :post
:headers {"content-type" "application/json"}
:body (json/encode payload)})]
(if (= (:status response) 204)
true
(do
(log/errorf "Error on sending log to loki (try %s).\n%s" i (pr-str response))
false)))
(catch Exception e
(log/errorf e "Error on sending message to loki (try %s)." i)
false)))
(defn- handle-event
[{:keys [executor uri]} event]
(aa/with-thread executor
(let [payload (prepare-payload event)]
(loop [i 1]
(when (and (not (send-log uri payload i)) (< i 20))
(Thread/sleep (* i 2000))
(recur (inc i)))))))

View file

@ -5,9 +5,9 @@
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
;; Copyright (c) 2020-2021 UXBOX Labs SL
(ns app.error-reporter
(ns app.loggers.mattermost
"A mattermost integration for error reporting."
(:require
[app.common.exceptions :as ex]
@ -15,6 +15,7 @@
[app.common.uuid :as uuid]
[app.config :as cfg]
[app.db :as db]
[app.util.async :as aa]
[app.util.http :as http]
[app.util.json :as json]
[app.util.template :as tmpl]
@ -24,11 +25,7 @@
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[cuerdas.core :as str]
[integrant.core :as ig]
[promesa.exec :as px])
(:import
org.apache.logging.log4j.core.LogEvent
org.apache.logging.log4j.util.ReadOnlyStringMap))
[integrant.core :as ig]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Error Listener
@ -37,76 +34,51 @@
(declare handle-event)
(defonce enabled-mattermost (atom true))
(defonce queue (a/chan (a/sliding-buffer 64)))
(defonce queue-fn (fn [event] (a/>!! queue event)))
(s/def ::uri ::us/string)
(defmethod ig/pre-init-spec ::reporter [_]
(s/keys :req-un [::wrk/executor ::db/pool]
(s/keys :req-un [::wrk/executor ::db/pool ::receiver]
:opt-un [::uri]))
(defmethod ig/init-key ::reporter
[_ {:keys [executor] :as cfg}]
(log/info "Intializing error reporter.")
(let [close-ch (a/chan 1)]
[_ {:keys [receiver] :as cfg}]
(log/info "Intializing mattermost error reporter.")
(let [output (a/chan (a/sliding-buffer 128)
(filter #(= (:level %) "error")))]
(receiver :sub output)
(a/go-loop []
(let [[val port] (a/alts! [close-ch queue])]
(cond
(= port close-ch)
(let [msg (a/<! output)]
(if (nil? msg)
(log/info "Stoping error reporting loop.")
(nil? val)
(log/info "Stoping error reporting loop.")
:else
(do
(px/run! executor #(handle-event cfg val))
(a/<! (handle-event cfg msg))
(recur)))))
close-ch))
output))
(defmethod ig/halt-key! ::reporter
[_ close-ch]
(a/close! close-ch))
(defn- get-context-data
[event]
(let [^LogEvent levent (deref event)
^ReadOnlyStringMap rosm (.getContextData levent)]
(into {:message (str event)
:id (uuid/next)} ; set default uuid for cases when it not comes.
(comp
(map (fn [[key val]]
(cond
(= "id" key) [:id (uuid/uuid val)]
(= "profile-id" key) [:profile-id (uuid/uuid val)]
(str/blank? val) nil
(string? key) [(keyword key) val]
:else [key val])))
(filter some?))
(.toMap rosm))))
[_ output]
(a/close! output))
(defn- send-mattermost-notification!
[cfg {:keys [message host version id] :as cdata}]
[cfg {:keys [host version id error] :as cdata}]
(try
(let [uri (:uri cfg)
prefix (str "Unhandled exception (@channel):\n"
"- detail: " (:public-uri cfg/config) "/dbg/error-by-id/" id "\n"
"- host: `" host "`\n"
"- version: `" version "`\n")
text (str prefix "```\n" message "\n```")
(let [uri (:uri cfg)
text (str "Unhandled exception (@channel):\n"
"- detail: " (:public-uri cfg/config) "/dbg/error-by-id/" id "\n"
"- host: `" host "`\n"
"- version: `" version "`\n"
(when error
(str "```\n" (:trace error) "\n```")))
rsp (http/send! {:uri uri
:method :post
:headers {"content-type" "application/json"}
:body (json/encode-str {:text text})})]
(when (not= (:status rsp) 200)
(log/warnf "Error reporting webhook replying with unexpected status: %s\n%s"
(:status rsp)
(pr-str rsp))))
(log/errorf "Error on sending data to mattermost\n%s" (pr-str rsp))))
(catch Exception e
(log/warnf e "Unexpected exception on error reporter."))))
(log/error e "Unexpected exception on error reporter."))))
(defn- persist-on-database!
[{:keys [pool] :as cfg} {:keys [id] :as cdata}]
@ -114,15 +86,37 @@
(db/insert! conn :server-error-report
{:id id :content (db/tjson cdata)})))
(defn- parse-context
[event]
(reduce-kv
(fn [acc k v]
(cond
(= k :id) (assoc acc k (uuid/uuid v))
(= k :profile-id) (assoc acc k (uuid/uuid v))
(str/blank? v) acc
:else (assoc acc k v)))
{}
(:context event)))
(defn- parse-event
[event]
(-> (parse-context event)
(merge (dissoc event :context))
(assoc :tenant (cfg/get :tenant))
(assoc :host (cfg/get :host))
(assoc :public-uri (cfg/get :public-uri))
(assoc :version (:full cfg/version))))
(defn handle-event
[cfg event]
(try
(let [cdata (get-context-data event)]
(when (and (:uri cfg) @enabled-mattermost)
(send-mattermost-notification! cfg cdata))
(persist-on-database! cfg cdata))
(catch Exception e
(log/warnf e "Unexpected exception on error reporter."))))
[{:keys [executor] :as cfg} event]
(aa/with-thread executor
(try
(let [cdata (parse-event event)]
(when (and (:uri cfg) @enabled-mattermost)
(send-mattermost-notification! cfg cdata))
(persist-on-database! cfg cdata))
(catch Exception e
(log/error e "Unexpected exception on error reporter.")))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Http Handler

View file

@ -0,0 +1,92 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020-2021 UXBOX Labs SL
(ns app.loggers.zmq
"A generic ZMQ listener."
(:require
[app.common.data :as d]
[app.common.spec :as us]
[app.util.json :as json]
[app.util.time :as dt]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[cuerdas.core :as str]
[integrant.core :as ig])
(:import
org.zeromq.SocketType
org.zeromq.ZMQ$Socket
org.zeromq.ZContext))
(declare prepare)
(declare start-rcv-loop)
(s/def ::endpoint ::us/string)
(defmethod ig/pre-init-spec ::receiver [_]
(s/keys :opt-un [::endpoint]))
(defmethod ig/init-key ::receiver
[_ {:keys [endpoint] :as cfg}]
(log/infof "Intializing ZMQ receiver on '%s'." endpoint)
(let [buffer (a/chan 1)
output (a/chan 1 (comp (filter map?)
(map prepare)))
mult (a/mult output)]
(when endpoint
(a/thread (start-rcv-loop {:out buffer :endpoint endpoint})))
(a/pipe buffer output)
(with-meta
(fn [cmd ch]
(case cmd
:sub (a/tap mult ch)
:unsub (a/untap mult ch))
ch)
{::output output
::buffer buffer
::mult mult})))
(defmethod ig/halt-key! ::receiver
[_ f]
(a/close! (::buffer (meta f))))
(defn- start-rcv-loop
([] (start-rcv-loop nil))
([{:keys [out endpoint] :or {endpoint "tcp://localhost:5556"}}]
(let [out (or out (a/chan 1))
zctx (ZContext.)
socket (.. zctx (createSocket SocketType/SUB))]
(.. socket (connect ^String endpoint))
(.. socket (subscribe ""))
(.. socket (setReceiveTimeOut 5000))
(loop []
(let [msg (.recv ^ZMQ$Socket socket)
msg (json/decode msg)
msg (if (nil? msg) :empty msg)]
(if (a/>!! out msg)
(recur)
(do
(.close ^java.lang.AutoCloseable socket)
(.close ^java.lang.AutoCloseable zctx))))))))
(defn- prepare
[event]
(d/merge
{:logger (:loggerName event)
:level (str/lower (:level event))
:thread (:thread event)
:created-at (dt/instant (:timeMillis event))
:message (:message event)}
(when-let [ctx (:contextMap event)]
{:context ctx})
(when-let [thrown (:thrown event)]
{:error
{:class (:name thrown)
:message (:message thrown)
:trace (:extendedStackTrace thrown)}})))

View file

@ -95,7 +95,7 @@
:svgparse (ig/ref :app.svgparse/handler)
:storage (ig/ref :app.storage/storage)
:sns-webhook (ig/ref :app.http.awsns/handler)
:error-report-handler (ig/ref :app.error-reporter/handler)}
:error-report-handler (ig/ref :app.loggers.mattermost/handler)}
:app.http.assets/handlers
{:metrics (ig/ref :app.metrics/metrics)
@ -280,12 +280,21 @@
:app.sprops/props
{:pool (ig/ref :app.db/pool)}
:app.error-reporter/reporter
:app.loggers.zmq/receiver
{:endpoint (:loggers-zmq-uri config)}
:app.loggers.loki/reporter
{:uri (:loggers-loki-uri config)
:receiver (ig/ref :app.loggers.zmq/receiver)
:executor (ig/ref :app.worker/executor)}
:app.loggers.mattermost/reporter
{:uri (:error-report-webhook config)
:receiver (ig/ref :app.loggers.zmq/receiver)
:pool (ig/ref :app.db/pool)
:executor (ig/ref :app.worker/executor)}
:app.error-reporter/handler
:app.loggers.mattermost/handler
{:pool (ig/ref :app.db/pool)}
:app.storage/storage

View file

@ -16,10 +16,18 @@
[v]
(j/write-value-as-string v j/keyword-keys-object-mapper))
(defn encode
[v]
(j/write-value-as-bytes v j/keyword-keys-object-mapper))
(defn decode-str
[v]
(j/read-value v j/keyword-keys-object-mapper))
(defn decode
[v]
(j/read-value v j/keyword-keys-object-mapper))
(defn read
[v]
(j/read-value v j/keyword-keys-object-mapper))

View file

@ -93,6 +93,10 @@
[t1 t2]
(Duration/between t1 t2))
(defn instant
[ms]
(Instant/ofEpochMilli ms))
(defn parse-duration
[s]
(Duration/parse s))

View file

@ -12,7 +12,6 @@
(:require
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.config :as cfg]
[app.db :as db]
[app.util.async :as aa]
[app.util.log4j :refer [update-thread-context!]]
@ -210,10 +209,6 @@
[error item]
(let [edata (ex-data error)]
{:id (uuid/next)
:version (:full cfg/version)
:host (:public-uri cfg/config)
:class (.getCanonicalName ^java.lang.Class (class error))
:hint (ex-message error)
:data edata
:params item}))