From fd24831c7122b97f57c40f18546ca475d24a9610 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 25 Mar 2024 10:46:15 +0100 Subject: [PATCH] :sparkles: Move audit tasks to separated namespace files --- backend/src/app/loggers/audit.clj | 141 ------------------ .../src/app/loggers/audit/archive_task.clj | 140 +++++++++++++++++ backend/src/app/loggers/audit/gc_task.clj | 31 ++++ backend/src/app/main.clj | 9 +- 4 files changed, 175 insertions(+), 146 deletions(-) create mode 100644 backend/src/app/loggers/audit/archive_task.clj create mode 100644 backend/src/app/loggers/audit/gc_task.clj diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index aead09110..211799d30 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -9,31 +9,24 @@ (:require [app.common.data :as d] [app.common.data.macros :as dm] - [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] - [app.common.transit :as t] [app.common.uuid :as uuid] [app.config :as cf] [app.db :as db] [app.http :as-alias http] [app.http.access-token :as-alias actoken] - [app.http.client :as http.client] [app.loggers.audit.tasks :as-alias tasks] [app.loggers.webhooks :as-alias webhooks] - [app.main :as-alias main] [app.rpc :as-alias rpc] [app.rpc.retry :as rtry] [app.setup :as-alias setup] - [app.tokens :as tokens] [app.util.services :as-alias sv] [app.util.time :as dt] [app.worker :as wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str] [integrant.core :as ig] - [lambdaisland.uri :as u] - [promesa.exec :as px] [ring.request :as rreq])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -249,137 +242,3 @@ (rtry/invoke! cfg db/tx-run! handle-event! event)) (catch Throwable cause (l/error :hint "unexpected error processing event" :cause cause)))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; TASK: ARCHIVE -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -;; This is a task responsible to send the accumulated events to -;; external service for archival. - -(declare archive-events) - -(s/def ::tasks/uri ::us/string) - -(defmethod ig/pre-init-spec ::tasks/archive-task [_] - (s/keys :req [::db/pool ::setup/props ::http.client/client])) - -(defmethod ig/init-key ::tasks/archive - [_ cfg] - (fn [params] - ;; NOTE: this let allows overwrite default configured values from - ;; the repl, when manually invoking the task. - (let [enabled (or (contains? cf/flags :audit-log-archive) - (:enabled params false)) - uri (cf/get :audit-log-archive-uri) - uri (or uri (:uri params)) - cfg (assoc cfg ::uri uri)] - - (when (and enabled (not uri)) - (ex/raise :type :internal - :code :task-not-configured - :hint "archive task not configured, missing uri")) - - (when enabled - (loop [total 0] - (let [n (archive-events cfg)] - (if n - (do - (px/sleep 100) - (recur (+ total ^long n))) - (when (pos? total) - (l/dbg :hint "events archived" :total total))))))))) - -(def ^:private sql:retrieve-batch-of-audit-log - "select * - from audit_log - where archived_at is null - order by created_at asc - limit 128 - for update skip locked;") - -(defn archive-events - [{:keys [::db/pool ::uri] :as cfg}] - (letfn [(decode-row [{:keys [props ip-addr context] :as row}] - (cond-> row - (db/pgobject? props) - (assoc :props (db/decode-transit-pgobject props)) - - (db/pgobject? context) - (assoc :context (db/decode-transit-pgobject context)) - - (db/pgobject? ip-addr "inet") - (assoc :ip-addr (db/decode-inet ip-addr)))) - - (row->event [row] - (select-keys row [:type - :name - :source - :created-at - :tracked-at - :profile-id - :ip-addr - :props - :context])) - - (send [events] - (let [token (tokens/generate (::setup/props cfg) - {:iss "authentication" - :iat (dt/now) - :uid uuid/zero}) - body (t/encode {:events events}) - headers {"content-type" "application/transit+json" - "origin" (cf/get :public-uri) - "cookie" (u/map->query-string {:auth-token token})} - params {:uri uri - :timeout 12000 - :method :post - :headers headers - :body body} - resp (http.client/req! cfg params)] - (if (= (:status resp) 204) - true - (do - (l/error :hint "unable to archive events" - :resp-status (:status resp) - :resp-body (:body resp)) - false)))) - - (mark-as-archived [conn rows] - (db/exec-one! conn ["update audit_log set archived_at=now() where id = ANY(?)" - (->> (map :id rows) - (db/create-array conn "uuid"))]))] - - (db/with-atomic [conn pool] - (let [rows (db/exec! conn [sql:retrieve-batch-of-audit-log]) - xform (comp (map decode-row) - (map row->event)) - events (into [] xform rows)] - (when-not (empty? events) - (l/trc :hint "archive events chunk" :uri uri :events (count events)) - (when (send events) - (mark-as-archived conn rows) - (count events))))))) - -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; GC Task -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -(def ^:private sql:clean-archived - "delete from audit_log - where archived_at is not null") - -(defn- clean-archived - [{:keys [::db/pool]}] - (let [result (db/exec-one! pool [sql:clean-archived]) - result (:next.jdbc/update-count result)] - (l/debug :hint "delete archived audit log entries" :deleted result) - result)) - -(defmethod ig/pre-init-spec ::tasks/gc [_] - (s/keys :req [::db/pool])) - -(defmethod ig/init-key ::tasks/gc - [_ cfg] - (fn [_] - (clean-archived cfg))) diff --git a/backend/src/app/loggers/audit/archive_task.clj b/backend/src/app/loggers/audit/archive_task.clj new file mode 100644 index 000000000..046fb8068 --- /dev/null +++ b/backend/src/app/loggers/audit/archive_task.clj @@ -0,0 +1,140 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.loggers.audit.archive-task + (:require + [app.common.exceptions :as ex] + [app.common.logging :as l] + [app.common.transit :as t] + [app.common.uuid :as uuid] + [app.config :as cf] + [app.db :as db] + [app.http.client :as http] + [app.setup :as-alias setup] + [app.tokens :as tokens] + [app.util.time :as dt] + [clojure.spec.alpha :as s] + [integrant.core :as ig] + [lambdaisland.uri :as u] + [promesa.exec :as px])) + +;; This is a task responsible to send the accumulated events to +;; external service for archival. + +(defn- decode-row + [{:keys [props ip-addr context] :as row}] + (cond-> row + (db/pgobject? props) + (assoc :props (db/decode-transit-pgobject props)) + + (db/pgobject? context) + (assoc :context (db/decode-transit-pgobject context)) + + (db/pgobject? ip-addr "inet") + (assoc :ip-addr (db/decode-inet ip-addr)))) + +(def ^:private event-keys + [:type + :name + :source + :created-at + :tracked-at + :profile-id + :ip-addr + :props + :context]) + +(defn- row->event + [row] + (select-keys row event-keys)) + +(defn- send! + [{:keys [::uri] :as cfg} events] + (let [token (tokens/generate (::setup/props cfg) + {:iss "authentication" + :iat (dt/now) + :uid uuid/zero}) + body (t/encode {:events events}) + headers {"content-type" "application/transit+json" + "origin" (cf/get :public-uri) + "cookie" (u/map->query-string {:auth-token token})} + params {:uri uri + :timeout 12000 + :method :post + :headers headers + :body body} + resp (http/req! cfg params)] + (if (= (:status resp) 204) + true + (do + (l/error :hint "unable to archive events" + :resp-status (:status resp) + :resp-body (:body resp)) + false)))) + +(defn- mark-archived! + [{:keys [::db/conn]} rows] + (let [ids (db/create-array conn "uuid" (map :id rows))] + (db/exec-one! conn ["update audit_log set archived_at=now() where id = ANY(?)" ids]))) + +(def ^:private xf:create-event + (comp (map decode-row) + (map row->event))) + +(def ^:private sql:get-audit-log-chunk + "SELECT * + FROM audit_log + WHERE archived_at is null + ORDER BY created_at ASC + LIMIT 128 + FOR UPDATE + SKIP LOCKED") + +(defn- get-event-rows + [{:keys [::db/conn] :as cfg}] + (->> (db/exec! conn [sql:get-audit-log-chunk]) + (not-empty))) + +(defn- archive-events! + [{:keys [::uri] :as cfg}] + (db/tx-run! cfg (fn [cfg] + (when-let [rows (get-event-rows cfg)] + (let [events (into [] xf:create-event rows)] + (l/trc :hint "archive events chunk" :uri uri :events (count events)) + (when (send! cfg events) + (mark-archived! cfg rows) + (count events))))))) + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req [::db/pool ::setup/props ::http/client])) + +(defmethod ig/init-key ::handler + [_ cfg] + (fn [params] + ;; NOTE: this let allows overwrite default configured values from + ;; the repl, when manually invoking the task. + (let [enabled (or (contains? cf/flags :audit-log-archive) + (:enabled params false)) + + uri (cf/get :audit-log-archive-uri) + uri (or uri (:uri params)) + cfg (assoc cfg ::uri uri)] + + (when (and enabled (not uri)) + (ex/raise :type :internal + :code :task-not-configured + :hint "archive task not configured, missing uri")) + + (when enabled + (loop [total 0] + (if-let [n (archive-events! cfg)] + (do + (px/sleep 100) + (recur (+ total ^long n))) + + (when (pos? total) + (l/dbg :hint "events archived" :total total)))))))) + diff --git a/backend/src/app/loggers/audit/gc_task.clj b/backend/src/app/loggers/audit/gc_task.clj new file mode 100644 index 000000000..7f94217a4 --- /dev/null +++ b/backend/src/app/loggers/audit/gc_task.clj @@ -0,0 +1,31 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.loggers.audit.gc-task + (:require + [app.common.logging :as l] + [app.db :as db] + [clojure.spec.alpha :as s] + [integrant.core :as ig])) + +(def ^:private sql:clean-archived + "DELETE FROM audit_log + WHERE archived_at IS NOT NULL") + +(defn- clean-archived! + [{:keys [::db/pool]}] + (let [result (db/exec-one! pool [sql:clean-archived]) + result (db/get-update-count result)] + (l/debug :hint "delete archived audit log entries" :deleted result) + result)) + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req [::db/pool])) + +(defmethod ig/init-key ::handler + [_ cfg] + (fn [_] + (clean-archived! cfg))) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index e0177110f..056c99cc8 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -21,7 +21,6 @@ [app.http.session :as-alias session] [app.http.session.tasks :as-alias session.tasks] [app.http.websocket :as http.ws] - [app.loggers.audit.tasks :as-alias audit.tasks] [app.loggers.webhooks :as-alias webhooks] [app.metrics :as-alias mtx] [app.metrics.definition :as-alias mdef] @@ -346,8 +345,8 @@ :storage-gc-deleted (ig/ref ::sto.gc-deleted/handler) :storage-gc-touched (ig/ref ::sto.gc-touched/handler) :session-gc (ig/ref ::session.tasks/gc) - :audit-log-archive (ig/ref ::audit.tasks/archive) - :audit-log-gc (ig/ref ::audit.tasks/gc) + :audit-log-archive (ig/ref :app.loggers.audit.archive-task/handler) + :audit-log-gc (ig/ref :app.loggers.audit.gc-task/handler) :process-webhook-event (ig/ref ::webhooks/process-event-handler) @@ -411,12 +410,12 @@ ::svgo/optimizer {} - ::audit.tasks/archive + :app.loggers.audit.archive-task/handler {::setup/props (ig/ref ::setup/props) ::db/pool (ig/ref ::db/pool) ::http.client/client (ig/ref ::http.client/client)} - ::audit.tasks/gc + :app.loggers.audit.gc-task/handler {::db/pool (ig/ref ::db/pool)} ::webhooks/process-event-handler