mirror of
https://github.com/pomerium/pomerium.git
synced 2025-07-28 14:08:43 +02:00
zero/telemetry: collect DAU and MAU
This commit is contained in:
parent
2071140205
commit
8301f5541b
8 changed files with 441 additions and 0 deletions
78
internal/zero/analytics/activeusers.go
Normal file
78
internal/zero/analytics/activeusers.go
Normal file
|
@ -0,0 +1,78 @@
|
||||||
|
package analytics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/pkg/counter"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
activeUsersCap = 10_000
|
||||||
|
)
|
||||||
|
|
||||||
|
// IntervalResetFunc is a function that determines if a counter should be reset
|
||||||
|
type IntervalResetFunc func(lastReset time.Time, now time.Time) bool
|
||||||
|
|
||||||
|
// ResetMonthlyUTC resets the counter on a monthly interval
|
||||||
|
func ResetMonthlyUTC(lastReset time.Time, now time.Time) bool {
|
||||||
|
lastResetUTC := lastReset.UTC()
|
||||||
|
nowUTC := now.UTC()
|
||||||
|
return lastResetUTC.Year() != nowUTC.Year() ||
|
||||||
|
lastResetUTC.Month() != nowUTC.Month()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResetDailyUTC resets the counter on a daily interval
|
||||||
|
func ResetDailyUTC(lastReset time.Time, now time.Time) bool {
|
||||||
|
lastResetUTC := lastReset.UTC()
|
||||||
|
nowUTC := now.UTC()
|
||||||
|
return lastResetUTC.Year() != nowUTC.Year() ||
|
||||||
|
lastResetUTC.Month() != nowUTC.Month() ||
|
||||||
|
lastResetUTC.Day() != nowUTC.Day()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ActiveUsersCounter is a counter that resets on a given interval
|
||||||
|
type ActiveUsersCounter struct {
|
||||||
|
*counter.Counter
|
||||||
|
lastReset time.Time
|
||||||
|
needsReset IntervalResetFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewActiveUsersCounter creates a new active users counter
|
||||||
|
func NewActiveUsersCounter(needsReset IntervalResetFunc, now time.Time) *ActiveUsersCounter {
|
||||||
|
return &ActiveUsersCounter{
|
||||||
|
Counter: counter.New(activeUsersCap),
|
||||||
|
lastReset: now,
|
||||||
|
needsReset: needsReset,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadActiveUsersCounter loads an active users counter from a binary state
|
||||||
|
func LoadActiveUsersCounter(state []byte, lastReset time.Time, resetFn IntervalResetFunc) (*ActiveUsersCounter, error) {
|
||||||
|
c, err := counter.FromBinary(state)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &ActiveUsersCounter{
|
||||||
|
Counter: c,
|
||||||
|
lastReset: lastReset,
|
||||||
|
needsReset: resetFn,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update updates the counter with the current users
|
||||||
|
func (c *ActiveUsersCounter) Update(users []string, now time.Time) (wasReset bool) {
|
||||||
|
if c.needsReset(c.lastReset, now) {
|
||||||
|
c.Counter.Reset()
|
||||||
|
c.lastReset = now
|
||||||
|
wasReset = true
|
||||||
|
}
|
||||||
|
for _, user := range users {
|
||||||
|
c.Mark(user)
|
||||||
|
}
|
||||||
|
return wasReset
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLastReset returns the last time the counter was reset
|
||||||
|
func (c *ActiveUsersCounter) GetLastReset() time.Time {
|
||||||
|
return c.lastReset
|
||||||
|
}
|
37
internal/zero/analytics/activeusers_test.go
Normal file
37
internal/zero/analytics/activeusers_test.go
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
package analytics_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/internal/zero/analytics"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestActiveUsers(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
startTime := time.Now().UTC()
|
||||||
|
|
||||||
|
// Create a new counter that resets on a daily interval
|
||||||
|
c := analytics.NewActiveUsersCounter(analytics.ResetDailyUTC, startTime)
|
||||||
|
|
||||||
|
wasReset := c.Update([]string{"user1", "user2"}, startTime.Add(time.Minute))
|
||||||
|
assert.False(t, wasReset)
|
||||||
|
assert.EqualValues(t, 2, c.Count())
|
||||||
|
|
||||||
|
wasReset = c.Update([]string{"user1", "user2", "user3"}, startTime.Add(time.Minute*2))
|
||||||
|
assert.False(t, wasReset)
|
||||||
|
assert.EqualValues(t, 3, c.Count())
|
||||||
|
|
||||||
|
// Update the counter with a new user after lapse
|
||||||
|
wasReset = c.Update([]string{"user1", "user2", "user3", "user4"}, startTime.Add(time.Hour*25))
|
||||||
|
assert.True(t, wasReset)
|
||||||
|
assert.EqualValues(t, 4, c.Count())
|
||||||
|
|
||||||
|
// Update the counter with a new user after lapse
|
||||||
|
wasReset = c.Update([]string{"user4"}, startTime.Add(time.Hour*25*2))
|
||||||
|
assert.True(t, wasReset)
|
||||||
|
assert.EqualValues(t, 1, c.Count())
|
||||||
|
}
|
100
internal/zero/analytics/collector.go
Normal file
100
internal/zero/analytics/collector.go
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
// Package analytics collects active user metrics and reports them to the cloud dashboard
|
||||||
|
package analytics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/internal/log"
|
||||||
|
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
updateInterval = time.Second * 30
|
||||||
|
)
|
||||||
|
|
||||||
|
// Collect collects metrics and reports them to the cloud
|
||||||
|
func Collect(
|
||||||
|
ctx context.Context,
|
||||||
|
client databroker.DataBrokerServiceClient,
|
||||||
|
) error {
|
||||||
|
c := &collector{
|
||||||
|
client: client,
|
||||||
|
counters: make(map[string]*ActiveUsersCounter),
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
for key, resetFn := range map[string]IntervalResetFunc{
|
||||||
|
"mau": ResetMonthlyUTC,
|
||||||
|
"dau": ResetDailyUTC,
|
||||||
|
} {
|
||||||
|
state, err := LoadMetricState(ctx, client, key)
|
||||||
|
if err != nil && !databroker.IsNotFound(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if state == nil {
|
||||||
|
c.counters[key] = NewActiveUsersCounter(resetFn, now)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
counter, err := LoadActiveUsersCounter(state.Data, state.LastReset, resetFn)
|
||||||
|
if err != nil {
|
||||||
|
log.Ctx(ctx).Error().Err(err).Str("metric", key).Msg("failed to load metric state, resetting")
|
||||||
|
counter = NewActiveUsersCounter(resetFn, now)
|
||||||
|
}
|
||||||
|
c.counters[key] = counter
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.run(ctx, updateInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
type collector struct {
|
||||||
|
client databroker.DataBrokerServiceClient
|
||||||
|
counters map[string]*ActiveUsersCounter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *collector) run(ctx context.Context, interval time.Duration) error {
|
||||||
|
ticker := time.NewTicker(interval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case <-ticker.C:
|
||||||
|
if err := c.update(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *collector) update(ctx context.Context) error {
|
||||||
|
users, err := CurrentUsers(ctx, c.client)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get current users: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
for key, counter := range c.counters {
|
||||||
|
updated := counter.Update(users, now)
|
||||||
|
if !updated {
|
||||||
|
log.Ctx(ctx).Debug().Msgf("metric %s not changed: %d", key, counter.Count())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Ctx(ctx).Debug().Msgf("metric %s updated: %d", key, counter.Count())
|
||||||
|
|
||||||
|
data, err := counter.ToBinary()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal metric %s: %w", key, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = SaveMetricState(ctx, c.client, key, data, counter.GetLastReset())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to save metric %s: %w", key, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
52
internal/zero/analytics/sessions.go
Normal file
52
internal/zero/analytics/sessions.go
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
package analytics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||||
|
"github.com/pomerium/pomerium/pkg/grpc/session"
|
||||||
|
"github.com/pomerium/pomerium/pkg/protoutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
sessionTypeURL = protoutil.GetTypeURL(new(session.Session))
|
||||||
|
)
|
||||||
|
|
||||||
|
// CurrentUsers returns a list of currently active user IDs from the databroker
|
||||||
|
func CurrentUsers(
|
||||||
|
ctx context.Context,
|
||||||
|
client databroker.DataBrokerServiceClient,
|
||||||
|
) ([]string, error) {
|
||||||
|
records, _, _, err := databroker.InitialSync(ctx, client, &databroker.SyncLatestRequest{
|
||||||
|
Type: sessionTypeURL,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("fetching sessions: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var users []string
|
||||||
|
utcNow := time.Now().UTC()
|
||||||
|
threshold := time.Date(utcNow.Year(), utcNow.Month(), utcNow.Day(), utcNow.Hour(), 0, 0, 0, time.UTC)
|
||||||
|
|
||||||
|
for _, record := range records {
|
||||||
|
var s session.Session
|
||||||
|
err := record.GetData().UnmarshalTo(&s)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("unmarshaling session: %w", err)
|
||||||
|
}
|
||||||
|
if s.UserId == "" { // session creation is in progress
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if s.AccessedAt == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if s.AccessedAt.AsTime().Before(threshold) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
users = append(users, s.UserId)
|
||||||
|
}
|
||||||
|
|
||||||
|
return users, nil
|
||||||
|
}
|
116
internal/zero/analytics/storage.go
Normal file
116
internal/zero/analytics/storage.go
Normal file
|
@ -0,0 +1,116 @@
|
||||||
|
package analytics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/base64"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"google.golang.org/protobuf/types/known/anypb"
|
||||||
|
"google.golang.org/protobuf/types/known/structpb"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||||
|
"github.com/pomerium/pomerium/pkg/protoutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
metricStateTypeURL = "pomerium.io/ActiveUsersMetricState"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SaveMetricState saves the state of a metric to the databroker
|
||||||
|
func SaveMetricState(
|
||||||
|
ctx context.Context,
|
||||||
|
client databroker.DataBrokerServiceClient,
|
||||||
|
id string,
|
||||||
|
data []byte,
|
||||||
|
lastReset time.Time,
|
||||||
|
) error {
|
||||||
|
_, err := client.Put(ctx, &databroker.PutRequest{
|
||||||
|
Records: []*databroker.Record{{
|
||||||
|
Type: metricStateTypeURL,
|
||||||
|
Id: id,
|
||||||
|
Data: (&MetricState{
|
||||||
|
Data: data,
|
||||||
|
LastReset: lastReset,
|
||||||
|
}).ToAny(),
|
||||||
|
}},
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadMetricState loads the state of a metric from the databroker
|
||||||
|
func LoadMetricState(
|
||||||
|
ctx context.Context, client databroker.DataBrokerServiceClient, id string,
|
||||||
|
) (*MetricState, error) {
|
||||||
|
resp, err := client.Get(ctx, &databroker.GetRequest{
|
||||||
|
Type: metricStateTypeURL,
|
||||||
|
Id: id,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("load metric state: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var state MetricState
|
||||||
|
err = state.FromAny(resp.GetRecord().GetData())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("load metric state: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &state, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MetricState is the persistent state of a metric
|
||||||
|
type MetricState struct {
|
||||||
|
Data []byte
|
||||||
|
LastReset time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
dataKey = "data"
|
||||||
|
lastResetKey = "last_reset"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ToAny marshals a MetricState into an anypb.Any
|
||||||
|
func (r *MetricState) ToAny() *anypb.Any {
|
||||||
|
return protoutil.NewAny(&structpb.Struct{
|
||||||
|
Fields: map[string]*structpb.Value{
|
||||||
|
dataKey: structpb.NewStringValue(base64.StdEncoding.EncodeToString(r.Data)),
|
||||||
|
lastResetKey: structpb.NewStringValue(r.LastReset.Format(time.RFC3339)),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromAny unmarshals an anypb.Any into a MetricState
|
||||||
|
func (r *MetricState) FromAny(any *anypb.Any) error {
|
||||||
|
var s structpb.Struct
|
||||||
|
err := any.UnmarshalTo(&s)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unmarshal struct: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
vData, ok := s.GetFields()[dataKey]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("missing %s field", dataKey)
|
||||||
|
}
|
||||||
|
data, err := base64.StdEncoding.DecodeString(vData.GetStringValue())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("decode state: %w", err)
|
||||||
|
}
|
||||||
|
if len(data) == 0 {
|
||||||
|
return fmt.Errorf("empty data")
|
||||||
|
}
|
||||||
|
|
||||||
|
vLastReset, ok := s.GetFields()[lastResetKey]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("missing %s field", lastResetKey)
|
||||||
|
}
|
||||||
|
lastReset, err := time.Parse(time.RFC3339, vLastReset.GetStringValue())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parse last reset: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Data = data
|
||||||
|
r.LastReset = lastReset
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
29
internal/zero/analytics/storage_test.go
Normal file
29
internal/zero/analytics/storage_test.go
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
package analytics_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/internal/zero/analytics"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStorage(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
now := time.Date(2020, 1, 2, 3, 4, 5, 6, time.UTC)
|
||||||
|
state := &analytics.MetricState{
|
||||||
|
Data: []byte("data"),
|
||||||
|
LastReset: now,
|
||||||
|
}
|
||||||
|
|
||||||
|
pbany := state.ToAny()
|
||||||
|
assert.NotNil(t, pbany)
|
||||||
|
|
||||||
|
var newState analytics.MetricState
|
||||||
|
err := newState.FromAny(pbany)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.EqualValues(t, state.Data, newState.Data)
|
||||||
|
assert.EqualValues(t, state.LastReset.Truncate(time.Second), newState.LastReset.Truncate(time.Second))
|
||||||
|
}
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/internal/log"
|
"github.com/pomerium/pomerium/internal/log"
|
||||||
|
"github.com/pomerium/pomerium/internal/zero/analytics"
|
||||||
"github.com/pomerium/pomerium/internal/zero/bootstrap"
|
"github.com/pomerium/pomerium/internal/zero/bootstrap"
|
||||||
"github.com/pomerium/pomerium/pkg/cmd/pomerium"
|
"github.com/pomerium/pomerium/pkg/cmd/pomerium"
|
||||||
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||||
|
@ -41,6 +42,7 @@ func Run(ctx context.Context, opts ...Option) error {
|
||||||
eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore, src.WaitReady) })
|
eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore, src.WaitReady) })
|
||||||
eg.Go(func() error { return run(ctx, "zero-reconciler", c.RunReconciler, src.WaitReady) })
|
eg.Go(func() error { return run(ctx, "zero-reconciler", c.RunReconciler, src.WaitReady) })
|
||||||
eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) })
|
eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) })
|
||||||
|
eg.Go(func() error { return run(ctx, "zero-analytics", c.runAnalytics(), src.WaitReady) })
|
||||||
return eg.Wait()
|
return eg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,3 +99,23 @@ func (c *controller) runPomeriumCore(ctx context.Context) error {
|
||||||
func (c *controller) runConnect(ctx context.Context) error {
|
func (c *controller) runConnect(ctx context.Context) error {
|
||||||
return c.api.Connect(ctx)
|
return c.api.Connect(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *controller) runAnalytics() func(ctx context.Context) error {
|
||||||
|
disable := false
|
||||||
|
return func(ctx context.Context) error {
|
||||||
|
if disable {
|
||||||
|
log.Ctx(ctx).Info().Msg("analytics disabled due to previous error")
|
||||||
|
<-ctx.Done()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err := analytics.Collect(ctx, c.GetDataBrokerServiceClient())
|
||||||
|
if err != nil && ctx.Err() == nil {
|
||||||
|
disable = true
|
||||||
|
log.Ctx(ctx).Error().Err(err).Msg("error collecting analytics, disabling")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -8,6 +8,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
status "google.golang.org/grpc/status"
|
||||||
"google.golang.org/protobuf/encoding/protojson"
|
"google.golang.org/protobuf/encoding/protojson"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
structpb "google.golang.org/protobuf/types/known/structpb"
|
structpb "google.golang.org/protobuf/types/known/structpb"
|
||||||
|
@ -33,6 +35,11 @@ func NewRecord(object recordObject) *Record {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsNotFound returns true if the error is a not found error.
|
||||||
|
func IsNotFound(err error) bool {
|
||||||
|
return status.Code(err) == codes.NotFound
|
||||||
|
}
|
||||||
|
|
||||||
// Get gets a record from the databroker and unmarshals it into the object.
|
// Get gets a record from the databroker and unmarshals it into the object.
|
||||||
func Get(ctx context.Context, client DataBrokerServiceClient, object recordObject) error {
|
func Get(ctx context.Context, client DataBrokerServiceClient, object recordObject) error {
|
||||||
res, err := client.Get(ctx, &GetRequest{
|
res, err := client.Get(ctx, &GetRequest{
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue