zero: refactor telemetry and controller (#5135)

* zero: refactor controller

* refactor zero telemetry and controller

* wire with connect handler

* cr
This commit is contained in:
Denis Mishin 2024-06-12 21:59:25 -04:00 committed by GitHub
parent cc636be707
commit 114f730dba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 612 additions and 342 deletions

View file

@ -12,8 +12,6 @@ import (
"github.com/pomerium/pomerium/internal/zero/apierror" "github.com/pomerium/pomerium/internal/zero/apierror"
connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux" connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
"github.com/pomerium/pomerium/internal/zero/grpcconn" "github.com/pomerium/pomerium/internal/zero/grpcconn"
"github.com/pomerium/pomerium/internal/zero/healthcheck"
metrics_reporter "github.com/pomerium/pomerium/internal/zero/reporter"
token_api "github.com/pomerium/pomerium/internal/zero/token" token_api "github.com/pomerium/pomerium/internal/zero/token"
"github.com/pomerium/pomerium/pkg/fanout" "github.com/pomerium/pomerium/pkg/fanout"
cluster_api "github.com/pomerium/pomerium/pkg/zero/cluster" cluster_api "github.com/pomerium/pomerium/pkg/zero/cluster"
@ -94,16 +92,6 @@ func NewAPI(ctx context.Context, opts ...Option) (*API, error) {
}, nil }, nil
} }
// ReportMetrics runs metrics reporting to the cloud
func (api *API) ReportMetrics(ctx context.Context, opts ...metrics_reporter.Option) error {
return metrics_reporter.Run(ctx, api.telemetryConn, opts...)
}
// ReportHealthChecks runs health check reporting to the cloud
func (api *API) ReportHealthChecks(ctx context.Context) error {
return healthcheck.NewReporter(api.telemetryConn).Run(ctx)
}
// Connect connects to the connect API and allows watching for changes // Connect connects to the connect API and allows watching for changes
func (api *API) Connect(ctx context.Context, opts ...fanout.Option) error { func (api *API) Connect(ctx context.Context, opts ...fanout.Option) error {
return api.mux.Run(ctx, opts...) return api.mux.Run(ctx, opts...)
@ -127,3 +115,7 @@ func (api *API) GetClusterResourceBundles(ctx context.Context) (*cluster_api.Get
api.cluster.GetClusterResourceBundlesWithResponse(ctx), api.cluster.GetClusterResourceBundlesWithResponse(ctx),
) )
} }
func (api *API) GetTelemetryConn() *grpc.ClientConn {
return api.telemetryConn
}

View file

@ -17,6 +17,7 @@ type controllerConfig struct {
reconcilerLeaseDuration time.Duration reconcilerLeaseDuration time.Duration
databrokerRequestTimeout time.Duration databrokerRequestTimeout time.Duration
shutdownTimeout time.Duration
} }
// WithTmpDir sets the temporary directory to use. // WithTmpDir sets the temporary directory to use.
@ -110,6 +111,13 @@ func WithDatabrokerRequestTimeout(timeout time.Duration) Option {
} }
} }
// WithShutdownTimeout sets the timeout for shutting down and cleanup.
func WithShutdownTimeout(timeout time.Duration) Option {
return func(c *controllerConfig) {
c.shutdownTimeout = timeout
}
}
func newControllerConfig(opts ...Option) *controllerConfig { func newControllerConfig(opts ...Option) *controllerConfig {
c := new(controllerConfig) c := new(controllerConfig)
@ -118,6 +126,7 @@ func newControllerConfig(opts ...Option) *controllerConfig {
WithConnectAPIEndpoint("https://connect.pomerium.com"), WithConnectAPIEndpoint("https://connect.pomerium.com"),
WithDatabrokerLeaseDuration(time.Second * 30), WithDatabrokerLeaseDuration(time.Second * 30),
WithDatabrokerRequestTimeout(time.Second * 30), WithDatabrokerRequestTimeout(time.Second * 30),
WithShutdownTimeout(time.Second * 10),
} { } {
opt(c) opt(c)
} }

View file

@ -5,22 +5,20 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/zero/analytics"
sdk "github.com/pomerium/pomerium/internal/zero/api" sdk "github.com/pomerium/pomerium/internal/zero/api"
"github.com/pomerium/pomerium/internal/zero/bootstrap" "github.com/pomerium/pomerium/internal/zero/bootstrap"
"github.com/pomerium/pomerium/internal/zero/bootstrap/writers" "github.com/pomerium/pomerium/internal/zero/bootstrap/writers"
"github.com/pomerium/pomerium/internal/zero/healthcheck" connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
"github.com/pomerium/pomerium/internal/zero/reconciler" "github.com/pomerium/pomerium/internal/zero/reconciler"
"github.com/pomerium/pomerium/internal/zero/reporter" "github.com/pomerium/pomerium/internal/zero/telemetry/reporter"
"github.com/pomerium/pomerium/pkg/cmd/pomerium" "github.com/pomerium/pomerium/pkg/cmd/pomerium"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/zero/connect"
) )
// Run runs Pomerium in managed mode using the provided token. // Run runs Pomerium in managed mode using the provided token.
@ -61,7 +59,6 @@ func Run(ctx context.Context, opts ...Option) error {
eg.Go(func() error { return run(ctx, "zero-bootstrap", c.runBootstrap) }) eg.Go(func() error { return run(ctx, "zero-bootstrap", c.runBootstrap) })
eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore) }) eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore) })
eg.Go(func() error { return run(ctx, "zero-control-loop", c.runZeroControlLoop) }) eg.Go(func() error { return run(ctx, "zero-control-loop", c.runZeroControlLoop) })
eg.Go(func() error { return run(ctx, "healh-check-reporter", c.runHealthCheckReporter) })
return eg.Wait() return eg.Wait()
} }
@ -70,7 +67,8 @@ type controller struct {
api *sdk.API api *sdk.API
bootstrapConfig *bootstrap.Source bootstrapConfig *bootstrap.Source
telemetryReporter *reporter.Reporter
} }
func (c *controller) initAPI(ctx context.Context) error { func (c *controller) initAPI(ctx context.Context) error {
@ -85,7 +83,6 @@ func (c *controller) initAPI(ctx context.Context) error {
} }
c.api = api c.api = api
return nil return nil
} }
@ -134,14 +131,35 @@ func (c *controller) runZeroControlLoop(ctx context.Context) error {
r := c.NewDatabrokerRestartRunner(ctx) r := c.NewDatabrokerRestartRunner(ctx)
defer r.Close() defer r.Close()
return r.Run(ctx, err = c.initTelemetry(ctx, func() (databroker.DataBrokerServiceClient, error) {
WithLease( client, _, err := r.getDatabrokerClient()
c.runReconcilerLeased, return client, err
c.runAnalyticsLeased, })
c.runMetricsReporterLeased, if err != nil {
c.runHealthChecksLeased, return fmt.Errorf("init telemetry: %w", err)
), }
) defer c.shutdownTelemetry(ctx)
err = c.api.Watch(ctx, connect_mux.WithOnTelemetryRequested(func(ctx context.Context, _ *connect.TelemetryRequest) {
c.telemetryReporter.CollectAndExportMetrics(ctx)
}))
if err != nil {
return fmt.Errorf("watch telemetry: %w", err)
}
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return r.Run(ctx,
WithLease(
c.runReconcilerLeased,
c.runSessionAnalyticsLeased,
c.enableSessionAnalyticsReporting,
c.runHealthChecksLeased,
),
)
})
eg.Go(func() error { return c.runTelemetryReporter(ctx) })
return eg.Wait()
} }
func (c *controller) runReconcilerLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { func (c *controller) runReconcilerLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error {
@ -154,54 +172,3 @@ func (c *controller) runReconcilerLeased(ctx context.Context, client databroker.
reconciler.WithDataBrokerClient(client), reconciler.WithDataBrokerClient(client),
) )
} }
func (c *controller) runAnalyticsLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error {
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("service", "zero-analytics")
})
err := analytics.Collect(ctx, client, time.Hour)
if err != nil && ctx.Err() == nil {
log.Ctx(ctx).Error().Err(err).Msg("error collecting analytics, disabling")
return nil
}
return err
}
func (c *controller) runMetricsReporterLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error {
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("service", "zero-reporter")
})
return c.api.ReportMetrics(ctx,
reporter.WithCollectInterval(time.Hour),
reporter.WithMetrics(analytics.Metrics(func() databroker.DataBrokerServiceClient { return client })...),
)
}
func (c *controller) runHealthChecksLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error {
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("service", "zero-health-checks")
})
return healthcheck.RunChecks(ctx, c.bootstrapConfig, client)
}
func (c *controller) runHealthCheckReporter(ctx context.Context) error {
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("service", "zero-health-check-reporter")
})
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = 0
return backoff.RetryNotify(
func() error {
return c.api.ReportHealthChecks(ctx)
},
backoff.WithContext(bo, ctx),
func(err error, next time.Duration) {
log.Ctx(ctx).Warn().Err(err).Dur("next", next).Msg("health check reporter backoff")
},
)
}

View file

@ -0,0 +1,79 @@
package controller
import (
"context"
"fmt"
"time"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/sdk/instrumentation"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/zero/healthcheck"
"github.com/pomerium/pomerium/internal/zero/telemetry/reporter"
"github.com/pomerium/pomerium/internal/zero/telemetry/sessions"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/health"
)
const (
	// producerSessionAnalytics names the session analytics metric producer;
	// used to enable/disable it by name via SetMetricProducerEnabled.
	producerSessionAnalytics = "session-analytics"
)

// initTelemetry builds the zero telemetry reporter and stores it on the
// controller. clientProvider is invoked lazily by the session metric producer
// every time metrics are collected, so it must be safe to call repeatedly.
func (c *controller) initTelemetry(ctx context.Context, clientProvider func() (databroker.DataBrokerServiceClient, error)) error {
	sessionMetricProducer := sessions.NewProducer(instrumentation.Scope{}, clientProvider)
	r, err := reporter.New(ctx, c.api.GetTelemetryConn(),
		reporter.WithProducer(producerSessionAnalytics, sessionMetricProducer),
	)
	if err != nil {
		return fmt.Errorf("error creating telemetry metrics reporter: %w", err)
	}

	c.telemetryReporter = r
	return nil
}
// shutdownTelemetry flushes and stops the telemetry reporter. Cleanup is
// bounded by the configured shutdown timeout and proceeds even when the
// incoming ctx has already been canceled.
func (c *controller) shutdownTelemetry(ctx context.Context) {
	shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), c.cfg.shutdownTimeout)
	defer cancel()

	if err := c.telemetryReporter.Shutdown(shutdownCtx); err != nil {
		log.Warn(shutdownCtx).Err(err).Msg("telemetry reporter shutdown error")
	}
}
// runSessionAnalyticsLeased gathers session analytics on an hourly interval;
// it is only invoked while this instance holds the databroker lease.
func (c *controller) runSessionAnalyticsLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error {
	ctx = log.WithContext(ctx, func(lc zerolog.Context) zerolog.Context {
		return lc.Str("service", "zero-analytics")
	})

	return sessions.Collect(ctx, client, time.Hour)
}
// enableSessionAnalyticsReporting enables exporting of session metrics for as
// long as the lease is held, and disables it again on exit.
// those metrics are cluster-wide, so we only enable their reporting when we have the lease
func (c *controller) enableSessionAnalyticsReporting(ctx context.Context, _ databroker.DataBrokerServiceClient) error {
	// errors are ignored: the producer name is a known package constant
	_ = c.telemetryReporter.SetMetricProducerEnabled(producerSessionAnalytics, true)
	defer func() { _ = c.telemetryReporter.SetMetricProducerEnabled(producerSessionAnalytics, false) }()

	<-ctx.Done()
	return nil
}
// runHealthChecksLeased runs cluster health checks against the databroker
// while this instance holds the lease.
func (c *controller) runHealthChecksLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error {
	ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
		return c.Str("service", "zero-health-checks")
	})

	return healthcheck.RunChecks(ctx, c.bootstrapConfig, client)
}
// runTelemetryReporter registers the telemetry reporter as the global health
// provider and runs it until ctx is canceled.
func (c *controller) runTelemetryReporter(ctx context.Context) error {
	health.SetProvider(c.telemetryReporter)
	defer health.SetProvider(nil)

	// fix: the log label was "zero-bootstrap", copy-pasted from the bootstrap
	// service; use a label that actually identifies this service in logs
	ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
		return c.Str("service", "zero-telemetry-reporter")
	})

	return c.telemetryReporter.Run(ctx)
}

View file

@ -1,126 +0,0 @@
package healthcheck
import (
"context"
"fmt"
"os"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/resource"
trace_sdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/version"
"github.com/pomerium/pomerium/pkg/health"
)
type Provider struct {
exporter *otlptrace.Exporter
tracer trace.Tracer
}
var _ health.Provider = (*Provider)(nil)
const (
shutdownTimeout = 30 * time.Second
serviceName = "pomerium-managed-core"
)
// NewReporter creates a new unstarted health check reporter
func NewReporter(
conn *grpc.ClientConn,
) *Provider {
p := new(Provider)
p.init(conn)
return p
}
func (p *Provider) Run(ctx context.Context) error {
health.SetProvider(p)
defer health.SetProvider(nil)
// we want the exporter
xc, cancel := context.WithCancel(context.WithoutCancel(ctx))
defer cancel()
err := p.exporter.Start(xc)
if err != nil {
// this should not happen for the gRPC exporter as its non-blocking
return fmt.Errorf("error starting health check exporter: %w", err)
}
<-ctx.Done()
return p.shutdown(xc)
}
func (p *Provider) shutdown(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, shutdownTimeout)
defer cancel()
return p.exporter.Shutdown(ctx)
}
func (p *Provider) init(conn *grpc.ClientConn) {
p.initExporter(conn)
p.initTracer()
}
func (p *Provider) initExporter(conn *grpc.ClientConn) {
p.exporter = export_grpc.NewUnstarted(export_grpc.WithGRPCConn(conn))
}
func (p *Provider) initTracer() {
processor := trace_sdk.NewBatchSpanProcessor(p.exporter)
provider := trace_sdk.NewTracerProvider(
trace_sdk.WithResource(p.getResource()),
trace_sdk.WithSampler(trace_sdk.AlwaysSample()),
trace_sdk.WithSpanProcessor(processor),
)
p.tracer = provider.Tracer(serviceName)
}
func (p *Provider) getResource() *resource.Resource {
attr := []attribute.KeyValue{
semconv.ServiceNameKey.String(serviceName),
semconv.ServiceVersionKey.String(version.FullVersion()),
}
hostname, err := os.Hostname()
if err == nil {
attr = append(attr, semconv.HostNameKey.String(hostname))
}
return resource.NewSchemaless(attr...)
}
func (p *Provider) ReportOK(check health.Check, attr ...health.Attr) {
ctx := context.Background()
log.Ctx(ctx).Debug().Str("check", string(check)).Msg("health check ok")
_, span := p.tracer.Start(ctx, string(check))
span.SetStatus(codes.Ok, "")
setAttributes(span, attr...)
span.End()
}
func (p *Provider) ReportError(check health.Check, err error, attr ...health.Attr) {
ctx := context.Background()
log.Ctx(ctx).Warn().Str("check", string(check)).Err(err).Msg("health check error")
_, span := p.tracer.Start(ctx, string(check))
span.SetStatus(codes.Error, err.Error())
setAttributes(span, attr...)
span.End()
}
func setAttributes(span trace.Span, attr ...health.Attr) {
for _, a := range attr {
span.SetAttributes(attribute.String(a.Key, a.Value))
}
}

View file

@ -1,49 +0,0 @@
package reporter
import (
"time"
"go.opentelemetry.io/otel/metric"
)
type config struct {
shutdownTimeout time.Duration
collectInterval time.Duration
metrics []func(metric.Meter) error
}
// Option is a functional option for configuring the dialhome package.
type Option func(*config)
// WithShutdownTimeout sets the shutdown timeout to use for dialhome.
func WithShutdownTimeout(timeout time.Duration) Option {
return func(c *config) {
c.shutdownTimeout = timeout
}
}
// WithCollectInterval sets the collect interval for metrics to be queried.
func WithCollectInterval(interval time.Duration) Option {
return func(c *config) {
c.collectInterval = interval
}
}
// WithMetrics adds metrics to be collected
func WithMetrics(fns ...func(metric.Meter) error) Option {
return func(c *config) {
c.metrics = append(c.metrics, fns...)
}
}
func getConfig(opts ...Option) *config {
c := new(config)
defaults := []Option{
WithShutdownTimeout(time.Second * 5),
WithCollectInterval(time.Hour),
}
for _, opt := range append(defaults, opts...) {
opt(c)
}
return c
}

View file

@ -1,61 +0,0 @@
// Package reporter periodically submits metrics back to the cloud.
package reporter
import (
"context"
"fmt"
"time"
export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
metric_sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"google.golang.org/grpc"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/version"
)
// Run starts loop that pushes metrics via OTEL protocol until ctx is canceled
func Run(
ctx context.Context,
conn *grpc.ClientConn,
opts ...Option,
) error {
cfg := getConfig(opts...)
exporter, err := export_grpc.New(ctx, export_grpc.WithGRPCConn(conn))
if err != nil {
return fmt.Errorf("starting OTEL exporter: %w", err)
}
defer shutdown(exporter.Shutdown, cfg.shutdownTimeout)
provider := metric_sdk.NewMeterProvider(
metric_sdk.WithResource(resource.NewSchemaless(
semconv.ServiceNameKey.String("pomerium-managed-core"),
semconv.ServiceVersionKey.String(version.FullVersion()),
)),
metric_sdk.WithReader(
metric_sdk.NewPeriodicReader(
exporter,
metric_sdk.WithInterval(cfg.collectInterval),
)))
defer shutdown(provider.Shutdown, cfg.shutdownTimeout)
meter := provider.Meter("pomerium-managed-core")
for _, fn := range cfg.metrics {
err := fn(meter)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("error registering metric")
}
}
<-ctx.Done()
return ctx.Err()
}
func shutdown(fn func(ctx context.Context) error, timeout time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
_ = fn(ctx)
}

View file

@ -0,0 +1,31 @@
package reporter
import (
"go.opentelemetry.io/otel/sdk/metric"
)
// config holds the reporter's registered metric producers, keyed by name.
type config struct {
	producers map[string]*metricsProducer
}

// Option customizes the reporter configuration.
type Option func(*config)

// WithProducer adds a metric producer to the reporter. Registering two
// producers under the same name is a programming error and panics.
func WithProducer(name string, p metric.Producer) Option {
	return func(cfg *config) {
		if _, exists := cfg.producers[name]; exists {
			panic("duplicate producer name " + name)
		}
		cfg.producers[name] = newProducer(name, p)
	}
}

// getConfig builds a config by applying all options to an empty registry.
func getConfig(opts ...Option) config {
	cfg := config{producers: map[string]*metricsProducer{}}
	for _, o := range opts {
		o(&cfg)
	}
	return cfg
}

View file

@ -0,0 +1,92 @@
package reporter
import (
"context"
"errors"
"fmt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/resource"
trace_sdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/health"
)
// healthCheckReporter submits health check results to the cloud as OTLP trace
// spans, one span per reported check.
type healthCheckReporter struct {
	resource *resource.Resource
	exporter *otlptrace.Exporter
	provider *trace_sdk.TracerProvider
	tracer   trace.Tracer
}

// newHealthCheckReporter creates a new unstarted health check reporter that
// exports spans over the provided gRPC connection.
func newHealthCheckReporter(
	conn *grpc.ClientConn,
	resource *resource.Resource,
) *healthCheckReporter {
	exporter := export_grpc.NewUnstarted(export_grpc.WithGRPCConn(conn))
	processor := trace_sdk.NewBatchSpanProcessor(exporter)
	provider := trace_sdk.NewTracerProvider(
		trace_sdk.WithResource(resource),
		trace_sdk.WithSampler(trace_sdk.AlwaysSample()),
		trace_sdk.WithSpanProcessor(processor),
	)
	tracer := provider.Tracer(serviceName)

	return &healthCheckReporter{
		resource: resource,
		exporter: exporter,
		tracer:   tracer,
		provider: provider,
	}
}
// Run starts the exporter and then blocks until ctx is canceled. Callers are
// expected to invoke Shutdown afterwards to flush buffered spans.
func (r *healthCheckReporter) Run(ctx context.Context) error {
	err := r.exporter.Start(ctx)
	if err != nil {
		// this should not happen for the gRPC exporter as its non-blocking
		return fmt.Errorf("error starting health check exporter: %w", err)
	}

	<-ctx.Done()
	return nil
}
// Shutdown flushes the tracer provider and the exporter, combining any errors
// from both.
func (r *healthCheckReporter) Shutdown(ctx context.Context) error {
	providerErr := r.provider.Shutdown(ctx)
	exporterErr := r.exporter.Shutdown(ctx)
	return errors.Join(providerErr, exporterErr)
}
// ReportOK implements health.Provider interface.
// It logs the check and emits a span named after it with an OK status.
func (r *healthCheckReporter) ReportOK(check health.Check, attr ...health.Attr) {
	ctx := context.Background()
	log.Ctx(ctx).Debug().Str("check", string(check)).Msg("health check ok")
	_, span := r.tracer.Start(ctx, string(check))
	span.SetStatus(codes.Ok, "")
	setAttributes(span, attr...)
	span.End()
}

// ReportError implements health.Provider interface.
// It logs the failure and emits a span named after the check with an Error
// status carrying the error message.
func (r *healthCheckReporter) ReportError(check health.Check, err error, attr ...health.Attr) {
	ctx := context.Background()
	log.Ctx(ctx).Warn().Str("check", string(check)).Err(err).Msg("health check error")
	_, span := r.tracer.Start(ctx, string(check))
	span.SetStatus(codes.Error, err.Error())
	setAttributes(span, attr...)
	span.End()
}

// setAttributes copies health attributes onto the span as string attributes.
func setAttributes(span trace.Span, attr ...health.Attr) {
	for _, a := range attr {
		span.SetAttributes(attribute.String(a.Key, a.Value))
	}
}

View file

@ -0,0 +1,47 @@
package reporter
import (
"context"
"sync/atomic"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"github.com/pomerium/pomerium/internal/log"
)
// metricsProducer wraps a metric.Producer with a name and an on/off switch so
// individual producers can be enabled only while relevant (e.g. while this
// instance holds a lease).
type metricsProducer struct {
	enabled atomic.Bool
	name    string
	metric.Producer
}

// newProducer wraps p under the given name; the producer starts disabled.
func newProducer(name string, p metric.Producer) *metricsProducer {
	return &metricsProducer{
		name:     name,
		Producer: p,
	}
}

var _ metric.Producer = (*metricsProducer)(nil)

// Produce wraps the underlying producer's Produce method and logs any errors,
// to prevent the error from blocking the export of other metrics.
// also checks if the producer is enabled before producing metrics
func (p *metricsProducer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
	// a disabled producer contributes no metrics and no error
	if enabled := p.enabled.Load(); !enabled {
		return nil, nil
	}

	data, err := p.Producer.Produce(ctx)
	if err != nil {
		log.Error(ctx).Err(err).Str("producer", p.name).Msg("failed to produce metrics")
		return nil, err
	}
	return data, nil
}

// SetEnabled sets the enabled state of the producer
func (p *metricsProducer) SetEnabled(v bool) {
	p.enabled.Store(v)
}

View file

@ -0,0 +1,99 @@
package reporter
import (
"context"
"errors"
"fmt"
export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
metric_sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
"google.golang.org/grpc"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/health"
)
// metricsReporter collects metrics from the registered producers on demand
// and exports them to the cloud over gRPC.
type metricsReporter struct {
	exporter *export_grpc.Exporter
	resource *resource.Resource
	reader   *metric_sdk.ManualReader
	// producers are keyed by name so they can be toggled individually
	producers map[string]*metricsProducer
	// singleTask ensures only one collect-and-export runs at a time
	singleTask
}

// newMetricsReporter creates a metrics reporter backed by a manual reader so
// that collection happens only when explicitly requested, not on a timer.
func newMetricsReporter(
	ctx context.Context,
	conn *grpc.ClientConn,
	resource *resource.Resource,
	producers map[string]*metricsProducer,
) (*metricsReporter, error) {
	exporter, err := export_grpc.New(ctx, export_grpc.WithGRPCConn(conn))
	if err != nil {
		return nil, fmt.Errorf("create exporter: %w", err)
	}

	readerOpts := make([]metric_sdk.ManualReaderOption, 0, len(producers))
	for _, p := range producers {
		readerOpts = append(readerOpts, metric_sdk.WithProducer(p))
	}
	reader := metric_sdk.NewManualReader(readerOpts...)

	return &metricsReporter{
		exporter:  exporter,
		resource:  resource,
		reader:    reader,
		producers: producers,
	}, nil
}
// Run blocks until ctx is canceled; metrics are exported on demand via
// CollectAndExportMetrics rather than periodically.
func (r *metricsReporter) Run(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// Shutdown releases the reader and the exporter, combining any errors.
func (r *metricsReporter) Shutdown(ctx context.Context) error {
	return errors.Join(
		r.reader.Shutdown(ctx),
		r.exporter.Shutdown(ctx),
	)
}
// SetMetricProducerEnabled toggles the named producer on or off; it returns
// an error when no producer was registered under that name.
func (r *metricsReporter) SetMetricProducerEnabled(name string, enabled bool) error {
	producer, found := r.producers[name]
	if !found {
		return fmt.Errorf("producer %q not found", name)
	}
	producer.SetEnabled(enabled)
	return nil
}
// CollectAndExportMetrics asynchronously collects metrics from all enabled
// producers and exports them. If a previous collection is still in flight it
// is canceled and superseded; the outcome is reported to the health system.
func (r *metricsReporter) CollectAndExportMetrics(ctx context.Context) {
	r.singleTask.Run(ctx, func(ctx context.Context) {
		err := r.collectAndExport(ctx)
		if errors.Is(err, ErrAnotherExecutionRequested) {
			log.Warn(ctx).Msg("telemetry metrics were not sent, due to another execution requested")
			return
		}
		if err != nil {
			health.ReportError(health.CollectAndSendTelemetry, err)
		} else {
			health.ReportOK(health.CollectAndSendTelemetry)
		}
	})
}
// collectAndExport gathers metrics from the manual reader and pushes them to
// the exporter, retrying each stage with exponential backoff until it
// succeeds or ctx is canceled.
func (r *metricsReporter) collectAndExport(ctx context.Context) error {
	rm := &metricdata.ResourceMetrics{
		Resource: r.resource,
	}
	err := withBackoff(ctx, "collect metrics", func(ctx context.Context) error { return r.reader.Collect(ctx, rm) })
	if err != nil {
		return fmt.Errorf("collect metrics: %w", err)
	}

	err = withBackoff(ctx, "export metrics", func(ctx context.Context) error { return r.exporter.Export(ctx, rm) })
	if err != nil {
		return fmt.Errorf("export metrics: %w", err)
	}
	return nil
}

View file

@ -0,0 +1,99 @@
// Package reporter periodically submits metrics back to the cloud.
package reporter
import (
"context"
"fmt"
"os"
"time"
"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/version"
)
// Reporter aggregates the zero telemetry sub-reporters — cloud metrics and
// health checks — which share one gRPC connection and one OTEL resource.
type Reporter struct {
	*metricsReporter
	*healthCheckReporter
}

const (
	// serviceName identifies this process in exported telemetry
	serviceName = "pomerium-managed-core"
)

// New creates a new unstarted zero telemetry reporter
func New(
	ctx context.Context,
	conn *grpc.ClientConn,
	opts ...Option,
) (*Reporter, error) {
	cfg := getConfig(opts...)
	resource := getResource()

	metrics, err := newMetricsReporter(ctx, conn, resource, cfg.producers)
	if err != nil {
		return nil, fmt.Errorf("failed to create metrics reporter: %w", err)
	}

	healthChecks := newHealthCheckReporter(conn, resource)

	return &Reporter{
		metricsReporter:     metrics,
		healthCheckReporter: healthChecks,
	}, nil
}
// Run runs both sub-reporters until ctx is canceled, restarting each with
// exponential backoff on error; the first permanent failure stops both.
func (r *Reporter) Run(ctx context.Context) error {
	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error { return withBackoff(ctx, "metrics reporter", r.metricsReporter.Run) })
	eg.Go(func() error { return withBackoff(ctx, "health check reporter", r.healthCheckReporter.Run) })
	return eg.Wait()
}
// Shutdown should be called after Run to cleanly shutdown the reporter.
// It deliberately avoids errgroup.WithContext: with the shadowed context, the
// first sub-shutdown to fail would cancel the context used by the other and
// abort its cleanup; both shutdowns must always run to completion.
func (r *Reporter) Shutdown(ctx context.Context) error {
	var eg errgroup.Group
	eg.Go(func() error { return r.metricsReporter.Shutdown(ctx) })
	eg.Go(func() error { return r.healthCheckReporter.Shutdown(ctx) })
	return eg.Wait()
}
// getResource describes this process for exported telemetry: service name,
// service version, and — when it can be determined — the hostname.
func getResource() *resource.Resource {
	attrs := []attribute.KeyValue{
		semconv.ServiceNameKey.String(serviceName),
		semconv.ServiceVersionKey.String(version.FullVersion()),
	}
	if hostname, err := os.Hostname(); err == nil {
		attrs = append(attrs, semconv.HostNameKey.String(hostname))
	}
	return resource.NewSchemaless(attrs...)
}
// withBackoff retries f with exponential backoff until it succeeds or ctx is
// canceled, logging each retry with the given name.
func withBackoff(ctx context.Context, name string, f func(context.Context) error) error {
	bo := backoff.NewExponentialBackOff()
	bo.MaxElapsedTime = 0 // retry indefinitely; only ctx cancellation stops us
	return backoff.RetryNotify(
		func() error { return f(ctx) },
		backoff.WithContext(bo, ctx),
		func(err error, d time.Duration) {
			log.Warn(ctx).
				Str("name", name).
				Err(err).
				Dur("backoff", d).
				Msg("retrying")
		},
	)
}

View file

@ -0,0 +1,27 @@
package reporter
import (
"context"
"errors"
"sync"
)
type singleTask struct {
lock sync.Mutex
cancel context.CancelCauseFunc
}
var ErrAnotherExecutionRequested = errors.New("another execution requested")
func (s *singleTask) Run(ctx context.Context, f func(context.Context)) {
s.lock.Lock()
defer s.lock.Unlock()
if s.cancel != nil {
s.cancel(ErrAnotherExecutionRequested)
}
ctx, cancel := context.WithCancelCause(ctx)
s.cancel = cancel
go f(ctx)
}

View file

@ -1,4 +1,4 @@
package analytics package sessions
import ( import (
"time" "time"

View file

@ -1,4 +1,4 @@
package analytics_test package sessions_test
import ( import (
"testing" "testing"
@ -6,7 +6,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/pomerium/pomerium/internal/zero/analytics" "github.com/pomerium/pomerium/internal/zero/telemetry/sessions"
) )
func TestActiveUsers(t *testing.T) { func TestActiveUsers(t *testing.T) {
@ -15,7 +15,7 @@ func TestActiveUsers(t *testing.T) {
startTime := time.Now().UTC() startTime := time.Now().UTC()
// Create a new counter that resets on a daily interval // Create a new counter that resets on a daily interval
c := analytics.NewActiveUsersCounter(analytics.ResetDailyUTC, startTime) c := sessions.NewActiveUsersCounter(sessions.ResetDailyUTC, startTime)
count, wasReset := c.Update([]string{"user1", "user2"}, startTime.Add(time.Minute)) count, wasReset := c.Update([]string{"user1", "user2"}, startTime.Add(time.Minute))
assert.False(t, wasReset) assert.False(t, wasReset)

View file

@ -1,5 +1,5 @@
// Package analytics collects active user metrics and reports them to the cloud dashboard // Package analytics collects active user metrics and reports them to the cloud dashboard
package analytics package sessions
import ( import (
"context" "context"
@ -22,8 +22,7 @@ func Collect(
updateInterval: updateInterval, updateInterval: updateInterval,
} }
leaser := databroker.NewLeaser("pomerium-zero-analytics", c.leaseTTL(), c) return c.run(ctx)
return leaser.Run(ctx)
} }
type collector struct { type collector struct {
@ -32,7 +31,7 @@ type collector struct {
updateInterval time.Duration updateInterval time.Duration
} }
func (c *collector) RunLeased(ctx context.Context) error { func (c *collector) run(ctx context.Context) error {
err := c.loadCounters(ctx) err := c.loadCounters(ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to load counters: %w", err) return fmt.Errorf("failed to load counters: %w", err)
@ -46,10 +45,6 @@ func (c *collector) RunLeased(ctx context.Context) error {
return nil return nil
} }
func (c *collector) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
return c.client
}
func (c *collector) loadCounters(ctx context.Context) error { func (c *collector) loadCounters(ctx context.Context) error {
now := time.Now() now := time.Now()
for key, resetFn := range map[string]IntervalResetFunc{ for key, resetFn := range map[string]IntervalResetFunc{
@ -121,11 +116,3 @@ func (c *collector) update(ctx context.Context) error {
return nil return nil
} }
func (c *collector) leaseTTL() time.Duration {
const defaultTTL = time.Minute * 5
if defaultTTL < c.updateInterval {
return defaultTTL
}
return c.updateInterval
}

View file

@ -1,4 +1,4 @@
package analytics package sessions
import ( import (
"context" "context"

View file

@ -0,0 +1,75 @@
package sessions
import (
"context"
"fmt"
"time"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"golang.org/x/sync/errgroup"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)
// producer produces session analytics metrics ("dau"/"mau") from metric state
// stored in the databroker.
type producer struct {
	scope instrumentation.Scope
	// clientProvider is called on every collection to obtain a databroker
	// client, so the underlying connection may change between collections
	clientProvider func() (databroker.DataBrokerServiceClient, error)
}

// NewProducer creates a metric producer for session analytics counters.
func NewProducer(
	scope instrumentation.Scope,
	clientProvider func() (databroker.DataBrokerServiceClient, error),
) metric.Producer {
	return &producer{
		clientProvider: clientProvider,
		scope:          scope,
	}
}

// Produce loads the "dau" and "mau" counters from the databroker in parallel
// and returns them as int64 gauges stamped with the current time. Any load
// failure fails the whole collection.
func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
	client, err := p.clientProvider()
	if err != nil {
		return nil, fmt.Errorf("error getting client: %w", err)
	}

	now := time.Now()
	ids := []string{"dau", "mau"}
	metrics := make([]metricdata.Metrics, len(ids))

	eg, ctx := errgroup.WithContext(ctx)
	for i := 0; i < len(ids); i++ {
		i := i // capture loop variable for the goroutine (pre-Go 1.22 semantics)
		eg.Go(func() error {
			state, err := LoadMetricState(ctx, client, ids[i])
			if err != nil {
				return err
			}
			metrics[i] = metricdata.Metrics{
				Name: ids[i],
				Unit: "unique users",
				Data: metricdata.Gauge[int64]{
					DataPoints: []metricdata.DataPoint[int64]{
						{
							Time:  now,
							Value: int64(state.Count),
						},
					},
				},
			}
			return nil
		})
	}
	err = eg.Wait()
	if err != nil {
		return nil, err
	}

	return []metricdata.ScopeMetrics{
		{
			Scope:   p.scope,
			Metrics: metrics,
		},
	}, nil
}

View file

@ -1,4 +1,4 @@
package analytics package sessions
import ( import (
"context" "context"

View file

@ -1,4 +1,4 @@
package analytics package sessions
import ( import (
"context" "context"

View file

@ -1,4 +1,4 @@
package analytics_test package sessions_test
import ( import (
"testing" "testing"
@ -6,14 +6,14 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/pomerium/pomerium/internal/zero/analytics" "github.com/pomerium/pomerium/internal/zero/telemetry/sessions"
) )
func TestStorage(t *testing.T) { func TestStorage(t *testing.T) {
t.Parallel() t.Parallel()
now := time.Date(2020, 1, 2, 3, 4, 5, 6, time.UTC) now := time.Date(2020, 1, 2, 3, 4, 5, 6, time.UTC)
state := &analytics.MetricState{ state := &sessions.MetricState{
Data: []byte("data"), Data: []byte("data"),
LastReset: now, LastReset: now,
} }
@ -21,7 +21,7 @@ func TestStorage(t *testing.T) {
pbany := state.ToAny() pbany := state.ToAny()
assert.NotNil(t, pbany) assert.NotNil(t, pbany)
var newState analytics.MetricState var newState sessions.MetricState
err := newState.FromAny(pbany) err := newState.FromAny(pbany)
assert.NoError(t, err) assert.NoError(t, err)
assert.EqualValues(t, state.Data, newState.Data) assert.EqualValues(t, state.Data, newState.Data)

View file

@ -7,6 +7,8 @@ type Check string
const ( const (
// BuildDatabrokerConfig checks whether the Databroker config was applied // BuildDatabrokerConfig checks whether the Databroker config was applied
BuildDatabrokerConfig = Check("config.databroker.build") BuildDatabrokerConfig = Check("config.databroker.build")
// CollectAndSendTelemetry checks whether telemetry was collected and sent
CollectAndSendTelemetry = Check("zero.telemetry.collect-and-send")
// StorageBackend checks whether the storage backend is healthy // StorageBackend checks whether the storage backend is healthy
StorageBackend = Check("storage.backend") StorageBackend = Check("storage.backend")
// XDSCluster checks whether the XDS Cluster resources were applied // XDSCluster checks whether the XDS Cluster resources were applied