// pomerium/internal/zero/telemetry/sessions/producer.go
//
// 85 lines
// 1.8 KiB
// Go

package sessions
import (
"context"
"fmt"
"time"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/slices"
)
// Producer emits session metrics. It holds the instrumentation scope that
// produced metrics are reported under and a factory for the databroker client
// used to load metric state.
type Producer struct {
	// scope is attached to every ScopeMetrics value returned by Produce.
	scope instrumentation.Scope
	// clientProvider is invoked on each Produce call to obtain a databroker client.
	clientProvider func() (databroker.DataBrokerServiceClient, error)
}
// NewProducer returns a Producer that reports metrics under scope and obtains
// its databroker client from clientProvider.
func NewProducer(
	scope instrumentation.Scope,
	clientProvider func() (databroker.DataBrokerServiceClient, error),
) *Producer {
	producer := new(Producer)
	producer.scope = scope
	producer.clientProvider = clientProvider
	return producer
}
// Produce loads the stored state of the "dau" and "mau" metrics concurrently
// and reports each one as an int64 gauge with a single data point, grouped
// under the producer's scope.
//
// A metric whose state lookup fails with gRPC code NotFound is skipped. Any
// other lookup error, or a failure to obtain the client, is returned. When no
// metric ends up populated, Produce returns (nil, nil).
func (p *Producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
	client, err := p.clientProvider()
	if err != nil {
		return nil, fmt.Errorf("error getting client: %w", err)
	}

	// All data points share the same timestamp, taken before any lookup starts.
	timestamp := time.Now()
	names := []string{"dau", "mau"}

	// One slot per metric name. Each goroutine writes only its own slot, so
	// the slice needs no additional synchronization.
	collected := make([]metricdata.Metrics, len(names))

	group, ctx := errgroup.WithContext(ctx)
	for idx, name := range names {
		idx, name := idx, name // per-iteration copies for the closure below
		group.Go(func() error {
			state, loadErr := LoadMetricState(ctx, client, name)
			if loadErr != nil {
				if status.Code(loadErr) != codes.NotFound {
					return loadErr
				}
				// No stored state for this metric: leave its slot empty.
				return nil
			}

			point := metricdata.DataPoint[int64]{
				Time:  timestamp,
				Value: int64(state.Count),
			}
			collected[idx] = metricdata.Metrics{
				Name: name,
				Unit: "unique users",
				Data: metricdata.Gauge[int64]{
					DataPoints: []metricdata.DataPoint[int64]{point},
				},
			}
			return nil
		})
	}
	if waitErr := group.Wait(); waitErr != nil {
		return nil, waitErr
	}

	// The predicate is true only for populated slots (non-empty Name), so
	// slots skipped above are presumably dropped by Filter.
	populated := slices.Filter(collected, func(m metricdata.Metrics) bool {
		return m.Name != ""
	})
	if len(populated) == 0 {
		return nil, nil
	}

	result := metricdata.ScopeMetrics{
		Scope:   p.scope,
		Metrics: populated,
	}
	return []metricdata.ScopeMetrics{result}, nil
}