🎉 Add webhook processing worker

This commit is contained in:
Andrey Antukh 2022-12-07 14:47:32 +01:00
parent d768711caa
commit 5b9f0ed0b1
9 changed files with 308 additions and 64 deletions

View file

@ -17,6 +17,7 @@
[app.db :as db] [app.db :as db]
[app.http.client :as http] [app.http.client :as http]
[app.loggers.audit.tasks :as-alias tasks] [app.loggers.audit.tasks :as-alias tasks]
[app.loggers.webhooks :as-alias webhooks]
[app.main :as-alias main] [app.main :as-alias main]
[app.metrics :as mtx] [app.metrics :as mtx]
[app.tokens :as tokens] [app.tokens :as tokens]
@ -103,10 +104,11 @@
(s/def ::type ::us/string) (s/def ::type ::us/string)
(s/def ::props (s/map-of ::us/keyword any?)) (s/def ::props (s/map-of ::us/keyword any?))
(s/def ::ip-addr ::us/string) (s/def ::ip-addr ::us/string)
(s/def ::webhooks/event? ::us/boolean)
(s/def ::event (s/def ::event
(s/keys :req-un [::type ::name ::profile-id] (s/keys :req-un [::type ::name ::profile-id]
:opt-un [::ip-addr ::props])) :opt-un [::ip-addr ::props ::webhooks/event?]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; COLLECTOR ;; COLLECTOR
@ -117,8 +119,7 @@
;; an external storage and data cleared. ;; an external storage and data cleared.
(s/def ::collector (s/def ::collector
(s/nilable (s/keys :req [::wrk/executor ::db/pool]))
(s/keys :req [::wrk/executor ::db/pool])))
(defmethod ig/pre-init-spec ::collector [_] (defmethod ig/pre-init-spec ::collector [_]
(s/keys :req [::db/pool ::wrk/executor ::mtx/metrics])) (s/keys :req [::db/pool ::wrk/executor ::mtx/metrics]))
@ -126,11 +127,8 @@
(defmethod ig/init-key ::collector (defmethod ig/init-key ::collector
[_ {:keys [::db/pool] :as cfg}] [_ {:keys [::db/pool] :as cfg}]
(cond (cond
(not (contains? cf/flags :audit-log))
(l/info :hint "audit: log collection disabled")
(db/read-only? pool) (db/read-only? pool)
(l/warn :hint "audit: log collection disabled (db is read-only)") (l/warn :hint "audit: disabled (db is read-only)")
:else :else
cfg)) cfg))
@ -138,19 +136,35 @@
(defn- persist-event! (defn- persist-event!
[pool event] [pool event]
(us/verify! ::event event) (us/verify! ::event event)
(db/insert! pool :audit-log (let [params {:id (uuid/next)
{:id (uuid/next)
:name (:name event) :name (:name event)
:type (:type event) :type (:type event)
:profile-id (:profile-id event) :profile-id (:profile-id event)
:tracked-at (dt/now) :tracked-at (dt/now)
:ip-addr (some-> (:ip-addr event) db/inet) :ip-addr (:ip-addr event)
:props (db/tjson (:props event)) :props (:props event)}]
:source "backend"}))
(when (contains? cf/flags :audit-log)
(db/insert! pool :audit-log
(-> params
(update :props db/tjson)
(update :ip-addr db/inet)
(assoc :source "backend"))))
(when (and (contains? cf/flags :webhooks)
(::webhooks/event? event))
(wrk/submit! ::wrk/conn pool
::wrk/task :process-webhook-event
::wrk/queue :webhooks
::wrk/max-retries 0
::webhooks/event (-> params
(dissoc :ip-addr)
(dissoc :type))))))
(defn submit! (defn submit!
"Submit audit event to the collector." "Submit audit event to the collector."
[{:keys [::wrk/executor ::db/pool]} params] [{:keys [::wrk/executor ::db/pool] :as collector} params]
(us/assert! ::collector collector)
(->> (px/submit! executor (partial persist-event! pool (d/without-nils params))) (->> (px/submit! executor (partial persist-event! pool (d/without-nils params)))
(p/merr (fn [cause] (p/merr (fn [cause]
(l/error :hint "audit: unexpected error processing event" :cause cause) (l/error :hint "audit: unexpected error processing event" :cause cause)

View file

@ -0,0 +1,171 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.loggers.webhooks
"A mattermost integration for error reporting."
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.common.transit :as t]
[app.common.uri :as uri]
[app.db :as db]
[app.http.client :as http]
[app.util.json :as json]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]))
;; --- PROC
(defn lookup-webhooks-by-team
[pool team-id]
(db/exec! pool ["select * from webhook where team_id=? and is_active=true" team-id]))
(defn lookup-webhooks-by-project
[pool project-id]
(let [sql [(str "select * from webhook as w"
" join project as p on (p.team_id = w.team_id)"
" where p.id = ? and w.is_active = true")
project-id]]
(db/exec! pool sql)))
(defn lookup-webhooks-by-file
[pool file-id]
(let [sql [(str "select * from webhook as w"
" join project as p on (p.team_id = w.team_id)"
" join file as f on (f.project_id = p.id)"
" where f.id = ? and w.is_active = true")
file-id]]
(db/exec! pool sql)))
(defn lookup-webhooks
[{:keys [::db/pool]} {:keys [props] :as event}]
(or (some->> (:team-id props) (lookup-webhooks-by-team pool))
(some->> (:project-id props) (lookup-webhooks-by-project pool))
(some->> (:file-id props) (lookup-webhooks-by-file pool))))
(defmethod ig/pre-init-spec ::process-event-handler [_]
(s/keys :req [::db/pool]))
(defmethod ig/init-key ::process-event-handler
[_ {:keys [::db/pool] :as cfg}]
(fn [{:keys [props] :as task}]
(let [event (::event props)]
(l/debug :hint "process webhook event"
:name (:name event))
(when-let [items (lookup-webhooks cfg event)]
;; (app.common.pprint/pprint items)
(l/trace :hint "webhooks found for event" :total (count items))
(db/with-atomic [conn pool]
(doseq [item items]
(wrk/submit! ::wrk/conn conn
::wrk/task :run-webhook
::wrk/queue :webhooks
::wrk/max-retries 3
::event event
::config item)))))))
;; --- RUN
(declare interpret-exception)
(declare interpret-response)
(def ^:private mapper
(json/mapper
{:encode-key-fn str/camel
:decode-key-fn (comp keyword str/kebab)
:pretty true}))
(defmethod ig/pre-init-spec ::run-webhook-handler [_]
(s/keys :req [::http/client ::db/pool]))
(defmethod ig/prep-key ::run-webhook-handler
[_ cfg]
(merge {::max-errors 3} (d/without-nils cfg)))
(defmethod ig/init-key ::run-webhook-handler
[_ {:keys [::db/pool ::max-errors] :as cfg}]
(letfn [(update-webhook! [whook err]
(if err
(let [sql [(str "update webhook "
" set error_code=?, "
" error_count=error_count+1 "
" where id=?")
err
(:id whook)]
res (db/exec-one! pool sql {:return-keys true})]
(when (>= (:error-count res) max-errors)
(db/update! pool :webhook {:is-active false} {:id (:id whook)})))
(db/update! pool :webhook
{:updated-at (dt/now)
:error-code nil
:error-count 0}
{:id (:id whook)})))
(report-delivery! [whook req rsp err]
(db/insert! pool :webhook-delivery
{:webhook-id (:id whook)
:created-at (dt/now)
:error-code err
:req-data (db/tjson req)
:rsp-data (db/tjson rsp)}))]
(fn [{:keys [props] :as task}]
(let [event (::event props)
whook (::config props)
body (case (:mtype whook)
"application/json" (json/encode-str event mapper)
"application/transit+json" (t/encode-str event)
"application/x-www-form-urlencoded" (uri/map->query-string event))]
(l/debug :hint "run webhook"
:event-name (:name event)
:webhook-id (:id whook)
:webhook-uri (:uri whook)
:webhook-mtype (:mtype whook))
(let [req {:uri (:uri whook)
:headers {"content-type" (:mtype whook)}
:timeout (dt/duration "4s")
:method :post
:body body}]
(try
(let [rsp (http/req! cfg req {:response-type :input-stream :sync? true})
err (interpret-response rsp)]
(report-delivery! whook req rsp err)
(update-webhook! whook err))
(catch Throwable cause
(let [err (interpret-exception cause)]
(report-delivery! whook req nil err)
(update-webhook! whook err)
(when (= err "unknown")
(l/error :hint "unknown error on webhook request"
:cause cause))))))))))
(defn interpret-response
[{:keys [status] :as response}]
(when-not (or (= 200 status)
(= 204 status))
(str/ffmt "unexpected-status:%" status)))
(defn interpret-exception
[cause]
(cond
(instance? javax.net.ssl.SSLHandshakeException cause)
"ssl-validation-error"
(instance? java.net.ConnectException cause)
"connection-error"
(instance? java.net.http.HttpConnectTimeoutException cause)
"timeout"
))

View file

@ -15,6 +15,7 @@
[app.http.session :as-alias http.session] [app.http.session :as-alias http.session]
[app.loggers.audit :as-alias audit] [app.loggers.audit :as-alias audit]
[app.loggers.audit.tasks :as-alias audit.tasks] [app.loggers.audit.tasks :as-alias audit.tasks]
[app.loggers.webhooks :as-alias webhooks]
[app.loggers.zmq :as-alias lzmq] [app.loggers.zmq :as-alias lzmq]
[app.metrics :as-alias mtx] [app.metrics :as-alias mtx]
[app.metrics.definition :as-alias mdef] [app.metrics.definition :as-alias mdef]
@ -357,7 +358,12 @@
:telemetry (ig/ref :app.tasks.telemetry/handler) :telemetry (ig/ref :app.tasks.telemetry/handler)
:session-gc (ig/ref :app.http.session/gc-task) :session-gc (ig/ref :app.http.session/gc-task)
:audit-log-archive (ig/ref ::audit.tasks/archive) :audit-log-archive (ig/ref ::audit.tasks/archive)
:audit-log-gc (ig/ref ::audit.tasks/gc)}} :audit-log-gc (ig/ref ::audit.tasks/gc)
:process-webhook-event
(ig/ref ::webhooks/process-event-handler)
:run-webhook
(ig/ref ::webhooks/run-webhook-handler)}}
:app.emails/sendmail :app.emails/sendmail
@ -420,6 +426,14 @@
::audit.tasks/gc ::audit.tasks/gc
{::db/pool (ig/ref ::db/pool)} {::db/pool (ig/ref ::db/pool)}
::webhooks/process-event-handler
{::db/pool (ig/ref ::db/pool)
::http.client/client (ig/ref ::http.client/client)}
::webhooks/run-webhook-handler
{::db/pool (ig/ref ::db/pool)
::http.client/client (ig/ref ::http.client/client)}
:app.loggers.loki/reporter :app.loggers.loki/reporter
{::lzmq/receiver (ig/ref ::lzmq/receiver) {::lzmq/receiver (ig/ref ::lzmq/receiver)
::http.client/client (ig/ref ::http.client/client)} ::http.client/client (ig/ref ::http.client/client)}

View file

@ -265,6 +265,9 @@
{:name "0085-add-webhook-table" {:name "0085-add-webhook-table"
:fn (mg/resource "app/migrations/sql/0085-add-webhook-table.sql")} :fn (mg/resource "app/migrations/sql/0085-add-webhook-table.sql")}
{:name "0086-add-webhook-delivery-table"
:fn (mg/resource "app/migrations/sql/0086-add-webhook-delivery-table.sql")}
]) ])

View file

@ -0,0 +1,16 @@
CREATE TABLE webhook_delivery (
webhook_id uuid NOT NULL REFERENCES webhook(id) ON DELETE CASCADE DEFERRABLE,
created_at timestamptz NOT NULL DEFAULT now(),
error_code text NULL,
req_data jsonb NULL,
rsp_data jsonb NULL,
PRIMARY KEY (webhook_id, created_at)
);
ALTER TABLE webhook_delivery
ALTER COLUMN error_code SET STORAGE external,
ALTER COLUMN req_data SET STORAGE external,
ALTER COLUMN rsp_data SET STORAGE external;

View file

@ -16,6 +16,7 @@
[app.http.client :as-alias http.client] [app.http.client :as-alias http.client]
[app.http.session :as-alias http.session] [app.http.session :as-alias http.session]
[app.loggers.audit :as audit] [app.loggers.audit :as audit]
[app.loggers.webhooks :as-alias webhooks]
[app.metrics :as mtx] [app.metrics :as mtx]
[app.msgbus :as-alias mbus] [app.msgbus :as-alias mbus]
[app.rpc.climit :as climit] [app.rpc.climit :as climit]
@ -155,18 +156,24 @@
(:profile-id result) (:profile-id result)
(:profile-id params) (:profile-id params)
uuid/zero) uuid/zero)
props (or (::audit/replace-props resultm) props (or (::audit/replace-props resultm)
(-> params (-> params
(d/without-qualified)
(merge (::audit/props resultm)) (merge (::audit/props resultm))
(dissoc :profile-id) (dissoc :profile-id)
(dissoc :type))) (dissoc :type)))
event {:type (or (::audit/type resultm) event {:type (or (::audit/type resultm)
(::type cfg)) (::type cfg))
:name (or (::audit/name resultm) :name (or (::audit/name resultm)
(::sv/name mdata)) (::sv/name mdata))
:profile-id profile-id :profile-id profile-id
:ip-addr (some-> request audit/parse-client-ip) :ip-addr (some-> request audit/parse-client-ip)
:props (d/without-qualified props)}] :props props
::webhooks/event? (or (::webhooks/event? mdata)
(::webhooks/event? resultm)
false)}]
(audit/submit! collector event))) (audit/submit! collector event)))

View file

@ -11,6 +11,7 @@
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.db :as db] [app.db :as db]
[app.http.client :as http] [app.http.client :as http]
[app.loggers.webhooks :as webhooks]
[app.rpc.doc :as-alias doc] [app.rpc.doc :as-alias doc]
[app.rpc.queries.teams :refer [check-edition-permissions! check-read-permissions!]] [app.rpc.queries.teams :refer [check-edition-permissions! check-read-permissions!]]
[app.util.services :as sv] [app.util.services :as sv]
@ -35,77 +36,83 @@
(s/keys :req-un [::profile-id ::team-id ::uri ::mtype] (s/keys :req-un [::profile-id ::team-id ::uri ::mtype]
:opt-un [::is-active])) :opt-un [::is-active]))
;; FIXME: validate ;; NOTE: for now the quote is hardcoded but this need to be solved in
;; FIXME: default ratelimit ;; a more universal way for handling properly object quotes
;; FIXME: quotes (def max-hooks-for-team 8)
(defn- validate-webhook! (defn- validate-webhook!
[cfg whook params] [cfg whook params]
(letfn [(handle-exception [exception] (letfn [(handle-exception [exception]
(cond (if-let [hint (webhooks/interpret-exception exception)]
(instance? java.util.concurrent.CompletionException exception)
(handle-exception (ex/cause exception))
(instance? javax.net.ssl.SSLHandshakeException exception)
(ex/raise :type :validation (ex/raise :type :validation
:code :webhook-validation :code :webhook-validation
:hint "ssl-validation") :hint hint)
(ex/raise :type :internal
:else
(ex/raise :type :validation
:code :webhook-validation :code :webhook-validation
:hint "unknown"
:cause exception))) :cause exception)))
(handle-response [{:keys [status] :as response}] (handle-response [response]
(when (not= status 200) (when-let [hint (webhooks/interpret-response response)]
(ex/raise :type :validation (ex/raise :type :validation
:code :webhook-validation :code :webhook-validation
:hint (str/ffmt "unexpected-status-%" (:status response)))))] :hint hint)))]
(if (not= (:uri whook) (:uri params)) (if (not= (:uri whook) (:uri params))
(->> (http/req! cfg {:method :head (->> (http/req! cfg {:method :head
:uri (:uri params) :uri (:uri params)
:timeout (dt/duration "2s")}) :timeout (dt/duration "3s")})
(p/hmap (fn [response exception] (p/hmap (fn [response exception]
(if exception (if exception
(handle-exception exception) (handle-exception exception)
(handle-response response))))) (handle-response response)))))
(p/resolved nil)))) (p/resolved nil))))
(sv/defmethod ::create-webhook (defn- validate-quotes!
{::doc/added "1.17"} [{:keys [::db/pool]} {:keys [team-id]}]
[{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id team-id uri mtype is-active] :as params}] (let [sql ["select count(*) as total from webhook where team_id = ?" team-id]
(check-edition-permissions! pool profile-id team-id) total (:total (db/exec-one! pool sql))]
(letfn [(insert-webhook [_] (when (>= total max-hooks-for-team)
(ex/raise :type :restriction
:code :webhooks-quote-reached
:hint (str/ffmt "can't create more than % webhooks per team" max-hooks-for-team)))))
(defn- insert-webhook!
[{:keys [::db/pool]} {:keys [team-id uri mtype is-active] :as params}]
(db/insert! pool :webhook (db/insert! pool :webhook
{:id (uuid/next) {:id (uuid/next)
:team-id team-id :team-id team-id
:uri uri :uri uri
:is-active is-active :is-active is-active
:mtype mtype}))] :mtype mtype}))
(->> (validate-webhook! cfg nil params)
(p/fmap executor insert-webhook))))
(s/def ::update-webhook (defn- update-webhook!
(s/keys :req-un [::id ::uri ::mtype ::is-active])) [{:keys [::db/pool] :as cfg} {:keys [id] :as wook} {:keys [uri mtype is-active] :as params}]
(sv/defmethod ::update-webhook
{::doc/added "1.17"}
[{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id id uri mtype is-active] :as params}]
(let [whook (db/get pool :webhook {:id id})
update-fn (fn [_]
(db/update! pool :webhook (db/update! pool :webhook
{:uri uri {:uri uri
:is-active is-active :is-active is-active
:mtype mtype :mtype mtype
:error-code nil :error-code nil
:error-count 0} :error-count 0}
{:id id}))] {:id id}))
(check-edition-permissions! pool profile-id (:team-id whook))
(sv/defmethod ::create-webhook
{::doc/added "1.17"}
[{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [profile-id team-id] :as params}]
(check-edition-permissions! pool profile-id team-id)
(->> (validate-webhook! cfg nil params)
(p/fmap executor (fn [_] (validate-quotes! cfg params)))
(p/fmap executor (fn [_] (insert-webhook! cfg params)))))
(s/def ::update-webhook
(s/keys :req-un [::id ::uri ::mtype ::is-active]))
(sv/defmethod ::update-webhook
{::doc/added "1.17"}
[{:keys [::db/pool ::wrk/executor] :as cfg} {:keys [id profile-id] :as params}]
(let [whook (db/get pool :webhook {:id id})]
(check-edition-permissions! pool profile-id (:team-id whook))
(->> (validate-webhook! cfg whook params) (->> (validate-webhook! cfg whook params)
(p/fmap executor update-fn)))) (p/fmap executor (fn [_] (update-webhook! cfg whook params))))))
(s/def ::delete-webhook (s/def ::delete-webhook
(s/keys :req-un [::profile-id ::id])) (s/keys :req-un [::profile-id ::id]))

View file

@ -64,6 +64,10 @@
[mdw mdata] [mdw mdata]
(vary-meta mdw merge mdata)) (vary-meta mdw merge mdata))
(defn assoc-meta
[mdw k v]
(vary-meta mdw assoc k v))
(defn with-http-cache (defn with-http-cache
[mdw max-age] [mdw max-age]
(vary-meta mdw update ::rpc/response-transform-fns conj (vary-meta mdw update ::rpc/response-transform-fns conj

View file

@ -9,6 +9,10 @@
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.db :as db] [app.db :as db]
[app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks]
[app.rpc.doc :as-alias doc]
[app.rpc.helpers :as rph]
[app.rpc.permissions :as perms] [app.rpc.permissions :as perms]
[app.rpc.queries.projects :as proj] [app.rpc.queries.projects :as proj]
[app.rpc.queries.teams :as teams] [app.rpc.queries.teams :as teams]
@ -22,7 +26,6 @@
(s/def ::name ::us/string) (s/def ::name ::us/string)
(s/def ::profile-id ::us/uuid) (s/def ::profile-id ::us/uuid)
;; --- Mutation: Create Project ;; --- Mutation: Create Project
(declare create-project) (declare create-project)
@ -35,6 +38,8 @@
:opt-un [::id])) :opt-un [::id]))
(sv/defmethod ::create-project (sv/defmethod ::create-project
{::doc/added "1.0"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [profile-id team-id] :as params}] [{:keys [pool] :as cfg} {:keys [profile-id team-id] :as params}]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(teams/check-edition-permissions! conn profile-id team-id) (teams/check-edition-permissions! conn profile-id team-id)
@ -122,10 +127,13 @@
;; this is not allowed. ;; this is not allowed.
(sv/defmethod ::delete-project (sv/defmethod ::delete-project
{::doc/added "1.0"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [id profile-id] :as params}] [{:keys [pool] :as cfg} {:keys [id profile-id] :as params}]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(proj/check-edition-permissions! conn profile-id id) (proj/check-edition-permissions! conn profile-id id)
(db/update! conn :project (let [project (db/update! conn :project
{:deleted-at (dt/now)} {:deleted-at (dt/now)}
{:id id :is-default false}) {:id id :is-default false})]
nil)) (rph/with-meta (rph/wrap)
{::audit/props {:team-id (:team-id project)}}))))