mirror of
https://github.com/pomerium/pomerium.git
synced 2025-04-30 10:56:28 +02:00
* zero: refactor controller * refactor zero telemetry and controller * wire with connect handler * cr
118 lines
2.8 KiB
Go
118 lines
2.8 KiB
Go
// Package analytics collects active user metrics and reports them to the cloud dashboard
|
|
package sessions
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/pomerium/pomerium/internal/log"
|
|
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
|
)
|
|
|
|
// Collect collects metrics and stores them in the databroker
|
|
func Collect(
|
|
ctx context.Context,
|
|
client databroker.DataBrokerServiceClient,
|
|
updateInterval time.Duration,
|
|
) error {
|
|
c := &collector{
|
|
client: client,
|
|
counters: make(map[string]*ActiveUsersCounter),
|
|
updateInterval: updateInterval,
|
|
}
|
|
|
|
return c.run(ctx)
|
|
}
|
|
|
|
type collector struct {
|
|
client databroker.DataBrokerServiceClient
|
|
counters map[string]*ActiveUsersCounter
|
|
updateInterval time.Duration
|
|
}
|
|
|
|
func (c *collector) run(ctx context.Context) error {
|
|
err := c.loadCounters(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load counters: %w", err)
|
|
}
|
|
|
|
err = c.runPeriodicUpdate(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to run periodic update: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *collector) loadCounters(ctx context.Context) error {
|
|
now := time.Now()
|
|
for key, resetFn := range map[string]IntervalResetFunc{
|
|
"mau": ResetMonthlyUTC,
|
|
"dau": ResetDailyUTC,
|
|
} {
|
|
state, err := LoadMetricState(ctx, c.client, key)
|
|
if err != nil && !databroker.IsNotFound(err) {
|
|
return err
|
|
}
|
|
if state == nil {
|
|
c.counters[key] = NewActiveUsersCounter(resetFn, now)
|
|
continue
|
|
}
|
|
|
|
counter, err := LoadActiveUsersCounter(state.Data, state.LastReset, resetFn)
|
|
if err != nil {
|
|
log.Ctx(ctx).Error().Err(err).Str("metric", key).Msg("failed to load metric state, resetting")
|
|
counter = NewActiveUsersCounter(resetFn, now)
|
|
}
|
|
c.counters[key] = counter
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *collector) runPeriodicUpdate(ctx context.Context) error {
|
|
ticker := time.NewTicker(c.updateInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-ticker.C:
|
|
if err := c.update(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *collector) update(ctx context.Context) error {
|
|
users, err := CurrentUsers(ctx, c.client)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get current users: %w", err)
|
|
}
|
|
|
|
now := time.Now()
|
|
for key, counter := range c.counters {
|
|
before := counter.Count()
|
|
after, _ := counter.Update(users, now)
|
|
if before == after {
|
|
log.Ctx(ctx).Debug().Msgf("metric %s not changed: %d", key, counter.Count())
|
|
continue
|
|
}
|
|
log.Ctx(ctx).Debug().Msgf("metric %s updated: %d", key, counter.Count())
|
|
|
|
data, err := counter.ToBinary()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal metric %s: %w", key, err)
|
|
}
|
|
|
|
err = SaveMetricState(ctx, c.client, key, data, after, counter.GetLastReset())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to save metric %s: %w", key, err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|