diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 69888a54c..819b3da75 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -99,6 +99,12 @@ :initial-project-skey "initial-project" }) +(s/def ::audit-enabled ::us/boolean) +(s/def ::audit-archive-enabled ::us/boolean) +(s/def ::audit-archive-uri ::us/string) +(s/def ::audit-archive-gc-enabled ::us/boolean) +(s/def ::audit-archive-gc-max-age ::dt/duration) + (s/def ::secret-key ::us/string) (s/def ::allow-demo-users ::us/boolean) (s/def ::asserts-enabled ::us/boolean) @@ -182,6 +188,11 @@ (s/def ::config (s/keys :opt-un [::secret-key ::allow-demo-users + ::audit-enabled + ::audit-archive-enabled + ::audit-archive-uri + ::audit-archive-gc-enabled + ::audit-archive-gc-max-age ::asserts-enabled ::database-password ::database-uri diff --git a/backend/src/app/loggers/activity.clj b/backend/src/app/loggers/activity.clj deleted file mode 100644 index 4f02fb701..000000000 --- a/backend/src/app/loggers/activity.clj +++ /dev/null @@ -1,127 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) UXBOX Labs SL - -(ns app.loggers.activity - "Activity registry logger consumer." - (:require - [app.common.data :as d] - [app.common.spec :as us] - [app.config :as cf] - [app.util.async :as aa] - [app.util.http :as http] - [app.util.logging :as l] - [app.util.time :as dt] - [app.util.transit :as t] - [app.worker :as wrk] - [clojure.core.async :as a] - [clojure.spec.alpha :as s] - [integrant.core :as ig] - [lambdaisland.uri :as u])) - -(declare process-event) -(declare handle-event) - -(s/def ::uri ::us/string) - -(defmethod ig/pre-init-spec ::reporter [_] - (s/keys :req-un [::wrk/executor] - :opt-un [::uri])) - -(defmethod ig/init-key ::reporter - [_ {:keys [uri] :as cfg}] - (if (string? uri) - (do - (l/info :msg "intializing activity reporter" :uri uri) - (let [xform (comp (map process-event) - (filter map?)) - input (a/chan (a/sliding-buffer 1024) xform)] - (a/go-loop [] - (when-let [event (a/ params - (or (string? v) - (uuid? v) - (number? v)) - (assoc k v))) - {} - params))) - -(defn- process-event - [{:keys [type name params result] :as event}] - (let [profile-id (:profile-id params)] - (if (uuid? profile-id) - {:type (str "backend:" (d/name type)) - :name name - :timestamp (dt/now) - :profile-id profile-id - :props (clean-params params)} - (cond - (= "register-profile" name) - {:type (str "backend:" (d/name type)) - :name name - :timestamp (dt/now) - :profile-id (:id result) - :props (clean-params (:props result))} - - :else nil)))) - -(defn- send-activity - [{:keys [uri tokens]} event i] - (try - (let [token (tokens :generate {:iss "authentication" - :iat (dt/now) - :uid (:profile-id event)}) - body (t/encode {:events [event]}) - headers {"content-type" "application/transit+json" - "origin" (cf/get :public-uri) - "cookie" (u/map->query-string {:auth-token token})} - params {:uri uri - :timeout 6000 - :method :post - :headers headers - :body body} - response (http/send! params)] - (if (= (:status response) 204) - true - (do - (l/error :hint "error on sending activity" - :try i - :rsp (pr-str response)) - false))) - (catch Exception e - (l/error :hint "error on sending message to loki" - :cause e - :try i) - false))) - -(defn- handle-event - [{:keys [executor] :as cfg} event] - (aa/with-thread executor - (loop [i 1] - (when (and (not (send-activity cfg event i)) (< i 20)) - (Thread/sleep (* i 2000)) - (recur (inc i)))))) - diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj new file mode 100644 index 000000000..7d8c7e583 --- /dev/null +++ b/backend/src/app/loggers/audit.clj @@ -0,0 +1,212 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) UXBOX Labs SL + +(ns app.loggers.audit + "Services related to the user activity (audit log)." + (:require + [app.common.exceptions :as ex] + [app.common.spec :as us] + [app.common.uuid :as uuid] + [app.config :as cf] + [app.db :as db] + [app.util.async :as aa] + [app.util.http :as http] + [app.util.logging :as l] + [app.util.time :as dt] + [app.util.transit :as t] + [app.worker :as wrk] + [clojure.core.async :as a] + [clojure.spec.alpha :as s] + [integrant.core :as ig] + [lambdaisland.uri :as u])) + +(defn clean-props + "Cleans the params from complex data, only accept strings, numbers and + uuids and removing sensitive data such as :password and related + props." + [params] + (let [params (dissoc params :session-id :password :old-password :token)] + (reduce-kv (fn [params k v] + (cond-> params + (or (string? v) + (uuid? v) + (number? v)) + (assoc k v))) + {} + params))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Collector +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +;; Defines a service that collects the audit/activity log using +;; internal database. Later this audit log can be transferred to +;; an external storage and data cleared. + +(declare persist-events) +(s/def ::enabled ::us/boolean) + +(defmethod ig/pre-init-spec ::collector [_] + (s/keys :req-un [::db/pool ::wrk/executor ::enabled])) + +(defmethod ig/init-key ::collector + [_ {:keys [enabled] :as cfg}] + (when enabled + (l/info :msg "intializing audit collector") + (let [input (a/chan) + buffer (aa/batch input {:max-batch-size 100 + :max-batch-age (* 5 1000) + :init []})] + (a/go-loop [] + (when-let [[type events] (a/row [event] + [(uuid/next) + (:name event) + (:type event) + (:profile-id event) + (db/tjson (:props event))])] + + (aa/with-thread executor + (db/with-atomic [conn pool] + (db/insert-multi! conn :audit-log + [:id :name :type :profile-id :props] + (sequence (map event->row) events)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Archive Task +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +;; This is a task responsible to send the accomulated events to an +;; external service for archival. + +(declare archive-events) + +(s/def ::uri ::us/string) +(s/def ::tokens fn?) + +(defmethod ig/pre-init-spec ::archive-task [_] + (s/keys :req-un [::db/pool ::tokens ::enabled] + :opt-un [::uri])) + +(defmethod ig/init-key ::archive-task + [_ {:keys [uri enabled] :as cfg}] + (fn [_] + (when (and enabled (not uri)) + (ex/raise :type :internal + :code :task-not-configured + :hint "archive task not configured, missing uri")) + (l/debug :msg "start archiver" :uri uri) + (loop [] + (let [res (archive-events cfg)] + (when (= res :continue) + (aa/thread-sleep 200) + (recur)))))) + +(def sql:retrieve-batch-of-audit-log + "select * from audit_log + where archived_at is null + order by created_at asc + limit 100 + for update skip locked;") + +(defn archive-events + [{:keys [pool uri tokens] :as cfg}] + (letfn [(decode-row [{:keys [props] :as row}] + (cond-> row + (db/pgobject? props) + (assoc :props (db/decode-transit-pgobject props)))) + + (row->event [{:keys [name type created-at profile-id props]}] + {:type (str "backend:" type) + :name name + :timestamp created-at + :profile-id profile-id + :props props}) + + (send [events] + (let [token (tokens :generate {:iss "authentication" + :iat (dt/now) + :uid uuid/zero}) + body (t/encode {:events events}) + headers {"content-type" "application/transit+json" + "origin" (cf/get :public-uri) + "cookie" (u/map->query-string {:auth-token token})} + params {:uri uri + :timeout 5000 + :method :post + :headers headers + :body body} + resp (http/send! params)] + (when (not= (:status resp) 204) + (ex/raise :type :internal + :code :unable-to-send-events + :hint "unable to send events" + :context resp)))) + + (mark-as-archived [conn rows] + (db/exec-one! conn ["update audit_log set archived_at=now() where id = ANY(?)" + (->> (map :id rows) + (into-array java.util.UUID) + (db/create-array conn "uuid"))]))] + + (db/with-atomic [conn pool] + (let [rows (db/exec! conn [sql:retrieve-batch-of-audit-log]) + + xform (comp (map decode-row) + (map row->event)) + events (into [] xform rows)] + (l/debug :action "archive-events" :uri uri :events (count events)) + (if (empty? events) + :empty + (do + (send events) + (mark-as-archived conn rows) + :continue)))))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; GC Task +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(declare clean-archived) + +(s/def ::max-age ::cf/audit-archive-gc-max-age) + +(defmethod ig/pre-init-spec ::archive-gc-task [_] + (s/keys :req-un [::db/pool ::enabled ::max-age])) + +(defmethod ig/init-key ::archive-gc-task + [_ cfg] + (fn [_] + (clean-archived cfg))) + +(def sql:clean-archived + "delete from audit_log + where archived_at is not null + and archived_at < now() - ?::interval") + +(defn- clean-archived + [{:keys [pool max-age]}] + (prn "clean-archived" max-age) + (let [interval (db/interval max-age) + result (db/exec-one! pool [sql:clean-archived interval]) + result (:next.jdbc/update-count result)] + (l/debug :action "clean archived audit log" :removed result) + result)) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index c616605a5..a3cfb79f5 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -140,7 +140,7 @@ :msgbus (ig/ref :app.msgbus/msgbus) :rlimits (ig/ref :app.rlimits/all) :public-uri (cf/get :public-uri) - :activity (ig/ref :app.loggers.activity/reporter)} + :audit (ig/ref :app.loggers.audit/collector)} :app.notifications/handler {:msgbus (ig/ref :app.msgbus/msgbus) @@ -187,6 +187,14 @@ {:cron #app/cron "0 0 0 */1 * ?" ;; daily :task :tasks-gc} + (when (cf/get :audit-archive-enabled) + {:cron #app/cron "0 0 * * * ?" ;; every 1h + :task :audit-archive}) + + (when (cf/get :audit-archive-gc-enabled) + {:cron #app/cron "0 0 * * * ?" ;; every 1h + :task :audit-archive-gc}) + (when (cf/get :telemetry-enabled) {:cron #app/cron "0 0 */6 * * ?" ;; every 6h :task :telemetry})]} @@ -204,7 +212,9 @@ :storage-recheck (ig/ref :app.storage/recheck-task) :tasks-gc (ig/ref :app.tasks.tasks-gc/handler) :telemetry (ig/ref :app.tasks.telemetry/handler) - :session-gc (ig/ref :app.http.session/gc-task)}} + :session-gc (ig/ref :app.http.session/gc-task) + :audit-archive (ig/ref :app.loggers.audit/archive-task) + :audit-archive-gc (ig/ref :app.loggers.audit/archive-gc-task)}} :app.emails/sendmail-handler {:host (cf/get :smtp-host) @@ -263,11 +273,22 @@ :app.loggers.zmq/receiver {:endpoint (cf/get :loggers-zmq-uri)} - :app.loggers.activity/reporter - {:uri (cf/get :activity-reporter-uri) - :tokens (ig/ref :app.tokens/tokens) + :app.loggers.audit/collector + {:enabled (cf/get :audit-enabled false) + :pool (ig/ref :app.db/pool) :executor (ig/ref :app.worker/executor)} + :app.loggers.audit/archive-task + {:uri (cf/get :audit-archive-uri) + :enabled (cf/get :audit-archive-enabled false) + :tokens (ig/ref :app.tokens/tokens) + :pool (ig/ref :app.db/pool)} + + :app.loggers.audit/archive-gc-task + {:enabled (cf/get :audit-archive-gc-enabled false) + :max-age (cf/get :audit-archive-gc-max-age cf/deletion-delay) + :pool (ig/ref :app.db/pool)} + :app.loggers.loki/reporter {:uri (cf/get :loggers-loki-uri) :receiver (ig/ref :app.loggers.zmq/receiver) diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index 4989dd109..26e28f181 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -169,6 +169,9 @@ {:name "0053-add-team-font-variant-table" :fn (mg/resource "app/migrations/sql/0053-add-team-font-variant-table.sql")} + + {:name "0054-add-audit-log-table" + :fn (mg/resource "app/migrations/sql/0054-add-audit-log-table.sql")} ]) diff --git a/backend/src/app/migrations/sql/0054-add-audit-log-table.sql b/backend/src/app/migrations/sql/0054-add-audit-log-table.sql new file mode 100644 index 000000000..b7097fde2 --- /dev/null +++ b/backend/src/app/migrations/sql/0054-add-audit-log-table.sql @@ -0,0 +1,25 @@ +CREATE TABLE audit_log ( + id uuid NOT NULL DEFAULT uuid_generate_v4(), + + name text NOT NULL, + type text NOT NULL, + + created_at timestamptz DEFAULT clock_timestamp() NOT NULL, + archived_at timestamptz NULL, + + profile_id uuid NOT NULL, + props jsonb, + + PRIMARY KEY (created_at, profile_id) +) PARTITION BY RANGE (created_at); + +ALTER TABLE audit_log + ALTER COLUMN name SET STORAGE external, + ALTER COLUMN type SET STORAGE external, + ALTER COLUMN props SET STORAGE external; + +CREATE INDEX audit_log_id_archived_at_idx ON audit_log (id, archived_at); + +CREATE TABLE audit_log_default (LIKE audit_log INCLUDING ALL); + +ALTER TABLE audit_log ATTACH PARTITION audit_log_default DEFAULT; diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 16ab02791..da0497914 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -10,6 +10,7 @@ [app.common.exceptions :as ex] [app.common.spec :as us] [app.db :as db] + [app.loggers.audit :as audit] [app.metrics :as mtx] [app.rlimits :as rlm] [app.util.logging :as l] @@ -86,28 +87,31 @@ (defn- wrap-impl - [{:keys [activity] :as cfg} f mdata] + [{:keys [audit] :as cfg} f mdata] (let [f (wrap-with-rlimits cfg f mdata) f (wrap-with-metrics cfg f mdata) spec (or (::sv/spec mdata) (s/spec any?)) auth? (:auth mdata true)] - (l/trace :action "register" - :name (::sv/name mdata)) + + (l/trace :action "register" :name (::sv/name mdata)) (fn [params] (when (and auth? (not (uuid? (:profile-id params)))) (ex/raise :type :authentication :code :authentication-required :hint "authentication required for this endpoint")) + (let [params (us/conform spec params) result (f cfg params) - ;; On non authenticated handlers we check the private - ;; result that can be found on the metadata. - result* (if auth? result (:result (meta result) {}))] - (when (::type cfg) - (activity :submit {:type (::type cfg) - :name (::sv/name mdata) - :params params - :result result*})) + resultm (meta result)] + (when (and (::type cfg) (fn? audit)) + (let [profile-id (or (:profile-id params) + (:profile-id result) + (::audit/profile-id resultm)) + props (d/merge params (::audit/props resultm))] + (audit :submit {:type (::type cfg) + :name (::sv/name mdata) + :profile-id profile-id + :props (audit/clean-props props)}))) result)))) (defn- process-method @@ -124,7 +128,7 @@ :registry (get-in cfg [:metrics :registry]) :type :histogram :help "Timing of query services."}) - cfg (assoc cfg ::mobj mobj)] + cfg (assoc cfg ::mobj mobj ::type "query")] (->> (sv/scan-ns 'app.rpc.queries.projects 'app.rpc.queries.files 'app.rpc.queries.teams @@ -145,7 +149,7 @@ :registry (get-in cfg [:metrics :registry]) :type :histogram :help "Timing of mutation services."}) - cfg (assoc cfg ::mobj mobj ::type :mutation)] + cfg (assoc cfg ::mobj mobj ::type "mutation")] (->> (sv/scan-ns 'app.rpc.mutations.demo 'app.rpc.mutations.media 'app.rpc.mutations.profile @@ -164,10 +168,10 @@ (s/def ::storage some?) (s/def ::session map?) (s/def ::tokens fn?) -(s/def ::activity some?) +(s/def ::audit (s/nilable fn?)) (defmethod ig/pre-init-spec ::rpc [_] - (s/keys :req-un [::storage ::session ::tokens ::activity + (s/keys :req-un [::storage ::session ::tokens ::audit ::mtx/metrics ::rlm/rlimits ::db/pool])) (defmethod ig/init-key ::rpc diff --git a/backend/src/app/rpc/mutations/profile.clj b/backend/src/app/rpc/mutations/profile.clj index 6e75017a5..3024c28bc 100644 --- a/backend/src/app/rpc/mutations/profile.clj +++ b/backend/src/app/rpc/mutations/profile.clj @@ -14,6 +14,7 @@ [app.db :as db] [app.emails :as eml] [app.http.oauth :refer [extract-props]] + [app.loggers.audit :as audit] [app.media :as media] [app.rpc.mutations.projects :as projects] [app.rpc.mutations.teams :as teams] @@ -103,7 +104,8 @@ (with-meta resp {:transform-response ((:create session) (:id profile)) :before-complete (annotate-profile-register metrics profile) - :result profile})) + ::audit/props (:props profile) + ::audit/profile-id (:id profile)})) ;; If no token is provided, send a verification email (let [vtoken (tokens :generate @@ -132,7 +134,8 @@ (with-meta profile {:before-complete (annotate-profile-register metrics profile) - :result profile}))))) + ::audit/props (:props profile) + ::audit/profile-id (:id profile)}))))) (defn email-domain-in-whitelist? "Returns true if email's domain is in the given whitelist or if given