diff --git a/internal/zero/api/api.go b/internal/zero/api/api.go index 1dc6c2550..9f62e0936 100644 --- a/internal/zero/api/api.go +++ b/internal/zero/api/api.go @@ -12,8 +12,6 @@ import ( "github.com/pomerium/pomerium/internal/zero/apierror" connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux" "github.com/pomerium/pomerium/internal/zero/grpcconn" - "github.com/pomerium/pomerium/internal/zero/healthcheck" - metrics_reporter "github.com/pomerium/pomerium/internal/zero/reporter" token_api "github.com/pomerium/pomerium/internal/zero/token" "github.com/pomerium/pomerium/pkg/fanout" cluster_api "github.com/pomerium/pomerium/pkg/zero/cluster" @@ -94,16 +92,6 @@ func NewAPI(ctx context.Context, opts ...Option) (*API, error) { }, nil } -// ReportMetrics runs metrics reporting to the cloud -func (api *API) ReportMetrics(ctx context.Context, opts ...metrics_reporter.Option) error { - return metrics_reporter.Run(ctx, api.telemetryConn, opts...) -} - -// ReportHealthChecks runs health check reporting to the cloud -func (api *API) ReportHealthChecks(ctx context.Context) error { - return healthcheck.NewReporter(api.telemetryConn).Run(ctx) -} - // Connect connects to the connect API and allows watching for changes func (api *API) Connect(ctx context.Context, opts ...fanout.Option) error { return api.mux.Run(ctx, opts...) @@ -127,3 +115,7 @@ func (api *API) GetClusterResourceBundles(ctx context.Context) (*cluster_api.Get api.cluster.GetClusterResourceBundlesWithResponse(ctx), ) } + +func (api *API) GetTelemetryConn() *grpc.ClientConn { + return api.telemetryConn +} diff --git a/internal/zero/controller/config.go b/internal/zero/controller/config.go index a0e80707c..edff8fcb9 100644 --- a/internal/zero/controller/config.go +++ b/internal/zero/controller/config.go @@ -17,6 +17,7 @@ type controllerConfig struct { reconcilerLeaseDuration time.Duration databrokerRequestTimeout time.Duration + shutdownTimeout time.Duration } // WithTmpDir sets the temporary directory to use. @@ -110,6 +111,13 @@ func WithDatabrokerRequestTimeout(timeout time.Duration) Option { } } +// WithShutdownTimeout sets the timeout for shutting down and cleanup. +func WithShutdownTimeout(timeout time.Duration) Option { + return func(c *controllerConfig) { + c.shutdownTimeout = timeout + } +} + func newControllerConfig(opts ...Option) *controllerConfig { c := new(controllerConfig) @@ -118,6 +126,7 @@ func newControllerConfig(opts ...Option) *controllerConfig { WithConnectAPIEndpoint("https://connect.pomerium.com"), WithDatabrokerLeaseDuration(time.Second * 30), WithDatabrokerRequestTimeout(time.Second * 30), + WithShutdownTimeout(time.Second * 10), } { opt(c) } diff --git a/internal/zero/controller/controller.go b/internal/zero/controller/controller.go index 96bc6cd06..db1f878eb 100644 --- a/internal/zero/controller/controller.go +++ b/internal/zero/controller/controller.go @@ -5,22 +5,20 @@ import ( "context" "errors" "fmt" - "time" - "github.com/cenkalti/backoff/v4" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" "github.com/pomerium/pomerium/internal/log" - "github.com/pomerium/pomerium/internal/zero/analytics" sdk "github.com/pomerium/pomerium/internal/zero/api" "github.com/pomerium/pomerium/internal/zero/bootstrap" "github.com/pomerium/pomerium/internal/zero/bootstrap/writers" - "github.com/pomerium/pomerium/internal/zero/healthcheck" + connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux" "github.com/pomerium/pomerium/internal/zero/reconciler" - "github.com/pomerium/pomerium/internal/zero/reporter" + "github.com/pomerium/pomerium/internal/zero/telemetry/reporter" "github.com/pomerium/pomerium/pkg/cmd/pomerium" "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/zero/connect" ) // Run runs Pomerium is managed mode using the provided token. @@ -61,7 +59,6 @@ func Run(ctx context.Context, opts ...Option) error { eg.Go(func() error { return run(ctx, "zero-bootstrap", c.runBootstrap) }) eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore) }) eg.Go(func() error { return run(ctx, "zero-control-loop", c.runZeroControlLoop) }) - eg.Go(func() error { return run(ctx, "healh-check-reporter", c.runHealthCheckReporter) }) return eg.Wait() } @@ -70,7 +67,8 @@ type controller struct { api *sdk.API - bootstrapConfig *bootstrap.Source + bootstrapConfig *bootstrap.Source + telemetryReporter *reporter.Reporter } func (c *controller) initAPI(ctx context.Context) error { @@ -85,7 +83,6 @@ func (c *controller) initAPI(ctx context.Context) error { } c.api = api - return nil } @@ -134,14 +131,35 @@ func (c *controller) runZeroControlLoop(ctx context.Context) error { r := c.NewDatabrokerRestartRunner(ctx) defer r.Close() - return r.Run(ctx, - WithLease( - c.runReconcilerLeased, - c.runAnalyticsLeased, - c.runMetricsReporterLeased, - c.runHealthChecksLeased, - ), - ) + err = c.initTelemetry(ctx, func() (databroker.DataBrokerServiceClient, error) { + client, _, err := r.getDatabrokerClient() + return client, err + }) + if err != nil { + return fmt.Errorf("init telemetry: %w", err) + } + defer c.shutdownTelemetry(ctx) + + err = c.api.Watch(ctx, connect_mux.WithOnTelemetryRequested(func(ctx context.Context, _ *connect.TelemetryRequest) { + c.telemetryReporter.CollectAndExportMetrics(ctx) + })) + if err != nil { + return fmt.Errorf("watch telemetry: %w", err) + } + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return r.Run(ctx, + WithLease( + c.runReconcilerLeased, + c.runSessionAnalyticsLeased, + c.enableSessionAnalyticsReporting, + c.runHealthChecksLeased, + ), + ) + }) + eg.Go(func() error { return c.runTelemetryReporter(ctx) }) + return eg.Wait() } func (c *controller) runReconcilerLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { @@ -154,54 +172,3 @@ func (c *controller) runReconcilerLeased(ctx context.Context, client databroker. reconciler.WithDataBrokerClient(client), ) } - -func (c *controller) runAnalyticsLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { - ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { - return c.Str("service", "zero-analytics") - }) - - err := analytics.Collect(ctx, client, time.Hour) - if err != nil && ctx.Err() == nil { - log.Ctx(ctx).Error().Err(err).Msg("error collecting analytics, disabling") - return nil - } - - return err -} - -func (c *controller) runMetricsReporterLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { - ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { - return c.Str("service", "zero-reporter") - }) - - return c.api.ReportMetrics(ctx, - reporter.WithCollectInterval(time.Hour), - reporter.WithMetrics(analytics.Metrics(func() databroker.DataBrokerServiceClient { return client })...), - ) -} - -func (c *controller) runHealthChecksLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { - ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { - return c.Str("service", "zero-health-checks") - }) - - return healthcheck.RunChecks(ctx, c.bootstrapConfig, client) -} - -func (c *controller) runHealthCheckReporter(ctx context.Context) error { - ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { - return c.Str("service", "zero-health-check-reporter") - }) - - bo := backoff.NewExponentialBackOff() - bo.MaxElapsedTime = 0 - return backoff.RetryNotify( - func() error { - return c.api.ReportHealthChecks(ctx) - }, - backoff.WithContext(bo, ctx), - func(err error, next time.Duration) { - log.Ctx(ctx).Warn().Err(err).Dur("next", next).Msg("health check reporter backoff") - }, - ) -} diff --git a/internal/zero/controller/telemetry.go b/internal/zero/controller/telemetry.go new file mode 100644 index 000000000..ac1b62b91 --- /dev/null +++ b/internal/zero/controller/telemetry.go @@ -0,0 +1,79 @@ +package controller + +import ( + "context" + "fmt" + "time" + + "github.com/rs/zerolog" + "go.opentelemetry.io/otel/sdk/instrumentation" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/zero/healthcheck" + "github.com/pomerium/pomerium/internal/zero/telemetry/reporter" + "github.com/pomerium/pomerium/internal/zero/telemetry/sessions" + "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/health" +) + +const ( + producerSessionAnalytics = "session-analytics" +) + +func (c *controller) initTelemetry(ctx context.Context, clientProvider func() (databroker.DataBrokerServiceClient, error)) error { + sessionMetricProducer := sessions.NewProducer(instrumentation.Scope{}, clientProvider) + r, err := reporter.New(ctx, c.api.GetTelemetryConn(), + reporter.WithProducer(producerSessionAnalytics, sessionMetricProducer), + ) + if err != nil { + return fmt.Errorf("error creating telemetry metrics reporter: %w", err) + } + c.telemetryReporter = r + return nil +} + +func (c *controller) shutdownTelemetry(ctx context.Context) { + ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), c.cfg.shutdownTimeout) + defer cancel() + + err := c.telemetryReporter.Shutdown(ctx) + if err != nil { + log.Warn(ctx).Err(err).Msg("telemetry reporter shutdown error") + } +} + +func (c *controller) runSessionAnalyticsLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { + ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { + return c.Str("service", "zero-analytics") + }) + + return sessions.Collect(ctx, client, time.Hour) +} + +// those metrics are cluster-wide, so we only enable their reporting when we have the lease +func (c *controller) enableSessionAnalyticsReporting(ctx context.Context, _ databroker.DataBrokerServiceClient) error { + _ = c.telemetryReporter.SetMetricProducerEnabled(producerSessionAnalytics, true) + defer func() { _ = c.telemetryReporter.SetMetricProducerEnabled(producerSessionAnalytics, false) }() + + <-ctx.Done() + return nil +} + +func (c *controller) runHealthChecksLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { + ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { + return c.Str("service", "zero-health-checks") + }) + + return healthcheck.RunChecks(ctx, c.bootstrapConfig, client) +} + +func (c *controller) runTelemetryReporter(ctx context.Context) error { + health.SetProvider(c.telemetryReporter) + defer health.SetProvider(nil) + + ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { + return c.Str("service", "zero-bootstrap") + }) + + return c.telemetryReporter.Run(ctx) +} diff --git a/internal/zero/healthcheck/provider.go b/internal/zero/healthcheck/provider.go deleted file mode 100644 index b62ad7243..000000000 --- a/internal/zero/healthcheck/provider.go +++ /dev/null @@ -1,126 +0,0 @@ -package healthcheck - -import ( - "context" - "fmt" - "os" - "time" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace" - export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "go.opentelemetry.io/otel/sdk/resource" - trace_sdk "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" - "go.opentelemetry.io/otel/trace" - "google.golang.org/grpc" - - "github.com/pomerium/pomerium/internal/log" - "github.com/pomerium/pomerium/internal/version" - "github.com/pomerium/pomerium/pkg/health" -) - -type Provider struct { - exporter *otlptrace.Exporter - tracer trace.Tracer -} - -var _ health.Provider = (*Provider)(nil) - -const ( - shutdownTimeout = 30 * time.Second - serviceName = "pomerium-managed-core" -) - -// NewReporter creates a new unstarted health check reporter -func NewReporter( - conn *grpc.ClientConn, -) *Provider { - p := new(Provider) - p.init(conn) - - return p -} - -func (p *Provider) Run(ctx context.Context) error { - health.SetProvider(p) - defer health.SetProvider(nil) - - // we want the exporter - xc, cancel := context.WithCancel(context.WithoutCancel(ctx)) - defer cancel() - - err := p.exporter.Start(xc) - if err != nil { - // this should not happen for the gRPC exporter as its non-blocking - return fmt.Errorf("error starting health check exporter: %w", err) - } - - <-ctx.Done() - return p.shutdown(xc) -} - -func (p *Provider) shutdown(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, shutdownTimeout) - defer cancel() - - return p.exporter.Shutdown(ctx) -} - -func (p *Provider) init(conn *grpc.ClientConn) { - p.initExporter(conn) - p.initTracer() -} - -func (p *Provider) initExporter(conn *grpc.ClientConn) { - p.exporter = export_grpc.NewUnstarted(export_grpc.WithGRPCConn(conn)) -} - -func (p *Provider) initTracer() { - processor := trace_sdk.NewBatchSpanProcessor(p.exporter) - provider := trace_sdk.NewTracerProvider( - trace_sdk.WithResource(p.getResource()), - trace_sdk.WithSampler(trace_sdk.AlwaysSample()), - trace_sdk.WithSpanProcessor(processor), - ) - p.tracer = provider.Tracer(serviceName) -} - -func (p *Provider) getResource() *resource.Resource { - attr := []attribute.KeyValue{ - semconv.ServiceNameKey.String(serviceName), - semconv.ServiceVersionKey.String(version.FullVersion()), - } - - hostname, err := os.Hostname() - if err == nil { - attr = append(attr, semconv.HostNameKey.String(hostname)) - } - - return resource.NewSchemaless(attr...) -} - -func (p *Provider) ReportOK(check health.Check, attr ...health.Attr) { - ctx := context.Background() - log.Ctx(ctx).Debug().Str("check", string(check)).Msg("health check ok") - _, span := p.tracer.Start(ctx, string(check)) - span.SetStatus(codes.Ok, "") - setAttributes(span, attr...) - span.End() -} - -func (p *Provider) ReportError(check health.Check, err error, attr ...health.Attr) { - ctx := context.Background() - log.Ctx(ctx).Warn().Str("check", string(check)).Err(err).Msg("health check error") - _, span := p.tracer.Start(ctx, string(check)) - span.SetStatus(codes.Error, err.Error()) - setAttributes(span, attr...) - span.End() -} - -func setAttributes(span trace.Span, attr ...health.Attr) { - for _, a := range attr { - span.SetAttributes(attribute.String(a.Key, a.Value)) - } -} diff --git a/internal/zero/reporter/config.go b/internal/zero/reporter/config.go deleted file mode 100644 index 2ff991fe5..000000000 --- a/internal/zero/reporter/config.go +++ /dev/null @@ -1,49 +0,0 @@ -package reporter - -import ( - "time" - - "go.opentelemetry.io/otel/metric" -) - -type config struct { - shutdownTimeout time.Duration - collectInterval time.Duration - metrics []func(metric.Meter) error -} - -// Option is a functional option for configuring the dialhome package. -type Option func(*config) - -// WithShutdownTimeout sets the shutdown timeout to use for dialhome. -func WithShutdownTimeout(timeout time.Duration) Option { - return func(c *config) { - c.shutdownTimeout = timeout - } -} - -// WithCollectInterval sets the collect interval for metrics to be queried. -func WithCollectInterval(interval time.Duration) Option { - return func(c *config) { - c.collectInterval = interval - } -} - -// WithMetrics adds metrics to be collected -func WithMetrics(fns ...func(metric.Meter) error) Option { - return func(c *config) { - c.metrics = append(c.metrics, fns...) - } -} - -func getConfig(opts ...Option) *config { - c := new(config) - defaults := []Option{ - WithShutdownTimeout(time.Second * 5), - WithCollectInterval(time.Hour), - } - for _, opt := range append(defaults, opts...) { - opt(c) - } - return c -} diff --git a/internal/zero/reporter/reporter.go b/internal/zero/reporter/reporter.go deleted file mode 100644 index 21973941b..000000000 --- a/internal/zero/reporter/reporter.go +++ /dev/null @@ -1,61 +0,0 @@ -// Package reporter periodically submits metrics back to the cloud. -package reporter - -import ( - "context" - "fmt" - "time" - - export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" - metric_sdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - semconv "go.opentelemetry.io/otel/semconv/v1.4.0" - "google.golang.org/grpc" - - "github.com/pomerium/pomerium/internal/log" - "github.com/pomerium/pomerium/internal/version" -) - -// Run starts loop that pushes metrics via OTEL protocol until ctx is canceled -func Run( - ctx context.Context, - conn *grpc.ClientConn, - opts ...Option, -) error { - cfg := getConfig(opts...) - - exporter, err := export_grpc.New(ctx, export_grpc.WithGRPCConn(conn)) - if err != nil { - return fmt.Errorf("starting OTEL exporter: %w", err) - } - defer shutdown(exporter.Shutdown, cfg.shutdownTimeout) - - provider := metric_sdk.NewMeterProvider( - metric_sdk.WithResource(resource.NewSchemaless( - semconv.ServiceNameKey.String("pomerium-managed-core"), - semconv.ServiceVersionKey.String(version.FullVersion()), - )), - metric_sdk.WithReader( - metric_sdk.NewPeriodicReader( - exporter, - metric_sdk.WithInterval(cfg.collectInterval), - ))) - defer shutdown(provider.Shutdown, cfg.shutdownTimeout) - - meter := provider.Meter("pomerium-managed-core") - for _, fn := range cfg.metrics { - err := fn(meter) - if err != nil { - log.Ctx(ctx).Error().Err(err).Msg("error registering metric") - } - } - - <-ctx.Done() - return ctx.Err() -} - -func shutdown(fn func(ctx context.Context) error, timeout time.Duration) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - _ = fn(ctx) -} diff --git a/internal/zero/telemetry/reporter/config.go b/internal/zero/telemetry/reporter/config.go new file mode 100644 index 000000000..3c9b29e5f --- /dev/null +++ b/internal/zero/telemetry/reporter/config.go @@ -0,0 +1,31 @@ +package reporter + +import ( + "go.opentelemetry.io/otel/sdk/metric" +) + +type config struct { + producers map[string]*metricsProducer +} + +type Option func(*config) + +// WithProducer adds a metric producer to the reporter +func WithProducer(name string, p metric.Producer) Option { + return func(c *config) { + if _, ok := c.producers[name]; ok { + panic("duplicate producer name " + name) + } + c.producers[name] = newProducer(name, p) + } +} + +func getConfig(opts ...Option) config { + c := config{ + producers: make(map[string]*metricsProducer), + } + for _, opt := range opts { + opt(&c) + } + return c +} diff --git a/internal/zero/telemetry/reporter/healthcheck.go b/internal/zero/telemetry/reporter/healthcheck.go new file mode 100644 index 000000000..e494db6fd --- /dev/null +++ b/internal/zero/telemetry/reporter/healthcheck.go @@ -0,0 +1,92 @@ +package reporter + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/sdk/resource" + trace_sdk "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/pkg/health" +) + +type healthCheckReporter struct { + resource *resource.Resource + exporter *otlptrace.Exporter + provider *trace_sdk.TracerProvider + tracer trace.Tracer +} + +// NewhealthCheckReporter creates a new unstarted health check healthCheckReporter +func newHealthCheckReporter( + conn *grpc.ClientConn, + resource *resource.Resource, +) *healthCheckReporter { + exporter := export_grpc.NewUnstarted(export_grpc.WithGRPCConn(conn)) + processor := trace_sdk.NewBatchSpanProcessor(exporter) + provider := trace_sdk.NewTracerProvider( + trace_sdk.WithResource(resource), + trace_sdk.WithSampler(trace_sdk.AlwaysSample()), + trace_sdk.WithSpanProcessor(processor), + ) + tracer := provider.Tracer(serviceName) + + return &healthCheckReporter{ + resource: resource, + exporter: exporter, + tracer: tracer, + provider: provider, + } +} + +func (r *healthCheckReporter) Run(ctx context.Context) error { + err := r.exporter.Start(ctx) + if err != nil { + // this should not happen for the gRPC exporter as its non-blocking + return fmt.Errorf("error starting health check exporter: %w", err) + } + + <-ctx.Done() + return nil +} + +func (r *healthCheckReporter) Shutdown(ctx context.Context) error { + return errors.Join( + r.provider.Shutdown(ctx), + r.exporter.Shutdown(ctx), + ) +} + +// ReportOK implements health.Provider interface +func (r *healthCheckReporter) ReportOK(check health.Check, attr ...health.Attr) { + ctx := context.Background() + log.Ctx(ctx).Debug().Str("check", string(check)).Msg("health check ok") + _, span := r.tracer.Start(ctx, string(check)) + span.SetStatus(codes.Ok, "") + setAttributes(span, attr...) + span.End() +} + +// ReportError implements health.Provider interface +func (r *healthCheckReporter) ReportError(check health.Check, err error, attr ...health.Attr) { + ctx := context.Background() + log.Ctx(ctx).Warn().Str("check", string(check)).Err(err).Msg("health check error") + _, span := r.tracer.Start(ctx, string(check)) + span.SetStatus(codes.Error, err.Error()) + setAttributes(span, attr...) + span.End() +} + +func setAttributes(span trace.Span, attr ...health.Attr) { + for _, a := range attr { + span.SetAttributes(attribute.String(a.Key, a.Value)) + } +} diff --git a/internal/zero/telemetry/reporter/metrics_producer.go b/internal/zero/telemetry/reporter/metrics_producer.go new file mode 100644 index 000000000..d3070c804 --- /dev/null +++ b/internal/zero/telemetry/reporter/metrics_producer.go @@ -0,0 +1,47 @@ +package reporter + +import ( + "context" + "sync/atomic" + + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/pomerium/pomerium/internal/log" +) + +type metricsProducer struct { + enabled atomic.Bool + name string + metric.Producer +} + +func newProducer(name string, p metric.Producer) *metricsProducer { + return &metricsProducer{ + name: name, + Producer: p, + } +} + +var _ metric.Producer = (*metricsProducer)(nil) + +// Produce wraps the underlying producer's Produce method and logs any errors, +// to prevent the error from blocking the export of other metrics. +// also checks if the producer is enabled before producing metrics +func (p *metricsProducer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + if enabled := p.enabled.Load(); !enabled { + return nil, nil + } + + data, err := p.Producer.Produce(ctx) + if err != nil { + log.Error(ctx).Err(err).Str("producer", p.name).Msg("failed to produce metrics") + return nil, err + } + return data, nil +} + +// SetEnabled sets the enabled state of the producer +func (p *metricsProducer) SetEnabled(v bool) { + p.enabled.Store(v) +} diff --git a/internal/zero/telemetry/reporter/metrics_reporter.go b/internal/zero/telemetry/reporter/metrics_reporter.go new file mode 100644 index 000000000..e36e8c38f --- /dev/null +++ b/internal/zero/telemetry/reporter/metrics_reporter.go @@ -0,0 +1,99 @@ +package reporter + +import ( + "context" + "errors" + "fmt" + + export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + metric_sdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/resource" + "google.golang.org/grpc" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/pkg/health" +) + +type metricsReporter struct { + exporter *export_grpc.Exporter + resource *resource.Resource + reader *metric_sdk.ManualReader + producers map[string]*metricsProducer + singleTask +} + +func newMetricsReporter( + ctx context.Context, + conn *grpc.ClientConn, + resource *resource.Resource, + producers map[string]*metricsProducer, +) (*metricsReporter, error) { + exporter, err := export_grpc.New(ctx, export_grpc.WithGRPCConn(conn)) + if err != nil { + return nil, fmt.Errorf("create exporter: %w", err) + } + readerOpts := make([]metric_sdk.ManualReaderOption, 0, len(producers)) + for _, p := range producers { + readerOpts = append(readerOpts, metric_sdk.WithProducer(p)) + } + reader := metric_sdk.NewManualReader(readerOpts...) + return &metricsReporter{ + exporter: exporter, + resource: resource, + reader: reader, + producers: producers, + }, nil +} + +func (r *metricsReporter) Run(ctx context.Context) error { + <-ctx.Done() + return nil +} + +func (r *metricsReporter) Shutdown(ctx context.Context) error { + return errors.Join( + r.reader.Shutdown(ctx), + r.exporter.Shutdown(ctx), + ) +} + +func (r *metricsReporter) SetMetricProducerEnabled(name string, enabled bool) error { + p, ok := r.producers[name] + if !ok { + return fmt.Errorf("producer %q not found", name) + } + p.SetEnabled(enabled) + return nil +} + +func (r *metricsReporter) CollectAndExportMetrics(ctx context.Context) { + r.singleTask.Run(ctx, func(ctx context.Context) { + err := r.collectAndExport(ctx) + if errors.Is(err, ErrAnotherExecutionRequested) { + log.Warn(ctx).Msg("telemetry metrics were not sent, due to another execution requested") + return + } + if err != nil { + health.ReportError(health.CollectAndSendTelemetry, err) + } else { + health.ReportOK(health.CollectAndSendTelemetry) + } + }) +} + +func (r *metricsReporter) collectAndExport(ctx context.Context) error { + rm := &metricdata.ResourceMetrics{ + Resource: r.resource, + } + err := withBackoff(ctx, "collect metrics", func(ctx context.Context) error { return r.reader.Collect(ctx, rm) }) + if err != nil { + return fmt.Errorf("collect metrics: %w", err) + } + + err = withBackoff(ctx, "export metrics", func(ctx context.Context) error { return r.exporter.Export(ctx, rm) }) + if err != nil { + return fmt.Errorf("export metrics: %w", err) + } + return nil +} diff --git a/internal/zero/telemetry/reporter/reporter.go b/internal/zero/telemetry/reporter/reporter.go new file mode 100644 index 000000000..e32418a88 --- /dev/null +++ b/internal/zero/telemetry/reporter/reporter.go @@ -0,0 +1,99 @@ +// Package reporter periodically submits metrics back to the cloud. +package reporter + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/cenkalti/backoff/v4" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/version" +) + +type Reporter struct { + *metricsReporter + *healthCheckReporter +} + +const ( + serviceName = "pomerium-managed-core" +) + +// New creates a new unstarted zero telemetry reporter +func New( + ctx context.Context, + conn *grpc.ClientConn, + opts ...Option, +) (*Reporter, error) { + cfg := getConfig(opts...) + resource := getResource() + + metrics, err := newMetricsReporter(ctx, conn, resource, cfg.producers) + if err != nil { + return nil, fmt.Errorf("failed to create metrics reporter: %w", err) + } + + healthChecks := newHealthCheckReporter(conn, resource) + + return &Reporter{ + metricsReporter: metrics, + healthCheckReporter: healthChecks, + }, nil +} + +func (r *Reporter) Run(ctx context.Context) error { + eg, ctx := errgroup.WithContext(ctx) + + eg.Go(func() error { return withBackoff(ctx, "metrics reporter", r.metricsReporter.Run) }) + eg.Go(func() error { return withBackoff(ctx, "health check reporter", r.healthCheckReporter.Run) }) + + return eg.Wait() +} + +// Shutdown should be called after Run to cleanly shutdown the reporter +func (r *Reporter) Shutdown(ctx context.Context) error { + eg, ctx := errgroup.WithContext(ctx) + + eg.Go(func() error { return r.metricsReporter.Shutdown(ctx) }) + eg.Go(func() error { return r.healthCheckReporter.Shutdown(ctx) }) + + return eg.Wait() +} + +func getResource() *resource.Resource { + attr := []attribute.KeyValue{ + semconv.ServiceNameKey.String(serviceName), + semconv.ServiceVersionKey.String(version.FullVersion()), + } + + hostname, err := os.Hostname() + if err == nil { + attr = append(attr, semconv.HostNameKey.String(hostname)) + } + + return resource.NewSchemaless(attr...) +} + +func withBackoff(ctx context.Context, name string, f func(context.Context) error) error { + bo := backoff.NewExponentialBackOff() + bo.MaxElapsedTime = 0 + return backoff.RetryNotify( + func() error { return f(ctx) }, + backoff.WithContext(bo, ctx), + func(err error, d time.Duration) { + log.Warn(ctx). + Str("name", name). + Err(err). + Dur("backoff", d). + Msg("retrying") + }, + ) +} diff --git a/internal/zero/telemetry/reporter/singletask.go b/internal/zero/telemetry/reporter/singletask.go new file mode 100644 index 000000000..303f59619 --- /dev/null +++ b/internal/zero/telemetry/reporter/singletask.go @@ -0,0 +1,27 @@ +package reporter + +import ( + "context" + "errors" + "sync" +) + +type singleTask struct { + lock sync.Mutex + cancel context.CancelCauseFunc +} + +var ErrAnotherExecutionRequested = errors.New("another execution requested") + +func (s *singleTask) Run(ctx context.Context, f func(context.Context)) { + s.lock.Lock() + defer s.lock.Unlock() + + if s.cancel != nil { + s.cancel(ErrAnotherExecutionRequested) + } + + ctx, cancel := context.WithCancelCause(ctx) + s.cancel = cancel + go f(ctx) +} diff --git a/internal/zero/analytics/activeusers.go b/internal/zero/telemetry/sessions/activeusers.go similarity index 99% rename from internal/zero/analytics/activeusers.go rename to internal/zero/telemetry/sessions/activeusers.go index b5dffeacf..ff7d4bcbb 100644 --- a/internal/zero/analytics/activeusers.go +++ b/internal/zero/telemetry/sessions/activeusers.go @@ -1,4 +1,4 @@ -package analytics +package sessions import ( "time" diff --git a/internal/zero/analytics/activeusers_test.go b/internal/zero/telemetry/sessions/activeusers_test.go similarity index 85% rename from internal/zero/analytics/activeusers_test.go rename to internal/zero/telemetry/sessions/activeusers_test.go index 226365159..a3e625f96 100644 --- a/internal/zero/analytics/activeusers_test.go +++ b/internal/zero/telemetry/sessions/activeusers_test.go @@ -1,4 +1,4 @@ -package analytics_test +package sessions_test import ( "testing" @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/pomerium/pomerium/internal/zero/analytics" + "github.com/pomerium/pomerium/internal/zero/telemetry/sessions" ) func TestActiveUsers(t *testing.T) { @@ -15,7 +15,7 @@ func TestActiveUsers(t *testing.T) { startTime := time.Now().UTC() // Create a new counter that resets on a daily interval - c := analytics.NewActiveUsersCounter(analytics.ResetDailyUTC, startTime) + c := sessions.NewActiveUsersCounter(sessions.ResetDailyUTC, startTime) count, wasReset := c.Update([]string{"user1", "user2"}, startTime.Add(time.Minute)) assert.False(t, wasReset) diff --git a/internal/zero/analytics/collector.go b/internal/zero/telemetry/sessions/collector.go similarity index 85% rename from internal/zero/analytics/collector.go rename to internal/zero/telemetry/sessions/collector.go index f31cbdf1d..0ff545cef 100644 --- a/internal/zero/analytics/collector.go +++ b/internal/zero/telemetry/sessions/collector.go @@ -1,5 +1,5 @@ // Package analytics collects active user metrics and reports them to the cloud dashboard -package analytics +package sessions import ( "context" @@ -22,8 +22,7 @@ func Collect( updateInterval: updateInterval, } - leaser := databroker.NewLeaser("pomerium-zero-analytics", c.leaseTTL(), c) - return leaser.Run(ctx) + return c.run(ctx) } type collector struct { @@ -32,7 +31,7 @@ type collector struct { updateInterval time.Duration } -func (c *collector) RunLeased(ctx context.Context) error { +func (c *collector) run(ctx context.Context) error { err := c.loadCounters(ctx) if err != nil { return fmt.Errorf("failed to load counters: %w", err) @@ -46,10 +45,6 @@ func (c *collector) RunLeased(ctx context.Context) error { return nil } -func (c *collector) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient { - return c.client -} - func (c *collector) loadCounters(ctx context.Context) error { now := time.Now() for key, resetFn := range map[string]IntervalResetFunc{ @@ -121,11 +116,3 @@ func (c *collector) update(ctx context.Context) error { return nil } - -func (c *collector) leaseTTL() time.Duration { - const defaultTTL = time.Minute * 5 - if defaultTTL < c.updateInterval { - return defaultTTL - } - return c.updateInterval -} diff --git a/internal/zero/analytics/metrics.go b/internal/zero/telemetry/sessions/metrics.go similarity index 98% rename from internal/zero/analytics/metrics.go rename to internal/zero/telemetry/sessions/metrics.go index de67dbcdd..abd1a12c8 100644 --- a/internal/zero/analytics/metrics.go +++ b/internal/zero/telemetry/sessions/metrics.go @@ -1,4 +1,4 @@ -package analytics +package sessions import ( "context" diff --git a/internal/zero/telemetry/sessions/producer.go b/internal/zero/telemetry/sessions/producer.go new file mode 100644 index 000000000..cc5b57a57 --- /dev/null +++ b/internal/zero/telemetry/sessions/producer.go @@ -0,0 +1,75 @@ +package sessions + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "golang.org/x/sync/errgroup" + + "github.com/pomerium/pomerium/pkg/grpc/databroker" +) + +type producer struct { + scope instrumentation.Scope + clientProvider func() (databroker.DataBrokerServiceClient, error) +} + +func NewProducer( + scope instrumentation.Scope, + clientProvider func() (databroker.DataBrokerServiceClient, error), +) metric.Producer { + return &producer{ + clientProvider: clientProvider, + scope: scope, + } +} + +func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + client, err := p.clientProvider() + if err != nil { + return nil, fmt.Errorf("error getting client: %w", err) + } + + now := time.Now() + ids := []string{"dau", "mau"} + metrics := make([]metricdata.Metrics, len(ids)) + eg, ctx := errgroup.WithContext(ctx) + for i := 0; i < len(ids); i++ { + i := i + eg.Go(func() error { + state, err := LoadMetricState(ctx, client, ids[i]) + if err != nil { + return err + } + metrics[i] = metricdata.Metrics{ + Name: ids[i], + Unit: "unique users", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Time: now, + Value: int64(state.Count), + }, + }, + }, + } + return nil + }) + } + + err = eg.Wait() + if err != nil { + return nil, err + } + + return []metricdata.ScopeMetrics{ + { + Scope: p.scope, + Metrics: metrics, + }, + }, nil +} diff --git a/internal/zero/analytics/sessions.go b/internal/zero/telemetry/sessions/sessions.go similarity index 98% rename from internal/zero/analytics/sessions.go rename to internal/zero/telemetry/sessions/sessions.go index 26799c2b7..bae7deb3c 100644 --- a/internal/zero/analytics/sessions.go +++ b/internal/zero/telemetry/sessions/sessions.go @@ -1,4 +1,4 @@ -package analytics +package sessions import ( "context" diff --git a/internal/zero/analytics/storage.go b/internal/zero/telemetry/sessions/storage.go similarity index 99% rename from internal/zero/analytics/storage.go rename to internal/zero/telemetry/sessions/storage.go index 617df6426..f606ffe51 100644 --- a/internal/zero/analytics/storage.go +++ b/internal/zero/telemetry/sessions/storage.go @@ -1,4 +1,4 @@ -package analytics +package sessions import ( "context" diff --git a/internal/zero/analytics/storage_test.go b/internal/zero/telemetry/sessions/storage_test.go similarity index 75% rename from internal/zero/analytics/storage_test.go rename to internal/zero/telemetry/sessions/storage_test.go index 53868403f..751d48bc1 100644 --- a/internal/zero/analytics/storage_test.go +++ b/internal/zero/telemetry/sessions/storage_test.go @@ -1,4 +1,4 @@ -package analytics_test +package sessions_test import ( "testing" @@ -6,14 +6,14 @@ import ( "github.com/stretchr/testify/assert" - "github.com/pomerium/pomerium/internal/zero/analytics" + "github.com/pomerium/pomerium/internal/zero/telemetry/sessions" ) func TestStorage(t *testing.T) { t.Parallel() now := time.Date(2020, 1, 2, 3, 4, 5, 6, time.UTC) - state := &analytics.MetricState{ + state := &sessions.MetricState{ Data: []byte("data"), LastReset: now, } @@ -21,7 +21,7 @@ func TestStorage(t *testing.T) { pbany := state.ToAny() assert.NotNil(t, pbany) - var newState analytics.MetricState + var newState sessions.MetricState err := newState.FromAny(pbany) assert.NoError(t, err) assert.EqualValues(t, state.Data, newState.Data) diff --git a/pkg/health/check.go b/pkg/health/check.go index aa9a9d88b..528d7a8ae 100644 --- a/pkg/health/check.go +++ b/pkg/health/check.go @@ -7,6 +7,8 @@ type Check string const ( // BuildDatabrokerConfig checks whether the Databroker config was applied BuildDatabrokerConfig = Check("config.databroker.build") + // CollectAndSendTelemetry checks whether telemetry was collected and sent + CollectAndSendTelemetry = Check("zero.telemetry.collect-and-send") // StorageBackend checks whether the storage backend is healthy StorageBackend = Check("storage.backend") // XDSCluster checks whether the XDS Cluster resources were applied