From fd24e0e7be227d25561b10beed0676edd6381f65 Mon Sep 17 00:00:00 2001 From: Denis Mishin Date: Sat, 9 Dec 2023 22:24:23 -0500 Subject: [PATCH] cr --- internal/sets/hash.go | 9 +++ internal/zero/analytics/activeusers.go | 6 +- internal/zero/analytics/activeusers_test.go | 16 ++--- internal/zero/analytics/collector.go | 67 +++++++++++++++------ internal/zero/analytics/sessions.go | 7 ++- internal/zero/analytics/storage.go | 10 +++ internal/zero/controller/controller.go | 27 +++------ 7 files changed, 93 insertions(+), 49 deletions(-) diff --git a/internal/sets/hash.go b/internal/sets/hash.go index 1529a9c44..e18083852 100644 --- a/internal/sets/hash.go +++ b/internal/sets/hash.go @@ -31,3 +31,12 @@ func (s *Hash[T]) Has(element T) bool { func (s *Hash[T]) Size() int { return len(s.m) } + +// Items returns the set's elements as a slice. +func (s *Hash[T]) Items() []T { + items := make([]T, 0, len(s.m)) + for item := range s.m { + items = append(items, item) + } + return items +} diff --git a/internal/zero/analytics/activeusers.go b/internal/zero/analytics/activeusers.go index 1a3e87c73..b5dffeacf 100644 --- a/internal/zero/analytics/activeusers.go +++ b/internal/zero/analytics/activeusers.go @@ -7,6 +7,8 @@ import ( ) const ( + // activeUsersCap is the number of active users we would be able to track. + // for counter to work within the 1% error limit, actual number should be 80% of the cap. activeUsersCap = 10_000 ) @@ -60,7 +62,7 @@ func LoadActiveUsersCounter(state []byte, lastReset time.Time, resetFn IntervalR } // Update updates the counter with the current users -func (c *ActiveUsersCounter) Update(users []string, now time.Time) (wasReset bool) { +func (c *ActiveUsersCounter) Update(users []string, now time.Time) (current uint, wasReset bool) { if c.needsReset(c.lastReset, now) { c.Counter.Reset() c.lastReset = now @@ -69,7 +71,7 @@ func (c *ActiveUsersCounter) Update(users []string, now time.Time) (wasReset boo for _, user := range users { c.Mark(user) } - return wasReset + return c.Count(), wasReset } // GetLastReset returns the last time the counter was reset diff --git a/internal/zero/analytics/activeusers_test.go b/internal/zero/analytics/activeusers_test.go index bc09d39b1..226365159 100644 --- a/internal/zero/analytics/activeusers_test.go +++ b/internal/zero/analytics/activeusers_test.go @@ -17,21 +17,21 @@ func TestActiveUsers(t *testing.T) { // Create a new counter that resets on a daily interval c := analytics.NewActiveUsersCounter(analytics.ResetDailyUTC, startTime) - wasReset := c.Update([]string{"user1", "user2"}, startTime.Add(time.Minute)) + count, wasReset := c.Update([]string{"user1", "user2"}, startTime.Add(time.Minute)) assert.False(t, wasReset) - assert.EqualValues(t, 2, c.Count()) + assert.EqualValues(t, 2, count) - wasReset = c.Update([]string{"user1", "user2", "user3"}, startTime.Add(time.Minute*2)) + count, wasReset = c.Update([]string{"user1", "user2", "user3"}, startTime.Add(time.Minute*2)) assert.False(t, wasReset) - assert.EqualValues(t, 3, c.Count()) + assert.EqualValues(t, 3, count) // Update the counter with a new user after lapse - wasReset = c.Update([]string{"user1", "user2", "user3", "user4"}, startTime.Add(time.Hour*25)) + count, wasReset = c.Update([]string{"user1", "user2", "user3", "user4"}, startTime.Add(time.Hour*25)) assert.True(t, wasReset) - assert.EqualValues(t, 4, c.Count()) + assert.EqualValues(t, 4, count) // Update the counter with a new user after lapse - wasReset = c.Update([]string{"user4"}, startTime.Add(time.Hour*25*2)) + count, wasReset = c.Update([]string{"user4"}, startTime.Add(time.Hour*25*2)) assert.True(t, wasReset) - assert.EqualValues(t, 1, c.Count()) + assert.EqualValues(t, 1, count) } diff --git a/internal/zero/analytics/collector.go b/internal/zero/analytics/collector.go index 0ad324df4..051985a3d 100644 --- a/internal/zero/analytics/collector.go +++ b/internal/zero/analytics/collector.go @@ -10,26 +10,53 @@ import ( "github.com/pomerium/pomerium/pkg/grpc/databroker" ) -const ( - updateInterval = time.Hour * 6 -) - // Collect collects metrics and reports them to the cloud func Collect( ctx context.Context, client databroker.DataBrokerServiceClient, + updateInterval time.Duration, ) error { c := &collector{ - client: client, - counters: make(map[string]*ActiveUsersCounter), + client: client, + counters: make(map[string]*ActiveUsersCounter), + updateInterval: updateInterval, } + leaser := databroker.NewLeaser("pomerium-zero-analytics", c.leaseTTL(), c) + return leaser.Run(ctx) +} + +type collector struct { + client databroker.DataBrokerServiceClient + counters map[string]*ActiveUsersCounter + updateInterval time.Duration +} + +func (c *collector) RunLeased(ctx context.Context) error { + err := c.loadCounters(ctx) + if err != nil { + return fmt.Errorf("failed to load counters: %w", err) + } + + err = c.runPeriodicUpdate(ctx) + if err != nil { + return fmt.Errorf("failed to run periodic update: %w", err) + } + + return nil +} + +func (c *collector) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient { + return c.client +} + +func (c *collector) loadCounters(ctx context.Context) error { now := time.Now() for key, resetFn := range map[string]IntervalResetFunc{ "mau": ResetMonthlyUTC, "dau": ResetDailyUTC, } { - state, err := LoadMetricState(ctx, client, key) + state, err := LoadMetricState(ctx, c.client, key) if err != nil && !databroker.IsNotFound(err) { return err } @@ -46,16 +73,11 @@ func Collect( c.counters[key] = counter } - return c.run(ctx, updateInterval) + return nil } -type collector struct { - client databroker.DataBrokerServiceClient - counters map[string]*ActiveUsersCounter -} - -func (c *collector) run(ctx context.Context, interval time.Duration) error { - ticker := time.NewTicker(interval) +func (c *collector) runPeriodicUpdate(ctx context.Context) error { + ticker := time.NewTicker(c.updateInterval) defer ticker.Stop() for { @@ -78,8 +100,9 @@ func (c *collector) update(ctx context.Context) error { now := time.Now() for key, counter := range c.counters { - updated := counter.Update(users, now) - if !updated { + before := counter.Count() + after, _ := counter.Update(users, now) + if before == after { log.Ctx(ctx).Debug().Msgf("metric %s not changed: %d", key, counter.Count()) continue } @@ -90,7 +113,7 @@ func (c *collector) update(ctx context.Context) error { return fmt.Errorf("failed to marshal metric %s: %w", key, err) } - err = SaveMetricState(ctx, c.client, key, data, counter.GetLastReset()) + err = SaveMetricState(ctx, c.client, key, data, after, counter.GetLastReset()) if err != nil { return fmt.Errorf("failed to save metric %s: %w", key, err) } @@ -98,3 +121,11 @@ func (c *collector) update(ctx context.Context) error { return nil } + +func (c *collector) leaseTTL() time.Duration { + const defaultTTL = time.Minute * 5 + if defaultTTL < c.updateInterval { + return defaultTTL + } + return c.updateInterval +} diff --git a/internal/zero/analytics/sessions.go b/internal/zero/analytics/sessions.go index 276898236..d7bb7ff53 100644 --- a/internal/zero/analytics/sessions.go +++ b/internal/zero/analytics/sessions.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/pomerium/pomerium/internal/sets" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/session" "github.com/pomerium/pomerium/pkg/protoutil" @@ -26,7 +27,7 @@ func CurrentUsers( return nil, fmt.Errorf("fetching sessions: %w", err) } - var users []string + users := sets.NewHash[string]() utcNow := time.Now().UTC() threshold := time.Date(utcNow.Year(), utcNow.Month(), utcNow.Day(), 0, 0, 0, 0, time.UTC) @@ -45,8 +46,8 @@ func CurrentUsers( if s.AccessedAt.AsTime().Before(threshold) { continue } - users = append(users, s.UserId) + users.Add(s.UserId) } - return users, nil + return users.Items(), nil } diff --git a/internal/zero/analytics/storage.go b/internal/zero/analytics/storage.go index a6d0e0e7b..617df6426 100644 --- a/internal/zero/analytics/storage.go +++ b/internal/zero/analytics/storage.go @@ -23,6 +23,7 @@ func SaveMetricState( client databroker.DataBrokerServiceClient, id string, data []byte, + value uint, lastReset time.Time, ) error { _, err := client.Put(ctx, &databroker.PutRequest{ @@ -32,6 +33,7 @@ func SaveMetricState( Data: (&MetricState{ Data: data, LastReset: lastReset, + Count: value, }).ToAny(), }}, }) @@ -63,9 +65,11 @@ func LoadMetricState( type MetricState struct { Data []byte LastReset time.Time + Count uint } const ( + countKey = "count" dataKey = "data" lastResetKey = "last_reset" ) @@ -74,6 +78,7 @@ const ( func (r *MetricState) ToAny() *anypb.Any { return protoutil.NewAny(&structpb.Struct{ Fields: map[string]*structpb.Value{ + countKey: structpb.NewNumberValue(float64(r.Count)), dataKey: structpb.NewStringValue(base64.StdEncoding.EncodeToString(r.Data)), lastResetKey: structpb.NewStringValue(r.LastReset.Format(time.RFC3339)), }, @@ -108,9 +113,14 @@ func (r *MetricState) FromAny(any *anypb.Any) error { if err != nil { return fmt.Errorf("parse last reset: %w", err) } + vCount, ok := s.GetFields()[countKey] + if !ok { + return fmt.Errorf("missing %s field", countKey) + } r.Data = data r.LastReset = lastReset + r.Count = uint(vCount.GetNumberValue()) return nil } diff --git a/internal/zero/controller/controller.go b/internal/zero/controller/controller.go index cc1c8391a..dc4a9fde1 100644 --- a/internal/zero/controller/controller.go +++ b/internal/zero/controller/controller.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" @@ -44,7 +45,7 @@ func Run(ctx context.Context, opts ...Option) error { eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore, src.WaitReady) }) eg.Go(func() error { return run(ctx, "zero-reconciler", c.runReconciler, src.WaitReady) }) eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) }) - eg.Go(func() error { return run(ctx, "zero-analytics", c.runAnalytics(), src.WaitReady) }) + eg.Go(func() error { return run(ctx, "zero-analytics", c.runAnalytics, src.WaitReady) }) return eg.Wait() } @@ -120,22 +121,12 @@ func (c *controller) runReconciler(ctx context.Context) error { ) } -func (c *controller) runAnalytics() func(ctx context.Context) error { - disable := false - return func(ctx context.Context) error { - if disable { - log.Ctx(ctx).Info().Msg("analytics disabled due to previous error") - <-ctx.Done() - return nil - } - - err := analytics.Collect(ctx, c.GetDataBrokerServiceClient()) - if err != nil && ctx.Err() == nil { - disable = true - log.Ctx(ctx).Error().Err(err).Msg("error collecting analytics, disabling") - return nil - } - - return err +func (c *controller) runAnalytics(ctx context.Context) error { + err := analytics.Collect(ctx, c.GetDataBrokerServiceClient(), time.Second*30) + if err != nil && ctx.Err() == nil { + log.Ctx(ctx).Error().Err(err).Msg("error collecting analytics, disabling") + return nil } + + return err }