core/zero: add usage reporter (#5281)

* wip

* add response

* handle empty email

* use set, update log

* add test

* add coalesce, comments, test

* add test, fix bug

* use builtin cmp.Or

* remove wait ready call

* use api error
This commit is contained in:
Caleb Doxsey 2024-09-12 15:45:54 -06:00 committed by GitHub
parent 82a9dbe42a
commit 146efc1b13
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 697 additions and 2 deletions

View file

@ -17,6 +17,7 @@ import (
"github.com/pomerium/pomerium/internal/zero/bootstrap"
"github.com/pomerium/pomerium/internal/zero/bootstrap/writers"
connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
"github.com/pomerium/pomerium/internal/zero/controller/usagereporter"
"github.com/pomerium/pomerium/internal/zero/healthcheck"
"github.com/pomerium/pomerium/internal/zero/reconciler"
"github.com/pomerium/pomerium/internal/zero/telemetry"
@ -160,6 +161,7 @@ func (c *controller) runZeroControlLoop(ctx context.Context) error {
c.runSessionAnalyticsLeased,
c.runHealthChecksLeased,
leaseStatus.MonitorLease,
c.runUsageReporter,
),
)
})
@ -208,6 +210,14 @@ func (c *controller) runHealthChecksLeased(ctx context.Context, client databroke
})
}
func (c *controller) runUsageReporter(ctx context.Context, client databroker.DataBrokerServiceClient) error {
ur := usagereporter.New(c.api, c.bootstrapConfig.GetConfig().ZeroOrganizationID, time.Minute)
return retry.WithBackoff(ctx, "zero-usage-reporter", func(ctx context.Context) error {
// start the usage reporter
return ur.Run(ctx, client)
})
}
func (c *controller) getEnvoyScrapeURL() string {
return (&url.URL{
Scheme: "http",

View file

@ -0,0 +1,217 @@
// Package usagereporter reports usage for a cluster.
//
// Usage is determined from session and user records in the databroker. The usage reporter
// uses SyncLatest and Sync to retrieve this data, builds a collection of records and then
// sends them to the Zero Cluster API every minute.
//
// All usage users are reported on start but only the changed users are reported while running.
// The Zero Cluster API is tolerant of redundant data.
package usagereporter
import (
"cmp"
"context"
"sync"
"time"
backoff "github.com/cenkalti/backoff/v4"
set "github.com/hashicorp/go-set/v3"
"golang.org/x/sync/errgroup"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/zero/cluster"
)
// API is the part of the Zero Cluster API used to report usage.
type API interface {
ReportUsage(ctx context.Context, req cluster.ReportUsageRequest) error
}
type usageReporterRecord struct {
userID string
userEmail string
lastSignedInAt time.Time
}
// A UsageReporter reports usage to the zero api.
type UsageReporter struct {
api API
organizationID string
reportInterval time.Duration
mu sync.Mutex
byUserID map[string]usageReporterRecord
updates *set.Set[string]
}
// New creates a new UsageReporter.
func New(api API, organizationID string, reportInterval time.Duration) *UsageReporter {
return &UsageReporter{
api: api,
organizationID: organizationID,
reportInterval: reportInterval,
byUserID: make(map[string]usageReporterRecord),
updates: set.New[string](0),
}
}
// Run runs the usage reporter.
func (ur *UsageReporter) Run(ctx context.Context, client databroker.DataBrokerServiceClient) error {
ctx = log.Ctx(ctx).With().Str("organization-id", ur.organizationID).Logger().WithContext(ctx)
// first initialize the user collection
serverVersion, latestSessionRecordVersion, latestUserRecordVersion, err := ur.runInit(ctx, client)
if err != nil {
return err
}
// run the continuous sync calls and periodically report usage
return ur.runSync(ctx, client, serverVersion, latestSessionRecordVersion, latestUserRecordVersion)
}
func (ur *UsageReporter) report(ctx context.Context, records []usageReporterRecord) error {
req := cluster.ReportUsageRequest{
Users: convertUsageReporterRecords(ur.organizationID, records),
}
return backoff.Retry(func() error {
log.Debug(ctx).Int("updated-users", len(req.Users)).Msg("reporting usage")
err := ur.api.ReportUsage(ctx, req)
if err != nil {
log.Warn(ctx).Err(err).Msg("error reporting usage")
}
return err
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
}
func (ur *UsageReporter) runInit(
ctx context.Context,
client databroker.DataBrokerServiceClient,
) (serverVersion, latestSessionRecordVersion, latestUserRecordVersion uint64, err error) {
_, latestSessionRecordVersion, err = databroker.SyncLatestRecords(ctx, client, ur.onUpdateSession)
if err != nil {
return 0, 0, 0, err
}
serverVersion, latestUserRecordVersion, err = databroker.SyncLatestRecords(ctx, client, ur.onUpdateUser)
if err != nil {
return 0, 0, 0, err
}
return serverVersion, latestSessionRecordVersion, latestUserRecordVersion, nil
}
func (ur *UsageReporter) runSync(
ctx context.Context,
client databroker.DataBrokerServiceClient,
serverVersion, latestSessionRecordVersion, latestUserRecordVersion uint64,
) error {
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return databroker.SyncRecords(ctx, client, serverVersion, latestSessionRecordVersion, ur.onUpdateSession)
})
eg.Go(func() error {
return databroker.SyncRecords(ctx, client, serverVersion, latestUserRecordVersion, ur.onUpdateUser)
})
eg.Go(func() error {
return ur.runReporter(ctx)
})
return eg.Wait()
}
func (ur *UsageReporter) runReporter(ctx context.Context) error {
// every minute collect any updates and submit them to the API
timer := time.NewTicker(ur.reportInterval)
defer timer.Stop()
for {
// collect the updated records since last run
ur.mu.Lock()
records := make([]usageReporterRecord, 0, ur.updates.Size())
for userID := range ur.updates.Items() {
records = append(records, ur.byUserID[userID])
}
ur.updates = set.New[string](0)
ur.mu.Unlock()
if len(records) > 0 {
err := ur.report(ctx, records)
if err != nil {
return err
}
}
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
}
}
}
func (ur *UsageReporter) onUpdateSession(s *session.Session) {
userID := s.GetUserId()
if userID == "" {
// ignore sessions without a user id
return
}
ur.mu.Lock()
defer ur.mu.Unlock()
r := ur.byUserID[userID]
nr := r
nr.lastSignedInAt = latest(nr.lastSignedInAt, s.GetIssuedAt().AsTime())
nr.userID = userID
if nr != r {
ur.byUserID[userID] = nr
ur.updates.Insert(userID)
}
}
func (ur *UsageReporter) onUpdateUser(u *user.User) {
userID := u.GetId()
if userID == "" {
// ignore users without a user id
return
}
ur.mu.Lock()
defer ur.mu.Unlock()
r := ur.byUserID[userID]
nr := r
nr.userID = userID
nr.userEmail = cmp.Or(nr.userEmail, u.GetEmail())
if nr != r {
ur.byUserID[userID] = nr
ur.updates.Insert(userID)
}
}
func convertUsageReporterRecords(organizationID string, records []usageReporterRecord) []cluster.ReportUsageUser {
var users []cluster.ReportUsageUser
for _, record := range records {
u := cluster.ReportUsageUser{
LastSignedInAt: record.lastSignedInAt,
PseudonymousId: cryptutil.Pseudonymize(organizationID, record.userID),
}
if record.userEmail != "" {
u.PseudonymousEmail = cryptutil.Pseudonymize(organizationID, record.userEmail)
}
users = append(users, u)
}
return users
}
// latest returns the latest time.
func latest(t1, t2 time.Time) time.Time {
if t2.After(t1) {
return t2
}
return t1
}

View file

@ -0,0 +1,156 @@
package usagereporter
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/databroker"
"github.com/pomerium/pomerium/internal/testutil"
databrokerpb "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/zero/cluster"
)
type mockAPI struct {
reportUsage func(ctx context.Context, req cluster.ReportUsageRequest) error
}
func (m mockAPI) ReportUsage(ctx context.Context, req cluster.ReportUsageRequest) error {
return m.reportUsage(ctx, req)
}
func TestUsageReporter(t *testing.T) {
t.Parallel()
ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10)
t.Cleanup(clearTimeout)
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
cc := testutil.NewGRPCServer(t, func(srv *grpc.Server) {
databrokerpb.RegisterDataBrokerServiceServer(srv, databroker.New())
})
t.Cleanup(func() { cc.Close() })
tm1 := time.Date(2024, time.September, 11, 11, 56, 0, 0, time.UTC)
requests := make(chan cluster.ReportUsageRequest, 1)
client := databrokerpb.NewDataBrokerServiceClient(cc)
ur := New(mockAPI{
reportUsage: func(ctx context.Context, req cluster.ReportUsageRequest) error {
select {
case <-ctx.Done():
return ctx.Err()
case requests <- req:
}
return nil
},
}, "bQjwPpxcwJRbvsSMFgbZFkXmxFJ", time.Millisecond*100)
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return ur.Run(ctx, client)
})
eg.Go(func() error {
_, err := databrokerpb.Put(ctx, client,
&session.Session{
Id: "S1a",
UserId: "U1",
IssuedAt: timestamppb.New(tm1),
},
&session.Session{
Id: "S1b",
UserId: "U1",
IssuedAt: timestamppb.New(tm1),
})
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case req := <-requests:
assert.Equal(t, cluster.ReportUsageRequest{
Users: []cluster.ReportUsageUser{{
LastSignedInAt: tm1,
PseudonymousId: "095xqqsjEEgYf5Yf+TAjWjooMQyh6jSV5SCPGe9eqvg=",
}},
}, req, "should send a single usage record")
}
_, err = databrokerpb.Put(ctx, client,
&user.User{
Id: "U1",
Email: "u1@example.com",
})
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case req := <-requests:
assert.Equal(t, cluster.ReportUsageRequest{
Users: []cluster.ReportUsageUser{{
LastSignedInAt: tm1,
PseudonymousEmail: "iq8/fj+uZaKitkWY12JIQgKJ5KIP+E0Cmy/HpxpdBXY=",
PseudonymousId: "095xqqsjEEgYf5Yf+TAjWjooMQyh6jSV5SCPGe9eqvg=",
}},
}, req, "should send another usage record with the email set")
}
cancel()
return nil
})
err := eg.Wait()
if err != nil && !errors.Is(ctx.Err(), context.Canceled) {
assert.NoError(t, err)
}
}
func Test_convertUsageReporterRecords(t *testing.T) {
t.Parallel()
tm1 := time.Date(2024, time.September, 11, 11, 56, 0, 0, time.UTC)
assert.Empty(t, convertUsageReporterRecords("XXX", nil))
assert.Equal(t, []cluster.ReportUsageUser{{
LastSignedInAt: tm1,
PseudonymousId: "T9V1yL/UueF/LVuF6XjoSNde0INElXG10zKepmyPke8=",
PseudonymousEmail: "8w5rtnZyv0EGkpHmTlkmupgb1jCzn/IxGCfvpdGGnvI=",
}}, convertUsageReporterRecords("XXX", []usageReporterRecord{{
userID: "ID",
userEmail: "EMAIL@example.com",
lastSignedInAt: tm1,
}}))
assert.Equal(t, []cluster.ReportUsageUser{{
LastSignedInAt: tm1,
PseudonymousId: "T9V1yL/UueF/LVuF6XjoSNde0INElXG10zKepmyPke8=",
}}, convertUsageReporterRecords("XXX", []usageReporterRecord{{
userID: "ID",
lastSignedInAt: tm1,
}}), "should leave empty email")
}
func Test_latest(t *testing.T) {
t.Parallel()
tm1 := time.Date(2024, time.September, 11, 11, 56, 0, 0, time.UTC)
tm2 := time.Date(2024, time.September, 12, 11, 56, 0, 0, time.UTC)
assert.Equal(t, tm2, latest(tm1, tm2))
assert.Equal(t, tm2, latest(tm2, tm1), "should ignore ordering")
assert.Equal(t, tm1, latest(tm1, time.Time{}), "should handle zero time")
}