Denis Mishin 2023-12-09 22:24:23 -05:00
parent 9399139757
commit fd24e0e7be
7 changed files with 93 additions and 49 deletions

View file

@@ -31,3 +31,12 @@ func (s *Hash[T]) Has(element T) bool {
 func (s *Hash[T]) Size() int {
 	return len(s.m)
 }
+
+// Items returns the set's elements as a slice.
+func (s *Hash[T]) Items() []T {
+	items := make([]T, 0, len(s.m))
+	for item := range s.m {
+		items = append(items, item)
+	}
+	return items
+}
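For orientation, a brief usage sketch of the new accessor (illustrative only; NewHash and Add are taken from how the set is used elsewhere in this commit, and the element values are made up). Because the set is backed by a map, the order of the returned slice is unspecified.

s := sets.NewHash[string]()
s.Add("user1")
s.Add("user1") // duplicates collapse into a single element
s.Add("user2")

items := s.Items() // e.g. []string{"user1", "user2"}; order is not guaranteed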

View file

@@ -7,6 +7,8 @@ import (
 )

 const (
+	// activeUsersCap is the maximum number of active users we can track.
+	// For the counter to stay within its 1% error limit, keep actual usage at or below 80% of the cap (8,000 for a cap of 10,000).
 	activeUsersCap = 10_000
 )
@@ -60,7 +62,7 @@ func LoadActiveUsersCounter(state []byte, lastReset time.Time, resetFn IntervalR
 }

 // Update updates the counter with the current users
-func (c *ActiveUsersCounter) Update(users []string, now time.Time) (wasReset bool) {
+func (c *ActiveUsersCounter) Update(users []string, now time.Time) (current uint, wasReset bool) {
 	if c.needsReset(c.lastReset, now) {
 		c.Counter.Reset()
 		c.lastReset = now
@@ -69,7 +71,7 @@ func (c *ActiveUsersCounter) Update(users []string, now time.Time) (wasReset boo
 	for _, user := range users {
 		c.Mark(user)
 	}
-	return wasReset
+	return c.Count(), wasReset
 }

 // GetLastReset returns the last time the counter was reset

View file

@@ -17,21 +17,21 @@ func TestActiveUsers(t *testing.T) {
 	// Create a new counter that resets on a daily interval
 	c := analytics.NewActiveUsersCounter(analytics.ResetDailyUTC, startTime)

-	wasReset := c.Update([]string{"user1", "user2"}, startTime.Add(time.Minute))
+	count, wasReset := c.Update([]string{"user1", "user2"}, startTime.Add(time.Minute))
 	assert.False(t, wasReset)
-	assert.EqualValues(t, 2, c.Count())
+	assert.EqualValues(t, 2, count)

-	wasReset = c.Update([]string{"user1", "user2", "user3"}, startTime.Add(time.Minute*2))
+	count, wasReset = c.Update([]string{"user1", "user2", "user3"}, startTime.Add(time.Minute*2))
 	assert.False(t, wasReset)
-	assert.EqualValues(t, 3, c.Count())
+	assert.EqualValues(t, 3, count)

 	// Update the counter with a new user after lapse
-	wasReset = c.Update([]string{"user1", "user2", "user3", "user4"}, startTime.Add(time.Hour*25))
+	count, wasReset = c.Update([]string{"user1", "user2", "user3", "user4"}, startTime.Add(time.Hour*25))
 	assert.True(t, wasReset)
-	assert.EqualValues(t, 4, c.Count())
+	assert.EqualValues(t, 4, count)

 	// Update the counter with a new user after lapse
-	wasReset = c.Update([]string{"user4"}, startTime.Add(time.Hour*25*2))
+	count, wasReset = c.Update([]string{"user4"}, startTime.Add(time.Hour*25*2))
 	assert.True(t, wasReset)
-	assert.EqualValues(t, 1, c.Count())
+	assert.EqualValues(t, 1, count)
 }

View file

@@ -10,26 +10,53 @@ import (
 	"github.com/pomerium/pomerium/pkg/grpc/databroker"
 )

-const (
-	updateInterval = time.Hour * 6
-)
-
 // Collect collects metrics and reports them to the cloud
 func Collect(
 	ctx context.Context,
 	client databroker.DataBrokerServiceClient,
+	updateInterval time.Duration,
 ) error {
 	c := &collector{
 		client: client,
 		counters: make(map[string]*ActiveUsersCounter),
+		updateInterval: updateInterval,
 	}
+
+	leaser := databroker.NewLeaser("pomerium-zero-analytics", c.leaseTTL(), c)
+	return leaser.Run(ctx)
+}
+
+type collector struct {
+	client databroker.DataBrokerServiceClient
+	counters map[string]*ActiveUsersCounter
+	updateInterval time.Duration
+}
+
+func (c *collector) RunLeased(ctx context.Context) error {
+	err := c.loadCounters(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to load counters: %w", err)
+	}
+
+	err = c.runPeriodicUpdate(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to run periodic update: %w", err)
+	}
+
+	return nil
+}
+
+func (c *collector) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
+	return c.client
+}
+
+func (c *collector) loadCounters(ctx context.Context) error {
 	now := time.Now()
 	for key, resetFn := range map[string]IntervalResetFunc{
 		"mau": ResetMonthlyUTC,
 		"dau": ResetDailyUTC,
 	} {
-		state, err := LoadMetricState(ctx, client, key)
+		state, err := LoadMetricState(ctx, c.client, key)
 		if err != nil && !databroker.IsNotFound(err) {
 			return err
 		}
@@ -46,16 +73,11 @@ func Collect(
 		c.counters[key] = counter
 	}

-	return c.run(ctx, updateInterval)
+	return nil
 }

-type collector struct {
-	client databroker.DataBrokerServiceClient
-	counters map[string]*ActiveUsersCounter
-}
-
-func (c *collector) run(ctx context.Context, interval time.Duration) error {
-	ticker := time.NewTicker(interval)
+func (c *collector) runPeriodicUpdate(ctx context.Context) error {
+	ticker := time.NewTicker(c.updateInterval)
 	defer ticker.Stop()

 	for {
@@ -78,8 +100,9 @@ func (c *collector) update(ctx context.Context) error {
 	now := time.Now()

 	for key, counter := range c.counters {
-		updated := counter.Update(users, now)
-		if !updated {
+		before := counter.Count()
+		after, _ := counter.Update(users, now)
+		if before == after {
 			log.Ctx(ctx).Debug().Msgf("metric %s not changed: %d", key, counter.Count())
 			continue
 		}
@@ -90,7 +113,7 @@ func (c *collector) update(ctx context.Context) error {
 			return fmt.Errorf("failed to marshal metric %s: %w", key, err)
 		}

-		err = SaveMetricState(ctx, c.client, key, data, counter.GetLastReset())
+		err = SaveMetricState(ctx, c.client, key, data, after, counter.GetLastReset())
 		if err != nil {
 			return fmt.Errorf("failed to save metric %s: %w", key, err)
 		}
@@ -98,3 +121,11 @@ func (c *collector) update(ctx context.Context) error {

 	return nil
 }
+
+func (c *collector) leaseTTL() time.Duration {
+	const defaultTTL = time.Minute * 5
+	if defaultTTL < c.updateInterval {
+		return defaultTTL
+	}
+	return c.updateInterval
+}
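A caller-side sketch of the reworked entry point (hypothetical wiring; client and ctx are placeholders, and the 30-second interval mirrors the controller change below). Given leaseTTL above, a 30-second interval yields a 30-second lease TTL, while the previous 6-hour cadence would have been capped at 5 minutes.

// Assumes client is an established databroker.DataBrokerServiceClient.
if err := analytics.Collect(ctx, client, 30*time.Second); err != nil {
	// Collect returns once the leased run loop exits.
	log.Ctx(ctx).Error().Err(err).Msg("analytics collector stopped")
}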

View file

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"time"

+	"github.com/pomerium/pomerium/internal/sets"
 	"github.com/pomerium/pomerium/pkg/grpc/databroker"
 	"github.com/pomerium/pomerium/pkg/grpc/session"
 	"github.com/pomerium/pomerium/pkg/protoutil"
@@ -26,7 +27,7 @@ func CurrentUsers(
 		return nil, fmt.Errorf("fetching sessions: %w", err)
 	}

-	var users []string
+	users := sets.NewHash[string]()

 	utcNow := time.Now().UTC()
 	threshold := time.Date(utcNow.Year(), utcNow.Month(), utcNow.Day(), 0, 0, 0, 0, time.UTC)
@@ -45,8 +46,8 @@ func CurrentUsers(
 		if s.AccessedAt.AsTime().Before(threshold) {
 			continue
 		}
-		users = append(users, s.UserId)
+		users.Add(s.UserId)
 	}

-	return users, nil
+	return users.Items(), nil
 }

View file

@@ -23,6 +23,7 @@ func SaveMetricState(
 	client databroker.DataBrokerServiceClient,
 	id string,
 	data []byte,
+	value uint,
 	lastReset time.Time,
 ) error {
 	_, err := client.Put(ctx, &databroker.PutRequest{
@@ -32,6 +33,7 @@ func SaveMetricState(
 			Data: (&MetricState{
 				Data: data,
 				LastReset: lastReset,
+				Count: value,
 			}).ToAny(),
 		}},
 	})
@@ -63,9 +65,11 @@ func LoadMetricState(
 type MetricState struct {
 	Data []byte
 	LastReset time.Time
+	Count uint
 }

 const (
+	countKey = "count"
 	dataKey = "data"
 	lastResetKey = "last_reset"
 )
@@ -74,6 +78,7 @@ const (
 func (r *MetricState) ToAny() *anypb.Any {
 	return protoutil.NewAny(&structpb.Struct{
 		Fields: map[string]*structpb.Value{
+			countKey: structpb.NewNumberValue(float64(r.Count)),
 			dataKey: structpb.NewStringValue(base64.StdEncoding.EncodeToString(r.Data)),
 			lastResetKey: structpb.NewStringValue(r.LastReset.Format(time.RFC3339)),
 		},
@@ -108,9 +113,14 @@ func (r *MetricState) FromAny(any *anypb.Any) error {
 	if err != nil {
 		return fmt.Errorf("parse last reset: %w", err)
 	}

+	vCount, ok := s.GetFields()[countKey]
+	if !ok {
+		return fmt.Errorf("missing %s field", countKey)
+	}
 	r.Data = data
 	r.LastReset = lastReset
+	r.Count = uint(vCount.GetNumberValue())

 	return nil
 }
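A minimal round-trip sketch of the extended state record (field values are illustrative). Note that FromAny now requires the count field, so a state persisted before this change would fail to load here.

st := &MetricState{
	Data: []byte("opaque-counter-bytes"), // placeholder payload
	LastReset: time.Now().UTC(),
	Count: 42,
}
a := st.ToAny()

var loaded MetricState
if err := loaded.FromAny(a); err != nil {
	// records written without a "count" field end up here
	panic(err)
}
// loaded.Count == 42; loaded.LastReset survives at RFC 3339 (one-second) precision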

View file

@@ -5,6 +5,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"

 	"github.com/rs/zerolog"
 	"golang.org/x/sync/errgroup"
@@ -44,7 +45,7 @@ func Run(ctx context.Context, opts ...Option) error {
 	eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore, src.WaitReady) })
 	eg.Go(func() error { return run(ctx, "zero-reconciler", c.runReconciler, src.WaitReady) })
 	eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) })
-	eg.Go(func() error { return run(ctx, "zero-analytics", c.runAnalytics(), src.WaitReady) })
+	eg.Go(func() error { return run(ctx, "zero-analytics", c.runAnalytics, src.WaitReady) })

 	return eg.Wait()
 }
@@ -120,22 +121,12 @@ func (c *controller) runReconciler(ctx context.Context) error {
 	)
 }

-func (c *controller) runAnalytics() func(ctx context.Context) error {
-	disable := false
-	return func(ctx context.Context) error {
-		if disable {
-			log.Ctx(ctx).Info().Msg("analytics disabled due to previous error")
-			<-ctx.Done()
-			return nil
-		}
-
-		err := analytics.Collect(ctx, c.GetDataBrokerServiceClient())
-		if err != nil && ctx.Err() == nil {
-			disable = true
-			log.Ctx(ctx).Error().Err(err).Msg("error collecting analytics, disabling")
-			return nil
-		}
-		return err
-	}
+func (c *controller) runAnalytics(ctx context.Context) error {
+	err := analytics.Collect(ctx, c.GetDataBrokerServiceClient(), time.Second*30)
+	if err != nil && ctx.Err() == nil {
+		log.Ctx(ctx).Error().Err(err).Msg("error collecting analytics, disabling")
+		return nil
+	}
+	return err
 }