Add improved activity logging.

This commit is contained in:
Andrey Antukh 2021-05-09 14:06:27 +02:00 committed by Andrés Moya
parent e94e202cef
commit 334ac26f0d
10 changed files with 204 additions and 35 deletions

View file

@ -8,6 +8,7 @@
"A configuration management." "A configuration management."
(:refer-clojure :exclude [get]) (:refer-clojure :exclude [get])
(:require (:require
[app.common.data :as d]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.version :as v] [app.common.version :as v]
[app.util.time :as dt] [app.util.time :as dt]
@ -16,7 +17,8 @@
[clojure.pprint :as pprint] [clojure.pprint :as pprint]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[environ.core :refer [env]])) [environ.core :refer [env]]
[integrant.core :as ig]))
(prefer-method print-method (prefer-method print-method
clojure.lang.IRecord clojure.lang.IRecord
@ -26,6 +28,16 @@
clojure.lang.IPersistentMap clojure.lang.IPersistentMap
clojure.lang.IDeref) clojure.lang.IDeref)
(defmethod ig/init-key :default
[_ data]
(d/without-nils data))
(defmethod ig/prep-key :default
[_ data]
(if (map? data)
(d/without-nils data)
data))
(def defaults (def defaults
{:http-server-port 6060 {:http-server-port 6060
:host "devenv" :host "devenv"

View file

@ -6,6 +6,7 @@
(ns app.http.oauth (ns app.http.oauth
(:require (:require
[app.common.data :as d]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uri :as u] [app.common.uri :as u]
@ -138,16 +139,32 @@
(cond-> info (cond-> info
(some? (:invitation-token state)) (some? (:invitation-token state))
(assoc :invitation-token (:invitation-token state))))) (assoc :invitation-token (:invitation-token state))
(map? (:props state))
(d/merge (:props state)))))
;; --- HTTP HANDLERS ;; --- HTTP HANDLERS
(defn extract-props
[params]
(reduce-kv (fn [params k v]
(let [sk (name k)]
(cond-> params
(or (str/starts-with? sk "pm_")
(str/starts-with? sk "pm-"))
(assoc (-> sk str/kebab keyword) v))))
{}
params))
(defn- auth-handler (defn- auth-handler
[{:keys [tokens] :as cfg} request] [{:keys [tokens] :as cfg} {:keys [params] :as request}]
(let [invitation (get-in request [:params :invitation-token]) (let [invitation (:invitation-token params)
props (extract-props params)
state (tokens :generate state (tokens :generate
{:iss :oauth {:iss :oauth
:invitation-token invitation :invitation-token invitation
:props props
:exp (dt/in-future "15m")}) :exp (dt/in-future "15m")})
uri (build-auth-uri cfg state)] uri (build-auth-uri cfg state)]
{:status 200 {:status 200

View file

@ -0,0 +1,127 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.loggers.activity
"Activity registry logger consumer."
(:require
[app.common.data :as d]
[app.common.spec :as us]
[app.config :as cf]
[app.util.async :as aa]
[app.util.http :as http]
[app.util.logging :as l]
[app.util.time :as dt]
[app.util.transit :as t]
[app.worker :as wrk]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[lambdaisland.uri :as u]))
(declare process-event)
(declare handle-event)
(s/def ::uri ::us/string)
(defmethod ig/pre-init-spec ::reporter [_]
(s/keys :req-un [::wrk/executor]
:opt-un [::uri]))
(defmethod ig/init-key ::reporter
[_ {:keys [uri] :as cfg}]
(if (string? uri)
(do
(l/info :msg "intializing activity reporter" :uri uri)
(let [xform (comp (map process-event)
(filter map?))
input (a/chan (a/sliding-buffer 1024) xform)]
(a/go-loop []
(when-let [event (a/<! input)]
(a/<! (handle-event cfg event))
(recur)))
(fn [& [cmd & params]]
(case cmd
:stop (a/close! input)
:submit (when-not (a/offer! input (first params))
(l/warn :msg "activity channel is full"))))))
(constantly nil)))
(defmethod ig/halt-key! ::reporter
[_ f]
(f :stop))
(defn- clean-params
"Cleans the params from complex data, only accept strings, numbers and
uuids and removing sensitive data such as :password and related
props."
[params]
(let [params (dissoc params :profile-id :session-id :password :old-password)]
(reduce-kv (fn [params k v]
(cond-> params
(or (string? v)
(uuid? v)
(number? v))
(assoc k v)))
{}
params)))
(defn- process-event
[{:keys [type name params result] :as event}]
(let [profile-id (:profile-id params)]
(if (uuid? profile-id)
{:type (str "backend:" (d/name type))
:name name
:timestamp (dt/now)
:profile-id profile-id
:props (clean-params params)}
(cond
(= "register-profile" name)
{:type (str "backend:" (d/name type))
:name name
:timestamp (dt/now)
:profile-id (:id result)
:props (clean-params (:props result))}
:else nil))))
(defn- send-activity
[{:keys [uri tokens]} event i]
(try
(let [token (tokens :generate {:iss "authentication"
:iat (dt/now)
:uid (:profile-id event)})
body (t/encode {:events [event]})
headers {"content-type" "application/transit+json"
"origin" (cf/get :public-uri)
"cookie" (u/map->query-string {:auth-token token})}
params {:uri uri
:timeout 6000
:method :post
:headers headers
:body body}
response (http/send! params)]
(if (= (:status response) 204)
true
(do
(l/error :hint "error on sending activity"
:try i
:rsp (pr-str response))
false)))
(catch Exception e
(l/error :hint "error on sending message to loki"
:cause e
:try i)
false)))
(defn- handle-event
[{:keys [executor] :as cfg} event]
(aa/with-thread executor
(loop [i 1]
(when (and (not (send-activity cfg event i)) (< i 20))
(Thread/sleep (* i 2000))
(recur (inc i))))))

View file

@ -31,16 +31,16 @@
[_ {:keys [receiver uri] :as cfg}] [_ {:keys [receiver uri] :as cfg}]
(when uri (when uri
(l/info :msg "intializing loki reporter" :uri uri) (l/info :msg "intializing loki reporter" :uri uri)
(let [output (a/chan (a/sliding-buffer 1024))] (let [input (a/chan (a/sliding-buffer 1024))]
(receiver :sub output) (receiver :sub input)
(a/go-loop [] (a/go-loop []
(let [msg (a/<! output)] (let [msg (a/<! input)]
(if (nil? msg) (if (nil? msg)
(l/info :msg "stoping error reporting loop") (l/info :msg "stoping error reporting loop")
(do (do
(a/<! (handle-event cfg msg)) (a/<! (handle-event cfg msg))
(recur))))) (recur)))))
output))) input)))
(defmethod ig/halt-key! ::reporter (defmethod ig/halt-key! ::reporter
[_ output] [_ output]

View file

@ -6,7 +6,6 @@
(ns app.main (ns app.main
(:require (:require
[app.common.data :as d]
[app.config :as cf] [app.config :as cf]
[app.util.logging :as l] [app.util.logging :as l]
[app.util.time :as dt] [app.util.time :as dt]
@ -140,7 +139,8 @@
:storage (ig/ref :app.storage/storage) :storage (ig/ref :app.storage/storage)
:msgbus (ig/ref :app.msgbus/msgbus) :msgbus (ig/ref :app.msgbus/msgbus)
:rlimits (ig/ref :app.rlimits/all) :rlimits (ig/ref :app.rlimits/all)
:public-uri (cf/get :public-uri)} :public-uri (cf/get :public-uri)
:activity (ig/ref :app.loggers.activity/reporter)}
:app.notifications/handler :app.notifications/handler
{:msgbus (ig/ref :app.msgbus/msgbus) {:msgbus (ig/ref :app.msgbus/msgbus)
@ -263,6 +263,11 @@
:app.loggers.zmq/receiver :app.loggers.zmq/receiver
{:endpoint (cf/get :loggers-zmq-uri)} {:endpoint (cf/get :loggers-zmq-uri)}
:app.loggers.activity/reporter
{:uri (cf/get :activity-reporter-uri)
:tokens (ig/ref :app.tokens/tokens)
:executor (ig/ref :app.worker/executor)}
:app.loggers.loki/reporter :app.loggers.loki/reporter
{:uri (cf/get :loggers-loki-uri) {:uri (cf/get :loggers-loki-uri)
:receiver (ig/ref :app.loggers.zmq/receiver) :receiver (ig/ref :app.loggers.zmq/receiver)
@ -299,13 +304,6 @@
[::main :app.storage.db/backend] [::main :app.storage.db/backend]
{:pool (ig/ref :app.db/pool)}}) {:pool (ig/ref :app.db/pool)}})
(defmethod ig/init-key :default [_ data] data)
(defmethod ig/prep-key :default
[_ data]
(if (map? data)
(d/without-nils data)
data))
(def system nil) (def system nil)
(defn start (defn start

View file

@ -84,19 +84,31 @@
(rlm/execute rlinst (f cfg params)))) (rlm/execute rlinst (f cfg params))))
f)) f))
(defn- wrap-impl (defn- wrap-impl
[cfg f mdata] [{:keys [activity] :as cfg} f mdata]
(let [f (wrap-with-rlimits cfg f mdata) (let [f (wrap-with-rlimits cfg f mdata)
f (wrap-with-metrics cfg f mdata) f (wrap-with-metrics cfg f mdata)
spec (or (::sv/spec mdata) (s/spec any?))] spec (or (::sv/spec mdata) (s/spec any?))
auth? (:auth mdata true)]
(l/trace :action "register" (l/trace :action "register"
:name (::sv/name mdata)) :name (::sv/name mdata))
(fn [params] (fn [params]
(when (and (:auth mdata true) (not (uuid? (:profile-id params)))) (when (and auth? (not (uuid? (:profile-id params))))
(ex/raise :type :authentication (ex/raise :type :authentication
:code :authentication-required :code :authentication-required
:hint "authentication required for this endpoint")) :hint "authentication required for this endpoint"))
(f cfg (us/conform spec params))))) (let [params (us/conform spec params)
result (f cfg params)
;; On non authenticated handlers we check the private
;; result that can be found on the metadata.
result* (if auth? result (:result (meta result) {}))]
(when (::type cfg)
(activity :submit {:type (::type cfg)
:name (::sv/name mdata)
:params params
:result result*}))
result))))
(defn- process-method (defn- process-method
[cfg vfn] [cfg vfn]
@ -133,7 +145,7 @@
:registry (get-in cfg [:metrics :registry]) :registry (get-in cfg [:metrics :registry])
:type :histogram :type :histogram
:help "Timing of mutation services."}) :help "Timing of mutation services."})
cfg (assoc cfg ::mobj mobj)] cfg (assoc cfg ::mobj mobj ::type :mutation)]
(->> (sv/scan-ns 'app.rpc.mutations.demo (->> (sv/scan-ns 'app.rpc.mutations.demo
'app.rpc.mutations.media 'app.rpc.mutations.media
'app.rpc.mutations.profile 'app.rpc.mutations.profile
@ -152,9 +164,11 @@
(s/def ::storage some?) (s/def ::storage some?)
(s/def ::session map?) (s/def ::session map?)
(s/def ::tokens fn?) (s/def ::tokens fn?)
(s/def ::activity some?)
(defmethod ig/pre-init-spec ::rpc [_] (defmethod ig/pre-init-spec ::rpc [_]
(s/keys :req-un [::db/pool ::storage ::session ::tokens ::mtx/metrics ::rlm/rlimits])) (s/keys :req-un [::storage ::session ::tokens ::activity
::mtx/metrics ::rlm/rlimits ::db/pool]))
(defmethod ig/init-key ::rpc (defmethod ig/init-key ::rpc
[_ cfg] [_ cfg]

View file

@ -13,6 +13,7 @@
[app.config :as cfg] [app.config :as cfg]
[app.db :as db] [app.db :as db]
[app.emails :as eml] [app.emails :as eml]
[app.http.oauth :refer [extract-props]]
[app.media :as media] [app.media :as media]
[app.rpc.mutations.projects :as projects] [app.rpc.mutations.projects :as projects]
[app.rpc.mutations.teams :as teams] [app.rpc.mutations.teams :as teams]
@ -101,7 +102,8 @@
resp {:invitation-token token}] resp {:invitation-token token}]
(with-meta resp (with-meta resp
{:transform-response ((:create session) (:id profile)) {:transform-response ((:create session) (:id profile))
:before-complete (annotate-profile-register metrics profile)})) :before-complete (annotate-profile-register metrics profile)
:result profile}))
;; If no token is provided, send a verification email ;; If no token is provided, send a verification email
(let [vtoken (tokens :generate (let [vtoken (tokens :generate
@ -129,7 +131,8 @@
:extra-data ptoken}) :extra-data ptoken})
(with-meta profile (with-meta profile
{:before-complete (annotate-profile-register metrics profile)}))))) {:before-complete (annotate-profile-register metrics profile)
:result profile})))))
(defn email-domain-in-whitelist? (defn email-domain-in-whitelist?
"Returns true if email's domain is in the given whitelist or if given "Returns true if email's domain is in the given whitelist or if given
@ -174,11 +177,12 @@
(defn create-profile (defn create-profile
"Create the profile entry on the database with limited input "Create the profile entry on the database with limited input
filling all the other fields with defaults." filling all the other fields with defaults."
[conn {:keys [id fullname email password props is-active is-muted is-demo opts] [conn {:keys [id fullname email password is-active is-muted is-demo opts]
:or {is-active false is-muted false is-demo false}}] :or {is-active false is-muted false is-demo false}
:as params}]
(let [id (or id (uuid/next)) (let [id (or id (uuid/next))
is-active (if is-demo true is-active) is-active (if is-demo true is-active)
props (db/tjson (or props {})) props (-> params extract-props db/tjson)
password (derive-password password) password (derive-password password)
params {:id id params {:id id
:fullname fullname :fullname fullname

View file

@ -6,7 +6,6 @@
(ns app.rpc.queries.files (ns app.rpc.queries.files
(:require (:require
[app.common.exceptions :as ex]
[app.common.pages.migrations :as pmg] [app.common.pages.migrations :as pmg]
[app.common.spec :as us] [app.common.spec :as us]
[app.db :as db] [app.db :as db]
@ -254,7 +253,7 @@
(defn retrieve-file-libraries (defn retrieve-file-libraries
[conn is-indirect file-id] [conn is-indirect file-id]
(let [libraries (->> (db/exec! conn [sql:file-libraries file-id]) (let [libraries (->> (db/exec! conn [sql:file-libraries file-id])
(map #(assoc :is-indirect is-indirect)) (map #(assoc % :is-indirect is-indirect))
(into #{} decode-row-xf))] (into #{} decode-row-xf))]
(reduce #(into %1 (retrieve-file-libraries conn true %2)) (reduce #(into %1 (retrieve-file-libraries conn true %2))
libraries libraries

View file

@ -179,7 +179,6 @@
(effect [_ state stream] (effect [_ state stream]
(let [profile-id (:profile-id state) (let [profile-id (:profile-id state)
events (into [] (take max-buffer-size) @buffer)] events (into [] (take max-buffer-size) @buffer)]
(prn ::persistence (count events) profile-id)
(when (seq events) (when (seq events)
(->> events (->> events
(filterv #(= profile-id (:profile-id %))) (filterv #(= profile-id (:profile-id %)))

View file

@ -54,8 +54,7 @@
[:& recovery-request-page {:locale locale}] [:& recovery-request-page {:locale locale}]
:auth-recovery :auth-recovery
[:& recovery-page {:locale locale [:& recovery-page {:locale locale :params params}])
:params (:query-params route)}])
[:div.terms-login [:div.terms-login
[:a {:href "https://penpot.app/terms.html" :target "_blank"} "Terms of service"] [:a {:href "https://penpot.app/terms.html" :target "_blank"} "Terms of service"]
[:span "and"] [:span "and"]