From c945b08bcfdc73118b1b0907e133853fd32ca63a Mon Sep 17 00:00:00 2001 From: Denis Mishin Date: Wed, 12 Jun 2024 22:34:45 -0400 Subject: [PATCH] merge main, wire with rest of funcs --- internal/telemetry/prometheus/producer.go | 29 ++++++++---- internal/zero/controller/controller.go | 9 ---- internal/zero/controller/telemetry.go | 50 +++++++++++++++++++- internal/zero/telemetry/sessions/producer.go | 19 ++++++-- 4 files changed, 83 insertions(+), 24 deletions(-) diff --git a/internal/telemetry/prometheus/producer.go b/internal/telemetry/prometheus/producer.go index 60ea36abf..39d1a1adc 100644 --- a/internal/telemetry/prometheus/producer.go +++ b/internal/telemetry/prometheus/producer.go @@ -8,7 +8,6 @@ import ( "time" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -63,6 +62,12 @@ func WithIncludeLabels(labels ...string) ProducerOption { } } +func WithScrapeURL(scrapeURL string) ProducerOption { + return func(cfg *producerConfig) { + cfg.scrapeURL = scrapeURL + } +} + func (cfg *producerConfig) Validate() error { if cfg.client == nil { return fmt.Errorf("HTTP client is required") @@ -89,31 +94,37 @@ func newProducerConfig(opts ...ProducerOption) (*producerConfig, error) { return cfg, nil } -type producer struct { +type Producer struct { producerConfig atomic.Value } -func NewProducer(opts ...ProducerOption) (metric.Producer, error) { +func NewProducer(opts ...ProducerOption) (*Producer, error) { cfg, err := newProducerConfig(opts...) if err != nil { return nil, err } - p := new(producer) + p := new(Producer) p.setConfig(cfg) return p, nil } -func (p *producer) SetConfig(opts ...ProducerOption) { +func (p *Producer) SetConfig(opts ...ProducerOption) error { cfg, err := newProducerConfig(opts...) if err != nil { - panic(err) + return err } p.setConfig(cfg) + return nil } -func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { +func (p *Producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { cfg := p.loadConfig() + + if len(cfg.metrics) == 0 { + return nil, nil + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, cfg.scrapeURL, nil) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) @@ -135,11 +146,11 @@ func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, erro }, nil } -func (p *producer) setConfig(cfg *producerConfig) { +func (p *Producer) setConfig(cfg *producerConfig) { p.producerConfig.Store(cfg) } -func (p *producer) loadConfig() *producerConfig { +func (p *Producer) loadConfig() *producerConfig { return p.producerConfig.Load().(*producerConfig) } diff --git a/internal/zero/controller/controller.go b/internal/zero/controller/controller.go index db1f878eb..5fef00417 100644 --- a/internal/zero/controller/controller.go +++ b/internal/zero/controller/controller.go @@ -13,12 +13,10 @@ import ( sdk "github.com/pomerium/pomerium/internal/zero/api" "github.com/pomerium/pomerium/internal/zero/bootstrap" "github.com/pomerium/pomerium/internal/zero/bootstrap/writers" - connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux" "github.com/pomerium/pomerium/internal/zero/reconciler" "github.com/pomerium/pomerium/internal/zero/telemetry/reporter" "github.com/pomerium/pomerium/pkg/cmd/pomerium" "github.com/pomerium/pomerium/pkg/grpc/databroker" - "github.com/pomerium/pomerium/pkg/zero/connect" ) // Run runs Pomerium is managed mode using the provided token. @@ -140,13 +138,6 @@ func (c *controller) runZeroControlLoop(ctx context.Context) error { } defer c.shutdownTelemetry(ctx) - err = c.api.Watch(ctx, connect_mux.WithOnTelemetryRequested(func(ctx context.Context, _ *connect.TelemetryRequest) { - c.telemetryReporter.CollectAndExportMetrics(ctx) - })) - if err != nil { - return fmt.Errorf("watch telemetry: %w", err) - } - eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { return r.Run(ctx, diff --git a/internal/zero/controller/telemetry.go b/internal/zero/controller/telemetry.go index ac1b62b91..ee70027a1 100644 --- a/internal/zero/controller/telemetry.go +++ b/internal/zero/controller/telemetry.go @@ -3,35 +3,83 @@ package controller import ( "context" "fmt" + "net" + "net/url" "time" "github.com/rs/zerolog" "go.opentelemetry.io/otel/sdk/instrumentation" "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/telemetry/prometheus" + connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux" "github.com/pomerium/pomerium/internal/zero/healthcheck" "github.com/pomerium/pomerium/internal/zero/telemetry/reporter" "github.com/pomerium/pomerium/internal/zero/telemetry/sessions" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/health" + "github.com/pomerium/pomerium/pkg/zero/connect" ) const ( producerSessionAnalytics = "session-analytics" + producerEnvoy = "envoy" ) func (c *controller) initTelemetry(ctx context.Context, clientProvider func() (databroker.DataBrokerServiceClient, error)) error { - sessionMetricProducer := sessions.NewProducer(instrumentation.Scope{}, clientProvider) + startTime := time.Now() + + sessionMetricProducer := sessions.NewProducer(instrumentation.Scope{Name: "cluster"}, clientProvider) + envoyMetricProducer, err := prometheus.NewProducer(c.buildEnvoyMetricProducerOptions(nil, nil, startTime)...) + if err != nil { + return fmt.Errorf("error creating envoy metric producer: %w", err) + } + r, err := reporter.New(ctx, c.api.GetTelemetryConn(), reporter.WithProducer(producerSessionAnalytics, sessionMetricProducer), + reporter.WithProducer(producerEnvoy, envoyMetricProducer), ) if err != nil { return fmt.Errorf("error creating telemetry metrics reporter: %w", err) } + + err = c.api.Watch(ctx, connect_mux.WithOnTelemetryRequested(func(ctx context.Context, req *connect.TelemetryRequest) { + sessionMetricProducer.SetEnabled(req.GetSessionAnalytics() != nil) + + if envoyMetricRequest := req.GetEnvoyMetrics(); envoyMetricRequest != nil { + opts := c.buildEnvoyMetricProducerOptions(envoyMetricRequest.GetMetrics(), envoyMetricRequest.GetLabels(), startTime) + err := envoyMetricProducer.SetConfig(opts...) + if err != nil { + log.Warn(ctx).Err(err).Msg("failed to set envoy metric producer options") + } + } else { + _ = envoyMetricProducer.SetConfig(c.buildEnvoyMetricProducerOptions(nil, nil, startTime)...) + } + + c.telemetryReporter.CollectAndExportMetrics(ctx) + })) + if err != nil { + return fmt.Errorf("watch telemetry: %w", err) + } + c.telemetryReporter = r return nil } +func (c *controller) buildEnvoyMetricProducerOptions(metrics, labels []string, startTime time.Time) []prometheus.ProducerOption { + return []prometheus.ProducerOption{ + prometheus.WithIncludeMetrics(metrics...), + prometheus.WithIncludeLabels(labels...), + prometheus.WithScope(instrumentation.Scope{Name: "envoy"}), + prometheus.WithScrapeURL((&url.URL{ + Scheme: "http", + Host: net.JoinHostPort("localhost", c.bootstrapConfig.GetConfig().OutboundPort), + Path: "/envoy/stats/prometheus", + }).String()), + prometheus.WithStartTime(startTime), + } +} + func (c *controller) shutdownTelemetry(ctx context.Context) { ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), c.cfg.shutdownTimeout) defer cancel() diff --git a/internal/zero/telemetry/sessions/producer.go b/internal/zero/telemetry/sessions/producer.go index cc5b57a57..1a087a872 100644 --- a/internal/zero/telemetry/sessions/producer.go +++ b/internal/zero/telemetry/sessions/producer.go @@ -3,32 +3,37 @@ package sessions import ( "context" "fmt" + "sync/atomic" "time" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "golang.org/x/sync/errgroup" "github.com/pomerium/pomerium/pkg/grpc/databroker" ) -type producer struct { +type Producer struct { scope instrumentation.Scope clientProvider func() (databroker.DataBrokerServiceClient, error) + enabled atomic.Bool } func NewProducer( scope instrumentation.Scope, clientProvider func() (databroker.DataBrokerServiceClient, error), -) metric.Producer { - return &producer{ +) *Producer { + return &Producer{ clientProvider: clientProvider, scope: scope, } } -func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { +func (p *Producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + if !p.enabled.Load() { + return nil, nil + } + client, err := p.clientProvider() if err != nil { return nil, fmt.Errorf("error getting client: %w", err) @@ -73,3 +78,7 @@ func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, erro }, }, nil } + +func (p *Producer) SetEnabled(enabled bool) { + p.enabled.Store(enabled) +}