diff --git a/config/envoyconfig/outbound.go b/config/envoyconfig/outbound.go index 400de258c..2c44f95c2 100644 --- a/config/envoyconfig/outbound.go +++ b/config/envoyconfig/outbound.go @@ -132,5 +132,19 @@ func (b *Builder) buildOutboundRoutes() []*envoy_config_route_v3.Route { }) } } + routes = append(routes, &envoy_config_route_v3.Route{ + Name: "envoy-metrics", + Match: &envoy_config_route_v3.RouteMatch{ + PathSpecifier: &envoy_config_route_v3.RouteMatch_Prefix{Prefix: "/envoy/stats/prometheus"}, + }, + Action: &envoy_config_route_v3.Route_Route{ + Route: &envoy_config_route_v3.RouteAction{ + ClusterSpecifier: &envoy_config_route_v3.RouteAction_Cluster{ + Cluster: envoyAdminClusterName, + }, + PrefixRewrite: "/stats/prometheus", + }, + }, + }) return routes } diff --git a/config/envoyconfig/outbound_test.go b/config/envoyconfig/outbound_test.go index 960c64195..2a12488c2 100644 --- a/config/envoyconfig/outbound_test.go +++ b/config/envoyconfig/outbound_test.go @@ -61,6 +61,16 @@ func Test_buildOutboundRoutes(t *testing.T) { "idleTimeout": "0s", "timeout": "0s" } + }, + { + "match": { + "prefix": "/envoy/stats/prometheus" + }, + "name": "envoy-metrics", + "route": { + "cluster": "pomerium-envoy-admin", + "prefixRewrite": "/stats/prometheus" + } } ]`, routes) } diff --git a/go.mod b/go.mod index 7ad8f8ce1..46251d846 100644 --- a/go.mod +++ b/go.mod @@ -192,6 +192,7 @@ require ( github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/tchap/go-patricia/v2 v2.3.1 // indirect github.com/tinylib/msgp v1.1.2 // indirect diff --git a/internal/retry/backoff.go b/internal/retry/backoff.go index 008da0b39..509d75ce9 100644 --- a/internal/retry/backoff.go +++ b/internal/retry/backoff.go @@ -2,16 +2,27 @@ package retry import ( "context" + "strings" + "time" "github.com/cenkalti/backoff/v4" + + "github.com/pomerium/pomerium/internal/log" ) +type serviceName struct{} + // WithBackoff retries the given function with an exponential backoff, // stopping when the context is done or the function returns a terminal error. -func WithBackoff(ctx context.Context, fn func(context.Context) error) error { +func WithBackoff(ctx context.Context, name string, fn func(context.Context) error) error { + name, ctx = getServiceNameContext(ctx, name) + + log.Debug(ctx).Str("service-name", name).Msg("starting") + defer log.Debug(ctx).Str("service-name", name).Msg("stopped") + b := backoff.NewExponentialBackOff() b.MaxElapsedTime = 0 - return backoff.Retry( + return backoff.RetryNotify( func() error { err := fn(ctx) if IsTerminalError(err) { @@ -20,5 +31,18 @@ func WithBackoff(ctx context.Context, fn func(context.Context) error) error { return err }, backoff.WithContext(b, ctx), + func(err error, next time.Duration) { + log.Warn(ctx).Err(err).Str("service-name", name).Dur("next", next).Msg("retrying") + }, ) } + +func getServiceNameContext(ctx context.Context, name string) (string, context.Context) { + names, ok := ctx.Value(serviceName{}).([]string) + if ok { + names = append(names, name) + } else { + names = []string{name} + } + return strings.Join(names, "."), context.WithValue(ctx, serviceName{}, names) +} diff --git a/internal/telemetry/prometheus/producer.go b/internal/telemetry/prometheus/producer.go new file mode 100644 index 000000000..d90f2de45 --- /dev/null +++ b/internal/telemetry/prometheus/producer.go @@ -0,0 +1,147 @@ +package prometheus + +import ( + "context" + "fmt" + "net/http" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +type producerConfig struct { + client *http.Client + scrapeURL string + scope instrumentation.Scope + startTime time.Time + metrics map[string]struct{} + labels map[string]struct{} +} + +type ProducerOption func(*producerConfig) + +func WithClient(client *http.Client) ProducerOption { + return func(cfg *producerConfig) { + cfg.client = client + } +} + +func WithScope(scope instrumentation.Scope) ProducerOption { + return func(cfg *producerConfig) { + cfg.scope = scope + } +} + +func WithStartTime(startTime time.Time) ProducerOption { + return func(cfg *producerConfig) { + cfg.startTime = startTime + } +} + +func WithIncludeMetrics(metrics ...string) ProducerOption { + return func(cfg *producerConfig) { + if cfg.metrics == nil { + cfg.metrics = make(map[string]struct{}, len(metrics)) + } + for _, metric := range metrics { + cfg.metrics[metric] = struct{}{} + } + } +} + +func WithIncludeLabels(labels ...string) ProducerOption { + return func(cfg *producerConfig) { + if cfg.labels == nil { + cfg.labels = make(map[string]struct{}, len(labels)) + } + for _, label := range labels { + cfg.labels[label] = struct{}{} + } + } +} + +func WithScrapeURL(scrapeURL string) ProducerOption { + return func(cfg *producerConfig) { + cfg.scrapeURL = scrapeURL + } +} + +func newProducerConfig(opts ...ProducerOption) *producerConfig { + cfg := &producerConfig{ + client: http.DefaultClient, + } + for _, opt := range opts { + opt(cfg) + } + return cfg +} + +type Producer struct { + producerConfig atomic.Value +} + +func NewProducer(opts ...ProducerOption) *Producer { + cfg := newProducerConfig(opts...) + + p := new(Producer) + p.setConfig(cfg) + return p +} + +func (p *Producer) UpdateConfig(opts ...ProducerOption) { + cfg := *p.loadConfig() + for _, opt := range opts { + opt(&cfg) + } + p.setConfig(&cfg) +} + +func (p *Producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + cfg := p.loadConfig() + + if len(cfg.metrics) == 0 { + return nil, nil + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, cfg.scrapeURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + resp, err := cfg.client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to do request: %w", err) + } + defer resp.Body.Close() + metrics, err := ToOTLP(resp.Body, filter(cfg.metrics), filter(cfg.labels), cfg.startTime, time.Now()) + if err != nil { + return nil, fmt.Errorf("failed to convert metrics to OTLP: %w", err) + } + return []metricdata.ScopeMetrics{ + { + Scope: cfg.scope, + Metrics: metrics, + }, + }, nil +} + +func (p *Producer) setConfig(cfg *producerConfig) { + p.producerConfig.Store(cfg) +} + +func (p *Producer) loadConfig() *producerConfig { + return p.producerConfig.Load().(*producerConfig) +} + +func filter(src map[string]struct{}) func(k string) (string, bool) { + return func(k string) (string, bool) { + if len(src) == 0 { + return k, true + } + if _, ok := src[k]; ok { + return k, true + } + return "", false + } +} diff --git a/internal/zero/connect-mux/messages.go b/internal/zero/connect-mux/messages.go index 7255e6094..c12f71d9d 100644 --- a/internal/zero/connect-mux/messages.go +++ b/internal/zero/connect-mux/messages.go @@ -3,6 +3,9 @@ package mux import ( "context" "fmt" + "strings" + + "google.golang.org/protobuf/encoding/protojson" "github.com/pomerium/pomerium/internal/zero/apierror" "github.com/pomerium/pomerium/pkg/zero/connect" @@ -64,6 +67,19 @@ type message struct { *connect.Message } +func (msg message) String() string { + var b strings.Builder + if msg.stateChange != nil { + b.WriteString("stateChange: ") + b.WriteString(string(*msg.stateChange)) + } + if msg.Message != nil { + b.WriteString("message: ") + b.WriteString(protojson.Format(msg.Message)) + } + return b.String() +} + type stateChange string const ( diff --git a/internal/zero/controller/controller.go b/internal/zero/controller/controller.go index db1f878eb..624f31f19 100644 --- a/internal/zero/controller/controller.go +++ b/internal/zero/controller/controller.go @@ -5,20 +5,24 @@ import ( "context" "errors" "fmt" + "net" + "net/url" + "time" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/retry" sdk "github.com/pomerium/pomerium/internal/zero/api" "github.com/pomerium/pomerium/internal/zero/bootstrap" "github.com/pomerium/pomerium/internal/zero/bootstrap/writers" - connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux" + "github.com/pomerium/pomerium/internal/zero/healthcheck" "github.com/pomerium/pomerium/internal/zero/reconciler" - "github.com/pomerium/pomerium/internal/zero/telemetry/reporter" + "github.com/pomerium/pomerium/internal/zero/telemetry" + "github.com/pomerium/pomerium/internal/zero/telemetry/sessions" "github.com/pomerium/pomerium/pkg/cmd/pomerium" "github.com/pomerium/pomerium/pkg/grpc/databroker" - "github.com/pomerium/pomerium/pkg/zero/connect" ) // Run runs Pomerium is managed mode using the provided token. @@ -67,8 +71,7 @@ type controller struct { api *sdk.API - bootstrapConfig *bootstrap.Source - telemetryReporter *reporter.Reporter + bootstrapConfig *bootstrap.Source } func (c *controller) initAPI(ctx context.Context) error { @@ -128,47 +131,70 @@ func (c *controller) runZeroControlLoop(ctx context.Context) error { return fmt.Errorf("waiting for config source to be ready: %w", err) } - r := c.NewDatabrokerRestartRunner(ctx) + r := NewDatabrokerRestartRunner(ctx, c.bootstrapConfig) defer r.Close() - err = c.initTelemetry(ctx, func() (databroker.DataBrokerServiceClient, error) { - client, _, err := r.getDatabrokerClient() - return client, err - }) + var leaseStatus LeaseStatus + tm, err := telemetry.New(ctx, c.api, + r.GetDatabrokerClient, + leaseStatus.HasLease, + c.getEnvoyScrapeURL(), + ) if err != nil { return fmt.Errorf("init telemetry: %w", err) } - defer c.shutdownTelemetry(ctx) - - err = c.api.Watch(ctx, connect_mux.WithOnTelemetryRequested(func(ctx context.Context, _ *connect.TelemetryRequest) { - c.telemetryReporter.CollectAndExportMetrics(ctx) - })) - if err != nil { - return fmt.Errorf("watch telemetry: %w", err) - } + defer c.shutdownTelemetry(ctx, tm) eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { return tm.Run(ctx) }) eg.Go(func() error { return r.Run(ctx, WithLease( c.runReconcilerLeased, c.runSessionAnalyticsLeased, - c.enableSessionAnalyticsReporting, - c.runHealthChecksLeased, + c.runPeriodicHealthChecksLeased, + leaseStatus.MonitorLease, ), ) }) - eg.Go(func() error { return c.runTelemetryReporter(ctx) }) return eg.Wait() } -func (c *controller) runReconcilerLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { - ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { - return c.Str("service", "zero-reconciler") - }) +func (c *controller) shutdownTelemetry(ctx context.Context, tm *telemetry.Telemetry) { + ctx, cancel := context.WithTimeout(ctx, c.cfg.shutdownTimeout) + defer cancel() - return reconciler.Run(ctx, - reconciler.WithAPI(c.api), - reconciler.WithDataBrokerClient(client), - ) + err := tm.Shutdown(ctx) + if err != nil { + log.Ctx(ctx).Error().Err(err).Msg("error shutting down telemetry") + } +} + +func (c *controller) runReconcilerLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { + return retry.WithBackoff(ctx, "zero-reconciler", func(ctx context.Context) error { + return reconciler.Run(ctx, + reconciler.WithAPI(c.api), + reconciler.WithDataBrokerClient(client), + ) + }) +} + +func (c *controller) runSessionAnalyticsLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { + return retry.WithBackoff(ctx, "zero-analytics", func(ctx context.Context) error { + return sessions.Collect(ctx, client, time.Hour) + }) +} + +func (c *controller) runPeriodicHealthChecksLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { + return retry.WithBackoff(ctx, "zero-healthcheck", func(ctx context.Context) error { + return healthcheck.RunChecks(ctx, c.bootstrapConfig, client) + }) +} + +func (c *controller) getEnvoyScrapeURL() string { + return (&url.URL{ + Scheme: "http", + Host: net.JoinHostPort("localhost", c.bootstrapConfig.GetConfig().OutboundPort), + Path: "/envoy/stats/prometheus", + }).String() } diff --git a/internal/zero/controller/runner.go b/internal/zero/controller/databroker_restart.go similarity index 64% rename from internal/zero/controller/runner.go rename to internal/zero/controller/databroker_restart.go index f9886bd4b..78bc56afa 100644 --- a/internal/zero/controller/runner.go +++ b/internal/zero/controller/databroker_restart.go @@ -9,13 +9,10 @@ import ( "net/url" "sync" - "golang.org/x/sync/errgroup" "google.golang.org/grpc" "github.com/pomerium/pomerium/config" - "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/retry" - "github.com/pomerium/pomerium/internal/zero/bootstrap" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpcutil" ) @@ -23,9 +20,7 @@ import ( // ErrBootstrapConfigurationChanged is returned when the bootstrap configuration has changed and the function needs to be restarted. var ErrBootstrapConfigurationChanged = errors.New("bootstrap configuration changed") -type runner struct { - source *bootstrap.Source - +type DatabrokerRestartRunner struct { lock sync.RWMutex cancel chan struct{} conn *grpc.ClientConn @@ -33,31 +28,39 @@ type runner struct { initError error } -func (c *controller) NewDatabrokerRestartRunner(ctx context.Context) *runner { - p := &runner{ - source: c.bootstrapConfig, - } - p.initLocked(ctx, c.bootstrapConfig.GetConfig()) - c.bootstrapConfig.OnConfigChange(context.Background(), p.onConfigChange) +// NewDatabrokerRestartRunner is a helper to run a function that needs to be restarted when the underlying databroker configuration changes. +func NewDatabrokerRestartRunner( + ctx context.Context, + src config.Source, +) *DatabrokerRestartRunner { + p := new(DatabrokerRestartRunner) + p.initLocked(ctx, src.GetConfig()) + src.OnConfigChange(ctx, p.onConfigChange) return p } -type DbcFunc func(context.Context, databroker.DataBrokerServiceClient) error - -func (p *runner) Run(ctx context.Context, funcs ...DbcFunc) error { - return retry.WithBackoff(ctx, func(ctx context.Context) error { return p.runUntilDatabrokerChanges(ctx, funcs...) }) +func (p *DatabrokerRestartRunner) Run( + ctx context.Context, + fn func(context.Context, databroker.DataBrokerServiceClient) error, +) error { + return retry.WithBackoff(ctx, "databroker-restart", func(ctx context.Context) error { return p.runUntilDatabrokerChanges(ctx, fn) }) } // Close releases the resources used by the databroker provider. -func (p *runner) Close() { +func (p *DatabrokerRestartRunner) Close() { p.lock.Lock() defer p.lock.Unlock() p.closeLocked() } +func (p *DatabrokerRestartRunner) GetDatabrokerClient() (databroker.DataBrokerServiceClient, error) { + client, _, err := p.getDatabrokerClient() + return client, err +} + // GetDatabrokerClient returns the databroker client and a channel that will be closed when the client is no longer valid. -func (p *runner) getDatabrokerClient() (databroker.DataBrokerServiceClient, <-chan struct{}, error) { +func (p *DatabrokerRestartRunner) getDatabrokerClient() (databroker.DataBrokerServiceClient, <-chan struct{}, error) { p.lock.RLock() defer p.lock.RUnlock() @@ -68,7 +71,7 @@ func (p *runner) getDatabrokerClient() (databroker.DataBrokerServiceClient, <-ch return p.client, p.cancel, nil } -func (p *runner) onConfigChange(ctx context.Context, cfg *config.Config) { +func (p *DatabrokerRestartRunner) onConfigChange(ctx context.Context, cfg *config.Config) { p.lock.Lock() defer p.lock.Unlock() @@ -76,7 +79,7 @@ func (p *runner) onConfigChange(ctx context.Context, cfg *config.Config) { p.initLocked(ctx, cfg) } -func (p *runner) initLocked(ctx context.Context, cfg *config.Config) { +func (p *DatabrokerRestartRunner) initLocked(ctx context.Context, cfg *config.Config) { conn, err := newDataBrokerConnection(ctx, cfg) if err != nil { p.initError = fmt.Errorf("databroker connection: %w", err) @@ -89,7 +92,7 @@ func (p *runner) initLocked(ctx context.Context, cfg *config.Config) { p.initError = nil } -func (p *runner) closeLocked() { +func (p *DatabrokerRestartRunner) closeLocked() { if p.conn != nil { p.conn.Close() p.conn = nil @@ -101,13 +104,10 @@ func (p *runner) closeLocked() { p.initError = errors.New("databroker connection closed") } -func (p *runner) runUntilDatabrokerChanges( +func (p *DatabrokerRestartRunner) runUntilDatabrokerChanges( ctx context.Context, - funcs ...DbcFunc, + fn func(context.Context, databroker.DataBrokerServiceClient) error, ) error { - log.Debug(ctx).Msg("starting") - defer log.Debug(ctx).Msg("stop") - client, cancelCh, err := p.getDatabrokerClient() if err != nil { return fmt.Errorf("get databroker client: %w", err) @@ -120,18 +120,11 @@ func (p *runner) runUntilDatabrokerChanges( select { case <-ctx.Done(): case <-cancelCh: - log.Debug(ctx).Msg("bootstrap configuration changed, restarting...") cancel(ErrBootstrapConfigurationChanged) } }() - eg, ctx := errgroup.WithContext(ctx) - for _, fn := range funcs { - eg.Go(func() error { - return retry.WithBackoff(ctx, func(ctx context.Context) error { return fn(ctx, client) }) - }) - } - return eg.Wait() + return fn(ctx, client) } func newDataBrokerConnection(ctx context.Context, cfg *config.Config) (*grpc.ClientConn, error) { diff --git a/internal/zero/controller/databroker_restart_test.go b/internal/zero/controller/databroker_restart_test.go new file mode 100644 index 000000000..11bbd7f1e --- /dev/null +++ b/internal/zero/controller/databroker_restart_test.go @@ -0,0 +1,113 @@ +package controller_test + +import ( + "context" + "encoding/base64" + "errors" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/pomerium/pomerium/config" + "github.com/pomerium/pomerium/internal/zero/controller" + "github.com/pomerium/pomerium/pkg/cryptutil" + "github.com/pomerium/pomerium/pkg/grpc/databroker" +) + +type mockConfigSource struct { + mock.Mock + config.Source +} + +func (s *mockConfigSource) GetConfig() *config.Config { + args := s.Called() + return args.Get(0).(*config.Config) +} + +func (s *mockConfigSource) OnConfigChange(ctx context.Context, cl config.ChangeListener) { + s.Called(ctx, cl) +} + +func TestDatabrokerRestart(t *testing.T) { + t.Parallel() + + newConfig := func() *config.Config { + return &config.Config{ + Options: &config.Options{ + SharedKey: base64.StdEncoding.EncodeToString(cryptutil.NewKey()), + }, + GRPCPort: ":12345", + } + } + + t.Run("no error", func(t *testing.T) { + t.Parallel() + + src := new(mockConfigSource) + src.On("OnConfigChange", mock.Anything, mock.Anything).Once() + src.On("GetConfig").Once().Return(newConfig()) + + ctx := context.Background() + r := controller.NewDatabrokerRestartRunner(ctx, src) + defer r.Close() + + err := r.Run(ctx, func(_ context.Context, _ databroker.DataBrokerServiceClient) error { + return nil + }) + require.NoError(t, err) + }) + t.Run("error, retry", func(t *testing.T) { + t.Parallel() + + src := new(mockConfigSource) + src.On("OnConfigChange", mock.Anything, mock.Anything).Once() + src.On("GetConfig").Once().Return(newConfig()) + + ctx := context.Background() + r := controller.NewDatabrokerRestartRunner(ctx, src) + defer r.Close() + + count := 0 + err := r.Run(ctx, func(_ context.Context, _ databroker.DataBrokerServiceClient) error { + count++ + if count == 1 { + return errors.New("simulated error") + } + return nil + }) + require.NoError(t, err) + require.Equal(t, 2, count) + }) + t.Run("config changed, execution restarted", func(t *testing.T) { + t.Parallel() + + src := new(mockConfigSource) + var cl config.ChangeListener + src.On("OnConfigChange", mock.Anything, mock.Anything).Once().Run(func(args mock.Arguments) { + cl = args.Get(1).(config.ChangeListener) + }) + src.On("GetConfig").Once().Return(newConfig()) + + ctx := context.Background() + r := controller.NewDatabrokerRestartRunner(ctx, src) + defer r.Close() + + count := 0 + var clients [2]databroker.DataBrokerServiceClient + err := r.Run(ctx, func(ctx context.Context, client databroker.DataBrokerServiceClient) error { + clients[count] = client + count++ + if count == 1 { + cl(context.Background(), newConfig()) + <-ctx.Done() + require.ErrorIs(t, context.Cause(ctx), controller.ErrBootstrapConfigurationChanged) + return ctx.Err() + } + return nil + }) + require.NoError(t, err) + require.Equal(t, 2, count) + require.NotEqual(t, clients[0], clients[1]) + }) +} diff --git a/internal/zero/controller/leaser.go b/internal/zero/controller/leaser.go index cc295ab1d..de3303e7d 100644 --- a/internal/zero/controller/leaser.go +++ b/internal/zero/controller/leaser.go @@ -2,18 +2,17 @@ package controller import ( "context" + "sync/atomic" "time" "golang.org/x/sync/errgroup" - "github.com/pomerium/pomerium/internal/log" - "github.com/pomerium/pomerium/internal/retry" "github.com/pomerium/pomerium/pkg/grpc/databroker" ) type leaser struct { client databroker.DataBrokerServiceClient - funcs []DbcFunc + funcs []func(context.Context, databroker.DataBrokerServiceClient) error } // GetDataBrokerServiceClient implements the databroker.LeaseHandler interface. @@ -23,20 +22,18 @@ func (c *leaser) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient // RunLeased implements the databroker.LeaseHandler interface. func (c *leaser) RunLeased(ctx context.Context) error { - log.Debug(ctx).Msg("leaser: running leased functions") - eg, ctx := errgroup.WithContext(ctx) for _, fn := range c.funcs { - eg.Go(func() error { - return retry.WithBackoff(ctx, func(ctx context.Context) error { return fn(ctx, c.client) }) - }) + fn := fn + eg.Go(func() error { return fn(ctx, c.client) }) } err := eg.Wait() - log.Debug(ctx).Err(err).Msg("leaser: done running leased functions") return err } -func WithLease(funcs ...DbcFunc) DbcFunc { +func WithLease( + funcs ...func(context.Context, databroker.DataBrokerServiceClient) error, +) func(context.Context, databroker.DataBrokerServiceClient) error { return func(ctx context.Context, client databroker.DataBrokerServiceClient) error { srv := &leaser{ client: client, @@ -46,3 +43,18 @@ func WithLease(funcs ...DbcFunc) DbcFunc { return leaser.Run(ctx) } } + +type LeaseStatus struct { + v atomic.Bool +} + +func (w *LeaseStatus) HasLease() bool { + return w.v.Load() +} + +func (w *LeaseStatus) MonitorLease(ctx context.Context, _ databroker.DataBrokerServiceClient) error { + w.v.Store(true) + <-ctx.Done() + w.v.Store(false) + return ctx.Err() +} diff --git a/internal/zero/controller/telemetry.go b/internal/zero/controller/telemetry.go deleted file mode 100644 index ac1b62b91..000000000 --- a/internal/zero/controller/telemetry.go +++ /dev/null @@ -1,79 +0,0 @@ -package controller - -import ( - "context" - "fmt" - "time" - - "github.com/rs/zerolog" - "go.opentelemetry.io/otel/sdk/instrumentation" - - "github.com/pomerium/pomerium/internal/log" - "github.com/pomerium/pomerium/internal/zero/healthcheck" - "github.com/pomerium/pomerium/internal/zero/telemetry/reporter" - "github.com/pomerium/pomerium/internal/zero/telemetry/sessions" - "github.com/pomerium/pomerium/pkg/grpc/databroker" - "github.com/pomerium/pomerium/pkg/health" -) - -const ( - producerSessionAnalytics = "session-analytics" -) - -func (c *controller) initTelemetry(ctx context.Context, clientProvider func() (databroker.DataBrokerServiceClient, error)) error { - sessionMetricProducer := sessions.NewProducer(instrumentation.Scope{}, clientProvider) - r, err := reporter.New(ctx, c.api.GetTelemetryConn(), - reporter.WithProducer(producerSessionAnalytics, sessionMetricProducer), - ) - if err != nil { - return fmt.Errorf("error creating telemetry metrics reporter: %w", err) - } - c.telemetryReporter = r - return nil -} - -func (c *controller) shutdownTelemetry(ctx context.Context) { - ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), c.cfg.shutdownTimeout) - defer cancel() - - err := c.telemetryReporter.Shutdown(ctx) - if err != nil { - log.Warn(ctx).Err(err).Msg("telemetry reporter shutdown error") - } -} - -func (c *controller) runSessionAnalyticsLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { - ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { - return c.Str("service", "zero-analytics") - }) - - return sessions.Collect(ctx, client, time.Hour) -} - -// those metrics are cluster-wide, so we only enable their reporting when we have the lease -func (c *controller) enableSessionAnalyticsReporting(ctx context.Context, _ databroker.DataBrokerServiceClient) error { - _ = c.telemetryReporter.SetMetricProducerEnabled(producerSessionAnalytics, true) - defer func() { _ = c.telemetryReporter.SetMetricProducerEnabled(producerSessionAnalytics, false) }() - - <-ctx.Done() - return nil -} - -func (c *controller) runHealthChecksLeased(ctx context.Context, client databroker.DataBrokerServiceClient) error { - ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { - return c.Str("service", "zero-health-checks") - }) - - return healthcheck.RunChecks(ctx, c.bootstrapConfig, client) -} - -func (c *controller) runTelemetryReporter(ctx context.Context) error { - health.SetProvider(c.telemetryReporter) - defer health.SetProvider(nil) - - ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { - return c.Str("service", "zero-bootstrap") - }) - - return c.telemetryReporter.Run(ctx) -} diff --git a/internal/zero/telemetry/metrics_producer.go b/internal/zero/telemetry/metrics_producer.go new file mode 100644 index 000000000..b5e6d3600 --- /dev/null +++ b/internal/zero/telemetry/metrics_producer.go @@ -0,0 +1,54 @@ +package telemetry + +import ( + "context" + "sync/atomic" + + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/pomerium/pomerium/internal/log" +) + +// metricsProducer is a wrapper around a metric producer that can be enabled or disabled +type metricsProducer[P metric.Producer] struct { + enabled atomic.Bool + name string + producer P +} + +func newMetricsProducer[P metric.Producer](name string, p P) *metricsProducer[P] { + return &metricsProducer[P]{ + name: name, + producer: p, + } +} + +// Produce wraps the underlying producer's Produce method and logs any errors, +// to prevent the error from blocking the export of other metrics. +// also checks if the producer is enabled before producing metrics +func (p *metricsProducer[P]) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { + if enabled := p.enabled.Load(); !enabled { + return nil, nil + } + + data, err := p.producer.Produce(ctx) + if err != nil { + // we do not return the error here, as we do not want to block the export of other metrics + log.Error(ctx).Err(err).Str("producer", p.name).Msg("failed to produce metrics") + return nil, nil + } + return data, nil +} + +func (p *metricsProducer[P]) SetEnabled(v bool) { + p.enabled.Store(v) +} + +func (p *metricsProducer[P]) Name() string { + return p.name +} + +func (p *metricsProducer[P]) Producer() P { + return p.producer +} diff --git a/internal/zero/telemetry/reporter/config.go b/internal/zero/telemetry/reporter/config.go index 3c9b29e5f..32302664e 100644 --- a/internal/zero/telemetry/reporter/config.go +++ b/internal/zero/telemetry/reporter/config.go @@ -5,25 +5,20 @@ import ( ) type config struct { - producers map[string]*metricsProducer + producers []metric.Producer } type Option func(*config) // WithProducer adds a metric producer to the reporter -func WithProducer(name string, p metric.Producer) Option { +func WithProducer(p metric.Producer) Option { return func(c *config) { - if _, ok := c.producers[name]; ok { - panic("duplicate producer name " + name) - } - c.producers[name] = newProducer(name, p) + c.producers = append(c.producers, p) } } func getConfig(opts ...Option) config { - c := config{ - producers: make(map[string]*metricsProducer), - } + var c config for _, opt := range opts { opt(&c) } diff --git a/internal/zero/telemetry/reporter/metrics_producer.go b/internal/zero/telemetry/reporter/metrics_producer.go deleted file mode 100644 index d3070c804..000000000 --- a/internal/zero/telemetry/reporter/metrics_producer.go +++ /dev/null @@ -1,47 +0,0 @@ -package reporter - -import ( - "context" - "sync/atomic" - - "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" - - "github.com/pomerium/pomerium/internal/log" -) - -type metricsProducer struct { - enabled atomic.Bool - name string - metric.Producer -} - -func newProducer(name string, p metric.Producer) *metricsProducer { - return &metricsProducer{ - name: name, - Producer: p, - } -} - -var _ metric.Producer = (*metricsProducer)(nil) - -// Produce wraps the underlying producer's Produce method and logs any errors, -// to prevent the error from blocking the export of other metrics. -// also checks if the producer is enabled before producing metrics -func (p *metricsProducer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { - if enabled := p.enabled.Load(); !enabled { - return nil, nil - } - - data, err := p.Producer.Produce(ctx) - if err != nil { - log.Error(ctx).Err(err).Str("producer", p.name).Msg("failed to produce metrics") - return nil, err - } - return data, nil -} - -// SetEnabled sets the enabled state of the producer -func (p *metricsProducer) SetEnabled(v bool) { - p.enabled.Store(v) -} diff --git a/internal/zero/telemetry/reporter/metrics_reporter.go b/internal/zero/telemetry/reporter/metrics_reporter.go index e36e8c38f..7807371cd 100644 --- a/internal/zero/telemetry/reporter/metrics_reporter.go +++ b/internal/zero/telemetry/reporter/metrics_reporter.go @@ -6,38 +6,38 @@ import ( "fmt" export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" - metric_sdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" "google.golang.org/grpc" - - "github.com/pomerium/pomerium/internal/log" - "github.com/pomerium/pomerium/pkg/health" ) type metricsReporter struct { exporter *export_grpc.Exporter resource *resource.Resource - reader *metric_sdk.ManualReader - producers map[string]*metricsProducer - singleTask + reader *metric.ManualReader + producers []metric.Producer } func newMetricsReporter( ctx context.Context, conn *grpc.ClientConn, resource *resource.Resource, - producers map[string]*metricsProducer, + producers []metric.Producer, ) (*metricsReporter, error) { exporter, err := export_grpc.New(ctx, export_grpc.WithGRPCConn(conn)) if err != nil { return nil, fmt.Errorf("create exporter: %w", err) } - readerOpts := make([]metric_sdk.ManualReaderOption, 0, len(producers)) + readerOpts := make([]metric.ManualReaderOption, 0, len(producers)) for _, p := range producers { - readerOpts = append(readerOpts, metric_sdk.WithProducer(p)) + readerOpts = append(readerOpts, metric.WithProducer(p)) } - reader := metric_sdk.NewManualReader(readerOpts...) + reader := metric.NewManualReader(readerOpts...) + _ = metric.NewMeterProvider( + metric.WithResource(resource), + metric.WithReader(reader), + ) return &metricsReporter{ exporter: exporter, resource: resource, @@ -58,40 +58,16 @@ func (r *metricsReporter) Shutdown(ctx context.Context) error { ) } -func (r *metricsReporter) SetMetricProducerEnabled(name string, enabled bool) error { - p, ok := r.producers[name] - if !ok { - return fmt.Errorf("producer %q not found", name) - } - p.SetEnabled(enabled) - return nil -} - -func (r *metricsReporter) CollectAndExportMetrics(ctx context.Context) { - r.singleTask.Run(ctx, func(ctx context.Context) { - err := r.collectAndExport(ctx) - if errors.Is(err, ErrAnotherExecutionRequested) { - log.Warn(ctx).Msg("telemetry metrics were not sent, due to another execution requested") - return - } - if err != nil { - health.ReportError(health.CollectAndSendTelemetry, err) - } else { - health.ReportOK(health.CollectAndSendTelemetry) - } - }) -} - -func (r *metricsReporter) collectAndExport(ctx context.Context) error { +func (r *metricsReporter) CollectAndExportMetrics(ctx context.Context) error { rm := &metricdata.ResourceMetrics{ Resource: r.resource, } - err := withBackoff(ctx, "collect metrics", func(ctx context.Context) error { return r.reader.Collect(ctx, rm) }) + err := r.reader.Collect(ctx, rm) if err != nil { return fmt.Errorf("collect metrics: %w", err) } - err = withBackoff(ctx, "export metrics", func(ctx context.Context) error { return r.exporter.Export(ctx, rm) }) + err = r.exporter.Export(ctx, rm) if err != nil { return fmt.Errorf("export metrics: %w", err) } diff --git a/internal/zero/telemetry/reporter/reporter.go b/internal/zero/telemetry/reporter/reporter.go index e32418a88..64e789841 100644 --- a/internal/zero/telemetry/reporter/reporter.go +++ b/internal/zero/telemetry/reporter/reporter.go @@ -5,16 +5,14 @@ import ( "context" "fmt" "os" - "time" - "github.com/cenkalti/backoff/v4" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/retry" "github.com/pomerium/pomerium/internal/version" ) @@ -52,8 +50,8 @@ func New( func (r *Reporter) Run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { return withBackoff(ctx, "metrics reporter", r.metricsReporter.Run) }) - eg.Go(func() error { return withBackoff(ctx, "health check reporter", r.healthCheckReporter.Run) }) + eg.Go(func() error { return retry.WithBackoff(ctx, "metrics reporter", r.metricsReporter.Run) }) + eg.Go(func() error { return retry.WithBackoff(ctx, "health check reporter", r.healthCheckReporter.Run) }) return eg.Wait() } @@ -81,19 +79,3 @@ func getResource() *resource.Resource { return resource.NewSchemaless(attr...) } - -func withBackoff(ctx context.Context, name string, f func(context.Context) error) error { - bo := backoff.NewExponentialBackOff() - bo.MaxElapsedTime = 0 - return backoff.RetryNotify( - func() error { return f(ctx) }, - backoff.WithContext(bo, ctx), - func(err error, d time.Duration) { - log.Warn(ctx). - Str("name", name). - Err(err). - Dur("backoff", d). - Msg("retrying") - }, - ) -} diff --git a/internal/zero/telemetry/reporter/singletask.go b/internal/zero/telemetry/reporter/singletask.go deleted file mode 100644 index 303f59619..000000000 --- a/internal/zero/telemetry/reporter/singletask.go +++ /dev/null @@ -1,27 +0,0 @@ -package reporter - -import ( - "context" - "errors" - "sync" -) - -type singleTask struct { - lock sync.Mutex - cancel context.CancelCauseFunc -} - -var ErrAnotherExecutionRequested = errors.New("another execution requested") - -func (s *singleTask) Run(ctx context.Context, f func(context.Context)) { - s.lock.Lock() - defer s.lock.Unlock() - - if s.cancel != nil { - s.cancel(ErrAnotherExecutionRequested) - } - - ctx, cancel := context.WithCancelCause(ctx) - s.cancel = cancel - go f(ctx) -} diff --git a/internal/zero/telemetry/sessions/producer.go b/internal/zero/telemetry/sessions/producer.go index cc5b57a57..d30445169 100644 --- a/internal/zero/telemetry/sessions/producer.go +++ b/internal/zero/telemetry/sessions/producer.go @@ -6,14 +6,16 @@ import ( "time" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/slices" ) -type producer struct { +type Producer struct { scope instrumentation.Scope clientProvider func() (databroker.DataBrokerServiceClient, error) } @@ -21,14 +23,14 @@ type producer struct { func NewProducer( scope instrumentation.Scope, clientProvider func() (databroker.DataBrokerServiceClient, error), -) metric.Producer { - return &producer{ +) *Producer { + return &Producer{ clientProvider: clientProvider, scope: scope, } } -func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { +func (p *Producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) { client, err := p.clientProvider() if err != nil { return nil, fmt.Errorf("error getting client: %w", err) @@ -43,6 +45,9 @@ func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, erro eg.Go(func() error { state, err := LoadMetricState(ctx, client, ids[i]) if err != nil { + if status.Code(err) == codes.NotFound { + return nil + } return err } metrics[i] = metricdata.Metrics{ @@ -66,6 +71,11 @@ func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, erro return nil, err } + metrics = slices.Filter(metrics, func(v metricdata.Metrics) bool { return v.Name != "" }) + if len(metrics) == 0 { + return nil, nil + } + return []metricdata.ScopeMetrics{ { Scope: p.scope, diff --git a/internal/zero/telemetry/telemetry.go b/internal/zero/telemetry/telemetry.go new file mode 100644 index 000000000..5bce1f711 --- /dev/null +++ b/internal/zero/telemetry/telemetry.go @@ -0,0 +1,146 @@ +package telemetry + +import ( + "context" + "fmt" + "time" + + "github.com/rs/zerolog" + "go.opentelemetry.io/otel/sdk/instrumentation" + "golang.org/x/sync/errgroup" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/telemetry/prometheus" + sdk "github.com/pomerium/pomerium/internal/zero/api" + connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux" + "github.com/pomerium/pomerium/internal/zero/telemetry/reporter" + "github.com/pomerium/pomerium/internal/zero/telemetry/sessions" + "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/health" + "github.com/pomerium/pomerium/pkg/zero/connect" +) + +type Telemetry struct { + api *sdk.API + reporter *reporter.Reporter + + envoyMetrics *metricsProducer[*prometheus.Producer] + sessionMetrics *metricsProducer[*sessions.Producer] + hasSessionMetricsLease func() bool +} + +func New( + ctx context.Context, + api *sdk.API, + clientProvider func() (databroker.DataBrokerServiceClient, error), + hasSessionMetricsLease func() bool, + envoyScrapeURL string, +) (*Telemetry, error) { + startTime := time.Now() + + sessionMetricProducer := newMetricsProducer("sessions", buildSessionMetricsProducer(clientProvider)) + envoyMetricProducer := newMetricsProducer("envoy", buildEnvoyMetricsProducer(envoyScrapeURL, startTime)) + + r, err := reporter.New(ctx, api.GetTelemetryConn(), + reporter.WithProducer(sessionMetricProducer), + reporter.WithProducer(envoyMetricProducer), + ) + if err != nil { + return nil, fmt.Errorf("error creating telemetry metrics reporter: %w", err) + } + + return &Telemetry{ + api: api, + reporter: r, + sessionMetrics: sessionMetricProducer, + envoyMetrics: envoyMetricProducer, + hasSessionMetricsLease: hasSessionMetricsLease, + }, nil +} + +func (srv *Telemetry) Shutdown(ctx context.Context) error { + return srv.reporter.Shutdown(ctx) +} + +func (srv *Telemetry) Run(ctx context.Context) error { + ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { + return c.Str("service", "telemetry-reporter") + }) + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + health.SetProvider(srv.reporter) + defer health.SetProvider(nil) + return srv.reporter.Run(ctx) + }) + eg.Go(func() error { return srv.handleRequests(ctx) }) + return eg.Wait() +} + +// handleRequests watches for telemetry requests as they are received from the cloud control plane and processes them. +func (srv *Telemetry) handleRequests(ctx context.Context) error { + requests := make(chan *connect.TelemetryRequest, 1) + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return srv.api.Watch(ctx, connect_mux.WithOnTelemetryRequested( + func(ctx context.Context, req *connect.TelemetryRequest) { + select { + case requests <- req: + default: + log.Warn(ctx).Msg("dropping telemetry request") + } + })) + }) + eg.Go(func() error { + for { + select { + case req := <-requests: + srv.handleRequest(ctx, req) + case <-ctx.Done(): + return ctx.Err() + } + } + }) + return eg.Wait() +} + +func (srv *Telemetry) handleRequest(ctx context.Context, req *connect.TelemetryRequest) { + srv.configureEnvoyMetricsProducer(req.GetEnvoyMetrics()) + srv.configureSessionMetricsProducer(req.GetSessionAnalytics()) + + err := srv.reporter.CollectAndExportMetrics(ctx) + if err != nil { + health.ReportError(health.CollectAndSendTelemetry, err) + } else { + health.ReportOK(health.CollectAndSendTelemetry) + } +} + +func buildSessionMetricsProducer(clientProvider func() (databroker.DataBrokerServiceClient, error)) *sessions.Producer { + return sessions.NewProducer(instrumentation.Scope{Name: "pomerium-cluster"}, clientProvider) +} + +func buildEnvoyMetricsProducer(scrapeURL string, startTime time.Time) *prometheus.Producer { + return prometheus.NewProducer( + prometheus.WithScope(instrumentation.Scope{Name: "envoy"}), + prometheus.WithScrapeURL(scrapeURL), + prometheus.WithStartTime(startTime), + ) +} + +func (srv *Telemetry) configureSessionMetricsProducer(req *connect.SessionAnalyticsRequest) { + srv.sessionMetrics.SetEnabled(req != nil && srv.hasSessionMetricsLease()) +} + +func (srv *Telemetry) configureEnvoyMetricsProducer(req *connect.EnvoyMetricsRequest) { + if req == nil { + srv.envoyMetrics.SetEnabled(false) + return + } + srv.envoyMetrics.Producer().UpdateConfig( + prometheus.WithIncludeMetrics(req.GetMetrics()...), + prometheus.WithIncludeLabels(req.GetLabels()...), + ) + srv.envoyMetrics.SetEnabled(true) +}