pomerium/internal/zero/telemetry/sessions/collector.go
Denis Mishin 114f730dba
zero: refactor telemetry and controller (#5135)
* zero: refactor controller

* refactor zero telemetry and controller

* wire with connect handler

* cr
2024-06-12 21:59:25 -04:00

118 lines
2.8 KiB
Go

// Package analytics collects active user metrics and reports them to the cloud dashboard
package sessions
import (
"context"
"fmt"
"time"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)
// Collect collects metrics and stores them in the databroker
func Collect(
ctx context.Context,
client databroker.DataBrokerServiceClient,
updateInterval time.Duration,
) error {
c := &collector{
client: client,
counters: make(map[string]*ActiveUsersCounter),
updateInterval: updateInterval,
}
return c.run(ctx)
}
type collector struct {
client databroker.DataBrokerServiceClient
counters map[string]*ActiveUsersCounter
updateInterval time.Duration
}
func (c *collector) run(ctx context.Context) error {
err := c.loadCounters(ctx)
if err != nil {
return fmt.Errorf("failed to load counters: %w", err)
}
err = c.runPeriodicUpdate(ctx)
if err != nil {
return fmt.Errorf("failed to run periodic update: %w", err)
}
return nil
}
func (c *collector) loadCounters(ctx context.Context) error {
now := time.Now()
for key, resetFn := range map[string]IntervalResetFunc{
"mau": ResetMonthlyUTC,
"dau": ResetDailyUTC,
} {
state, err := LoadMetricState(ctx, c.client, key)
if err != nil && !databroker.IsNotFound(err) {
return err
}
if state == nil {
c.counters[key] = NewActiveUsersCounter(resetFn, now)
continue
}
counter, err := LoadActiveUsersCounter(state.Data, state.LastReset, resetFn)
if err != nil {
log.Ctx(ctx).Error().Err(err).Str("metric", key).Msg("failed to load metric state, resetting")
counter = NewActiveUsersCounter(resetFn, now)
}
c.counters[key] = counter
}
return nil
}
func (c *collector) runPeriodicUpdate(ctx context.Context) error {
ticker := time.NewTicker(c.updateInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
if err := c.update(ctx); err != nil {
return err
}
}
}
}
func (c *collector) update(ctx context.Context) error {
users, err := CurrentUsers(ctx, c.client)
if err != nil {
return fmt.Errorf("failed to get current users: %w", err)
}
now := time.Now()
for key, counter := range c.counters {
before := counter.Count()
after, _ := counter.Update(users, now)
if before == after {
log.Ctx(ctx).Debug().Msgf("metric %s not changed: %d", key, counter.Count())
continue
}
log.Ctx(ctx).Debug().Msgf("metric %s updated: %d", key, counter.Count())
data, err := counter.ToBinary()
if err != nil {
return fmt.Errorf("failed to marshal metric %s: %w", key, err)
}
err = SaveMetricState(ctx, c.client, key, data, after, counter.GetLastReset())
if err != nil {
return fmt.Errorf("failed to save metric %s: %w", key, err)
}
}
return nil
}