♻️ Enable receiving frontend audit log on backend.

This commit is contained in:
Andrey Antukh 2021-08-25 10:38:15 +02:00
parent 3dffb9c8a0
commit e768600df3
8 changed files with 130 additions and 34 deletions

View file

@ -23,7 +23,8 @@
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]
[lambdaisland.uri :as u]))
[lambdaisland.uri :as u]
[promesa.exec :as px]))
(defn parse-client-ip
[{:keys [headers] :as request}]
@ -67,6 +68,65 @@
(update event :props #(-> % clean-common clean-profile-id clean-complex-data))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HTTP Handler
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(declare persist-http-events)
(s/def ::profile-id ::us/uuid)
(s/def ::name ::us/string)
(s/def ::type ::us/string)
(s/def ::props (s/map-of ::us/keyword any?))
(s/def ::timestamp dt/instant?)
(s/def ::context (s/map-of ::us/keyword any?))
(s/def ::event
(s/keys :req-un [::type ::name ::props ::timestamp ::profile-id]
:opt-un [::context]))
(s/def ::events (s/every ::event))
(defmethod ig/init-key ::http-handler
[_ {:keys [executor enabled] :as cfg}]
(fn [{:keys [params headers cookies profile-id] :as request}]
(when enabled
(let [events (->> (:events params)
(remove #(not= profile-id (:profile-id %)))
(us/conform ::events))
ip-addr (parse-client-ip request)
cfg (-> cfg
(assoc :source "frontend")
(assoc :events events)
(assoc :ip-addr ip-addr))]
(px/run! executor #(persist-http-events cfg))))
{:status 204 :body ""}))
(defn- persist-http-events
[{:keys [pool events ip-addr source] :as cfg}]
(try
(let [columns [:id :name :source :type :tracked-at :profile-id :ip-addr :props :context]
prepare-xf (map (fn [event]
[(uuid/next)
(:name event)
source
(:type event)
(:timestamp event)
(:profile-id event)
(db/inet ip-addr)
(db/tjson (:props event))
(db/tjson (d/without-nils (:context event)))]))
events (us/conform ::events events)
rows (into [] prepare-xf events)]
(db/insert-multi! pool :audit-log columns rows))
(catch Throwable e
(let [xdata (ex-data e)]
(if (= :spec-validation (:code xdata))
(l/error ::l/raw (str "spec validation on persist-events:\n"
(:explain xdata)))
(l/error :hint "error on persist-events"
:cause e))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Collector
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -103,7 +163,9 @@
(recur)))
(fn [& {:keys [cmd] :as params}]
(let [params (dissoc params :cmd)]
(let [params (-> params
(dissoc :cmd)
(assoc :tracked-at (dt/now)))]
(case cmd
:stop (a/close! input)
:submit (when-not (a/offer! input params)
@ -117,13 +179,14 @@
(:name event)
(:type event)
(:profile-id event)
(:tracked-at event)
(some-> (:ip-addr event) db/inet)
(db/tjson (:props event))])]
(db/tjson (:props event))
"backend"])]
(aa/with-thread executor
(db/with-atomic [conn pool]
(db/insert-multi! conn :audit-log
[:id :name :type :profile-id :ip-addr :props]
[:id :name :type :profile-id :tracked-at :ip-addr :props :source]
(sequence (map event->row) events))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -144,16 +207,19 @@
(defmethod ig/init-key ::archive-task
[_ {:keys [uri enabled] :as cfg}]
(fn [_]
(when (and enabled (not uri))
(ex/raise :type :internal
:code :task-not-configured
:hint "archive task not configured, missing uri"))
(loop []
(let [res (archive-events cfg)]
(when (= res :continue)
(aa/thread-sleep 200)
(recur))))))
(fn [props]
(let [enabled (or enabled (:enabled props false))
uri (or uri (:uri props))]
(when (and enabled (not uri))
(ex/raise :type :internal
:code :task-not-configured
:hint "archive task not configured, missing uri"))
(when enabled
(loop []
(let [res (archive-events cfg)]
(when (= res :continue)
(aa/thread-sleep 200)
(recur))))))))
(def sql:retrieve-batch-of-audit-log
"select * from audit_log
@ -164,22 +230,27 @@
(defn archive-events
[{:keys [pool uri tokens] :as cfg}]
(letfn [(decode-row [{:keys [props ip-addr] :as row}]
(letfn [(decode-row [{:keys [props ip-addr context] :as row}]
(cond-> row
(db/pgobject? props)
(assoc :props (db/decode-transit-pgobject props))
(db/pgobject? context)
(assoc :context (db/decode-transit-pgobject context))
(db/pgobject? ip-addr "inet")
(assoc :ip-addr (db/decode-inet ip-addr))))
(row->event [{:keys [name type created-at profile-id props ip-addr]}]
(cond-> {:type type
:name name
:timestamp created-at
:profile-id profile-id
:props props}
(some? ip-addr)
(update :context assoc :source-ip ip-addr)))
(row->event [row]
(select-keys row [:type
:name
:source
:created-at
:tracked-at
:profile-id
:ip-addr
:props
:context]))
(send [events]
(let [token (tokens :generate {:iss "authentication"