Improve update-file webhook batching

make it per user
This commit is contained in:
Andrey Antukh 2023-01-06 16:27:23 +01:00
parent 97a884018f
commit b235d3f0f2
4 changed files with 28 additions and 8 deletions

View file

@ -8,6 +8,7 @@
"Services related to the user activity (audit log)." "Services related to the user activity (audit log)."
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.data.macros :as dm]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.spec :as us] [app.common.spec :as us]
@ -20,6 +21,7 @@
[app.loggers.webhooks :as-alias webhooks] [app.loggers.webhooks :as-alias webhooks]
[app.main :as-alias main] [app.main :as-alias main]
[app.metrics :as mtx] [app.metrics :as mtx]
[app.rpc :as-alias rpc]
[app.tokens :as tokens] [app.tokens :as tokens]
[app.util.retry :as rtry] [app.util.retry :as rtry]
[app.util.time :as dt] [app.util.time :as dt]
@ -171,18 +173,20 @@
(::webhooks/event? event)) (::webhooks/event? event))
(let [batch-key (::webhooks/batch-key event) (let [batch-key (::webhooks/batch-key event)
batch-timeout (::webhooks/batch-timeout event) batch-timeout (::webhooks/batch-timeout event)
label-suffix (when (ifn? batch-key) label (dm/str "rpc:" (:name params))
(str/ffmt ":%" (batch-key (:props params)))) label (cond
dedupe? (boolean (ifn? batch-key) (dm/str label ":" (batch-key (::rpc/params event)))
(and batch-key batch-timeout))] (string? batch-key) (dm/str label ":" batch-key)
:else label)
dedupe? (boolean (and batch-key batch-timeout))]
(wrk/submit! ::wrk/conn pool (wrk/submit! ::wrk/conn pool
::wrk/task :process-webhook-event ::wrk/task :process-webhook-event
::wrk/queue :webhooks ::wrk/queue :webhooks
::wrk/max-retries 0 ::wrk/max-retries 0
::wrk/delay (or batch-timeout 0) ::wrk/delay (or batch-timeout 0)
::wrk/dedupe dedupe? ::wrk/dedupe dedupe?
::wrk/label ::wrk/label label
(str/ffmt "rpc:%1%2" (:name params) label-suffix)
::webhooks/event ::webhooks/event
(-> params (-> params

View file

@ -8,6 +8,7 @@
"A mattermost integration for error reporting." "A mattermost integration for error reporting."
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.data.macros :as dm]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.transit :as t] [app.common.transit :as t]
[app.common.uri :as uri] [app.common.uri :as uri]
@ -21,6 +22,15 @@
[cuerdas.core :as str] [cuerdas.core :as str]
[integrant.core :as ig])) [integrant.core :as ig]))
;; --- HELPERS
(defn key-fn
[k & keys]
(fn [params]
(reduce #(dm/str %1 ":" (get params %2))
(dm/str (get params k))
keys)))
;; --- PROC ;; --- PROC
(defn- lookup-webhooks-by-team (defn- lookup-webhooks-by-team

View file

@ -190,6 +190,12 @@
:profile-id profile-id :profile-id profile-id
:ip-addr (some-> request audit/parse-client-ip) :ip-addr (some-> request audit/parse-client-ip)
:props props :props props
;; NOTE: for batch-key lookup we need the params as-is
;; because the rpc api does not need to know the
;; audit/webhook specific object layout.
::params (dissoc params ::http/request)
::webhooks/batch-key ::webhooks/batch-key
(or (::webhooks/batch-key mdata) (or (::webhooks/batch-key mdata)
(::webhooks/batch-key resultm)) (::webhooks/batch-key resultm))

View file

@ -17,7 +17,7 @@
[app.config :as cf] [app.config :as cf]
[app.db :as db] [app.db :as db]
[app.loggers.audit :as audit] [app.loggers.audit :as audit]
[app.loggers.webhooks :as-alias webhooks] [app.loggers.webhooks :as webhooks]
[app.metrics :as mtx] [app.metrics :as mtx]
[app.msgbus :as mbus] [app.msgbus :as mbus]
[app.rpc :as-alias rpc] [app.rpc :as-alias rpc]
@ -130,7 +130,7 @@
::climit/key-fn :id ::climit/key-fn :id
::webhooks/event? true ::webhooks/event? true
::webhooks/batch-timeout (dt/duration "2m") ::webhooks/batch-timeout (dt/duration "2m")
::webhooks/batch-key :id ::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id)
::doc/added "1.17"} ::doc/added "1.17"}
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id id] :as params}] [{:keys [pool] :as cfg} {:keys [::rpc/profile-id id] :as params}]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]