Merge branch 'staging'

This commit is contained in:
Andrey Antukh 2022-03-01 11:10:27 +01:00
commit fae79d67e6
332 changed files with 12392 additions and 6659 deletions

View file

@ -24,6 +24,7 @@
[cuerdas.core :as str]
[integrant.core :as ig]
[lambdaisland.uri :as u]
[promesa.core :as p]
[promesa.exec :as px]))
(defn parse-client-ip
@ -41,33 +42,26 @@
(defn clean-props
[{:keys [profile-id] :as event}]
(letfn [(clean-common [props]
(-> props
(dissoc :session-id)
(dissoc :password)
(dissoc :old-password)
(dissoc :token)))
(let [invalid-keys #{:session-id
:password
:old-password
:token}
xform (comp
(remove (fn [kv]
(qualified-keyword? (first kv))))
(remove (fn [kv]
(contains? invalid-keys (first kv))))
(remove (fn [[k v]]
(and (= k :profile-id)
(= v profile-id))))
(filter (fn [[_ v]]
(or (string? v)
(keyword? v)
(uuid? v)
(boolean? v)
(number? v)))))]
(clean-profile-id [props]
(cond-> props
(= profile-id (:profile-id props))
(dissoc :profile-id)))
(clean-complex-data [props]
(reduce-kv (fn [props k v]
(cond-> props
(or (string? v)
(uuid? v)
(boolean? v)
(number? v))
(assoc k v)
(keyword? v)
(assoc k (name v))))
{}
props))]
(update event :props #(-> % clean-common clean-profile-id clean-complex-data))))
(update event :props #(into {} xform %))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HTTP Handler
@ -82,52 +76,62 @@
(s/def ::timestamp dt/instant?)
(s/def ::context (s/map-of ::us/keyword any?))
(s/def ::event
(s/def ::frontend-event
(s/keys :req-un [::type ::name ::props ::timestamp ::profile-id]
:opt-un [::context]))
(s/def ::events (s/every ::event))
(s/def ::frontend-events (s/every ::frontend-event))
(defmethod ig/init-key ::http-handler
[_ {:keys [executor] :as cfg}]
(fn [{:keys [params profile-id] :as request}]
(when (contains? cf/flags :audit-log)
(let [events (->> (:events params)
(remove #(not= profile-id (:profile-id %)))
(us/conform ::events))
ip-addr (parse-client-ip request)
cfg (-> cfg
(assoc :source "frontend")
(assoc :events events)
(assoc :ip-addr ip-addr))]
(px/run! executor #(persist-http-events cfg))))
{:status 204 :body ""}))
[_ {:keys [executor pool] :as cfg}]
(if (or (db/read-only? pool) (not (contains? cf/flags :audit-log)))
(do
(l/warn :hint "audit log http handler disabled or db is read-only")
(fn [_ respond _]
(respond {:status 204 :body ""})))
(letfn [(handler [{:keys [params profile-id] :as request}]
(let [events (->> (:events params)
(remove #(not= profile-id (:profile-id %)))
(us/conform ::frontend-events))
ip-addr (parse-client-ip request)
cfg (-> cfg
(assoc :source "frontend")
(assoc :events events)
(assoc :ip-addr ip-addr))]
(persist-http-events cfg)))
(handle-error [cause]
(let [xdata (ex-data cause)]
(if (= :spec-validation (:code xdata))
(l/error ::l/raw (str "spec validation on persist-events:\n" (us/pretty-explain xdata)))
(l/error :hint "error on persist-events" :cause cause))))]
(fn [request respond _]
;; Fire and forget, log error in case of errro
(-> (px/submit! executor #(handler request))
(p/catch handle-error))
(respond {:status 204 :body ""})))))
(defn- persist-http-events
[{:keys [pool events ip-addr source] :as cfg}]
(try
(let [columns [:id :name :source :type :tracked-at :profile-id :ip-addr :props :context]
prepare-xf (map (fn [event]
[(uuid/next)
(:name event)
source
(:type event)
(:timestamp event)
(:profile-id event)
(db/inet ip-addr)
(db/tjson (:props event))
(db/tjson (d/without-nils (:context event)))]))
events (us/conform ::events events)]
(when (seq events)
(->> (into [] prepare-xf events)
(db/insert-multi! pool :audit-log columns))))
(catch Throwable e
(let [xdata (ex-data e)]
(if (= :spec-validation (:code xdata))
(l/error ::l/raw (str "spec validation on persist-events:\n"
(:explain xdata)))
(l/error :hint "error on persist-events"
:cause e))))))
(let [columns [:id :name :source :type :tracked-at :profile-id :ip-addr :props :context]
prepare-xf (map (fn [event]
[(uuid/next)
(:name event)
source
(:type event)
(:timestamp event)
(:profile-id event)
(db/inet ip-addr)
(db/tjson (:props event))
(db/tjson (d/without-nils (:context event)))]))]
(when (seq events)
(->> (into [] prepare-xf events)
(db/insert-multi! pool :audit-log columns)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Collector
@ -142,49 +146,65 @@
(defmethod ig/pre-init-spec ::collector [_]
(s/keys :req-un [::db/pool ::wrk/executor]))
(def event-xform
(s/def ::ip-addr string?)
(s/def ::backend-event
(s/keys :req-un [::type ::name ::profile-id]
:opt-un [::ip-addr ::props]))
(def ^:private backend-event-xform
(comp
(filter :profile-id)
(filter #(us/valid? ::backend-event %))
(map clean-props)))
(defmethod ig/init-key ::collector
[_ cfg]
(when (contains? cf/flags :audit-log)
(l/info :msg "initializing audit log collector")
(let [input (a/chan 512 event-xform)
[_ {:keys [pool] :as cfg}]
(cond
(not (contains? cf/flags :audit-log))
(do
(l/info :hint "audit log collection disabled")
(constantly nil))
(db/read-only? pool)
(do
(l/warn :hint "audit log collection disabled, db is read-only")
(constantly nil))
:else
(let [input (a/chan 512 backend-event-xform)
buffer (aa/batch input {:max-batch-size 100
:max-batch-age (* 10 1000) ; 10s
:init []})]
(l/info :hint "audit log collector initialized")
(a/go-loop []
(when-let [[_type events] (a/<! buffer)]
(let [res (a/<! (persist-events cfg events))]
(when (ex/exception? res)
(l/error :hint "error on persisting events"
:cause res)))
(recur)))
(l/error :hint "error on persisting events" :cause res))
(recur))))
(fn [& {:keys [cmd] :as params}]
(let [params (-> params
(dissoc :cmd)
(assoc :tracked-at (dt/now)))]
(case cmd
:stop (a/close! input)
:submit (when-not (a/offer! input params)
(l/warn :msg "activity channel is full"))))))))
(case cmd
:stop
(a/close! input)
:submit
(let [params (-> params
(dissoc :cmd)
(assoc :tracked-at (dt/now)))]
(when-not (a/offer! input params)
(l/warn :hint "activity channel is full"))))))))
(defn- persist-events
[{:keys [pool executor] :as cfg} events]
(letfn [(event->row [event]
(when (:profile-id event)
[(uuid/next)
(:name event)
(:type event)
(:profile-id event)
(:tracked-at event)
(some-> (:ip-addr event) db/inet)
(db/tjson (:props event))
"backend"]))]
[(uuid/next)
(:name event)
(:type event)
(:profile-id event)
(:tracked-at event)
(some-> (:ip-addr event) db/inet)
(db/tjson (:props event))
"backend"])]
(aa/with-thread executor
(when (seq events)
(db/with-atomic [conn pool]
@ -217,6 +237,7 @@
(:enabled props false))
uri (or uri (:uri props))
cfg (assoc cfg :uri uri)]
(when (and enabled (not uri))
(ex/raise :type :internal
:code :task-not-configured

View file

@ -27,8 +27,9 @@
(defonce enabled (atom true))
(defn- persist-on-database!
[{:keys [pool]} {:keys [id] :as event}]
(db/insert! pool :server-error-report {:id id :content (db/tjson event)}))
[{:keys [pool] :as cfg} {:keys [id] :as event}]
(when-not (db/read-only? pool)
(db/insert! pool :server-error-report {:id id :content (db/tjson event)})))
(defn- parse-event-data
[event]