From 323b9ce9003b695dd3814b1418b7f3d620bcbb99 Mon Sep 17 00:00:00 2001
From: Denis Mishin
Date: Thu, 30 May 2024 16:06:14 -0400
Subject: [PATCH] telemetry: add method to push metrics on demand

---
 internal/zero/api/api.go               | 21 +++++--
 internal/zero/controller/controller.go |  2 +-
 internal/zero/reporter/reporter.go     | 84 +++++++++++++++++++++-----
 pkg/envoy/resource_monitor_other.go    |  2 +-
 4 files changed, 89 insertions(+), 20 deletions(-)

diff --git a/internal/zero/api/api.go b/internal/zero/api/api.go
index 1dc6c2550..279233c1c 100644
--- a/internal/zero/api/api.go
+++ b/internal/zero/api/api.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"time"
 
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/keepalive"
 
@@ -13,7 +14,7 @@ import (
 	connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
 	"github.com/pomerium/pomerium/internal/zero/grpcconn"
 	"github.com/pomerium/pomerium/internal/zero/healthcheck"
-	metrics_reporter "github.com/pomerium/pomerium/internal/zero/reporter"
+	"github.com/pomerium/pomerium/internal/zero/reporter"
 	token_api "github.com/pomerium/pomerium/internal/zero/token"
 	"github.com/pomerium/pomerium/pkg/fanout"
 	cluster_api "github.com/pomerium/pomerium/pkg/zero/cluster"
@@ -25,6 +26,7 @@ type API struct {
 	cfg              *config
 	cluster          cluster_api.ClientWithResponsesInterface
 	telemetryConn    *grpc.ClientConn
+	reporter         *reporter.Reporter
 	mux              *connect_mux.Mux
 	downloadURLCache *cluster_api.URLCache
 	tokenFn          func(ctx context.Context, ttl time.Duration) (string, error)
@@ -84,19 +86,30 @@ func NewAPI(ctx context.Context, opts ...Option) (*API, error) {
 		return nil, fmt.Errorf("error creating OTEL exporter grpc client: %w", err)
 	}
 
+	metricsReporter, err := reporter.New(ctx, telemetryGRPCConn)
+	if err != nil {
+		return nil, fmt.Errorf("error creating metrics reporter: %w", err)
+	}
+
 	return &API{
 		cfg:              cfg,
 		cluster:          clusterClient,
 		mux:              connect_mux.New(connect_api.NewConnectClient(connectGRPCConn)),
 		telemetryConn:    telemetryGRPCConn,
+		reporter:         metricsReporter,
 		downloadURLCache: cluster_api.NewURLCache(),
 		tokenFn:          tokenCache.GetToken,
 	}, nil
 }
 
-// ReportMetrics runs metrics reporting to the cloud
-func (api *API) ReportMetrics(ctx context.Context, opts ...metrics_reporter.Option) error {
-	return metrics_reporter.Run(ctx, api.telemetryConn, opts...)
+// ReportPeriodicMetrics runs periodic metrics collection and reporting to the cloud
+func (api *API) ReportPeriodicMetrics(ctx context.Context, opts ...reporter.Option) error {
+	return api.reporter.RunPeriodicMetricReporter(ctx, opts...)
+}
+
+// ReportMetrics pushes the given metrics to the cloud on demand
+func (api *API) ReportMetrics(ctx context.Context, metrics []metricdata.Metrics) error {
+	return api.reporter.ReportMetrics(ctx, metrics)
 }
 
 // ReportHealthChecks runs health check reporting to the cloud
diff --git a/internal/zero/controller/controller.go b/internal/zero/controller/controller.go
index 527644a24..1065c577d 100644
--- a/internal/zero/controller/controller.go
+++ b/internal/zero/controller/controller.go
@@ -143,7 +143,7 @@ func (c *controller) runMetricsReporterLeased(ctx context.Context, client databr
 		return c.Str("service", "zero-reporter")
 	})
 
-	return c.api.ReportMetrics(ctx,
+	return c.api.ReportPeriodicMetrics(ctx,
 		reporter.WithCollectInterval(time.Hour),
 		reporter.WithMetrics(analytics.Metrics(func() databroker.DataBrokerServiceClient { return client })...),
 	)
diff --git a/internal/zero/reporter/reporter.go b/internal/zero/reporter/reporter.go
index 21973941b..ed548ce78 100644
--- a/internal/zero/reporter/reporter.go
+++ b/internal/zero/reporter/reporter.go
@@ -4,10 +4,14 @@ package reporter
 import (
 	"context"
 	"fmt"
+	"os"
 	"time"
 
+	"github.com/cenkalti/backoff/v4"
+	"go.opentelemetry.io/otel/attribute"
 	export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
 	metric_sdk "go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/resource"
 	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
 	"google.golang.org/grpc"
@@ -16,28 +20,57 @@ import (
 	"github.com/pomerium/pomerium/internal/version"
 )
 
-// Run starts loop that pushes metrics via OTEL protocol until ctx is canceled
-func Run(
+// Reporter pushes metrics to the cloud via the OTLP/gRPC metrics exporter.
+type Reporter struct {
+	exporter *export_grpc.Exporter
+	resource *resource.Resource
+}
+
+// New creates a Reporter that exports metrics over the given gRPC connection.
+func New(ctx context.Context, conn *grpc.ClientConn) (*Reporter, error) {
+	exporter, err := export_grpc.New(ctx, export_grpc.WithGRPCConn(conn))
+	if err != nil {
+		return nil, fmt.Errorf("starting OTEL exporter: %w", err)
+	}
+	return &Reporter{
+		exporter: exporter,
+		resource: getResource(),
+	}, nil
+}
+
+// ReportMetrics pushes the given metrics on demand. Failed exports are retried
+// with exponential backoff until one succeeds or ctx is canceled.
+func (r *Reporter) ReportMetrics(ctx context.Context, metrics []metricdata.Metrics) error {
+	bo := backoff.NewExponentialBackOff()
+	bo.MaxElapsedTime = 0 // never give up on elapsed time; only ctx stops the retries
+
+	req := &metricdata.ResourceMetrics{
+		Resource: r.resource,
+		ScopeMetrics: []metricdata.ScopeMetrics{
+			{Metrics: metrics},
+		},
+	}
+	return backoff.RetryNotify(func() error {
+		return r.exporter.Export(ctx, req)
+	}, backoff.WithContext(bo, ctx), func(err error, d time.Duration) {
+		log.Ctx(ctx).Warn().Err(err).Str("retry_in", d.String()).Msg("error exporting metrics")
+	})
+}
+
+// RunPeriodicMetricReporter starts loop that pushes metrics collected periodically via OTEL protocol until ctx is canceled
+func (r *Reporter) RunPeriodicMetricReporter(
 	ctx context.Context,
-	conn *grpc.ClientConn,
 	opts ...Option,
 ) error {
 	cfg := getConfig(opts...)
 
-	exporter, err := export_grpc.New(ctx, export_grpc.WithGRPCConn(conn))
-	if err != nil {
-		return fmt.Errorf("starting OTEL exporter: %w", err)
-	}
-	defer shutdown(exporter.Shutdown, cfg.shutdownTimeout)
-
 	provider := metric_sdk.NewMeterProvider(
-		metric_sdk.WithResource(resource.NewSchemaless(
-			semconv.ServiceNameKey.String("pomerium-managed-core"),
-			semconv.ServiceVersionKey.String(version.FullVersion()),
-		)),
+		metric_sdk.WithResource(r.resource),
 		metric_sdk.WithReader(
 			metric_sdk.NewPeriodicReader(
-				exporter,
+				// The periodic reader shuts its exporter down on provider shutdown;
+				// wrap ours so the Reporter stays usable after this method returns.
+				sharedExporter{r.exporter},
 				metric_sdk.WithInterval(cfg.collectInterval),
 			)))
 	defer shutdown(provider.Shutdown, cfg.shutdownTimeout)
@@ -59,3 +92,26 @@ func shutdown(fn func(ctx context.Context) error, timeout time.Duration) {
 	defer cancel()
 	_ = fn(ctx)
 }
+
+func getResource() *resource.Resource {
+	attr := []attribute.KeyValue{
+		semconv.ServiceNameKey.String("pomerium-managed-core"),
+		semconv.ServiceVersionKey.String(version.FullVersion()),
+	}
+
+	hostname, err := os.Hostname()
+	if err == nil {
+		attr = append(attr, semconv.HostNameKey.String(hostname))
+	}
+
+	return resource.NewSchemaless(attr...)
+}
+
+// sharedExporter wraps the Reporter's long-lived exporter so that a periodic
+// reader can flush through it but cannot shut it down.
+type sharedExporter struct {
+	metric_sdk.Exporter
+}
+
+// Shutdown is a no-op: the wrapped exporter is owned by the Reporter.
+func (sharedExporter) Shutdown(context.Context) error { return nil }
diff --git a/pkg/envoy/resource_monitor_other.go b/pkg/envoy/resource_monitor_other.go
index d82ab9a10..036e509bb 100644
--- a/pkg/envoy/resource_monitor_other.go
+++ b/pkg/envoy/resource_monitor_other.go
@@ -9,6 +9,6 @@ import (
 	"github.com/pomerium/pomerium/config"
 )
 
-func NewSharedResourceMonitor(ctx context.Context, src config.Source, tempDir string) (ResourceMonitor, error) {
+func NewSharedResourceMonitor(_ context.Context, _ config.Source, _ string) (ResourceMonitor, error) {
 	return nil, errors.New("unsupported platform")
 }