From f67b33484bcbe8b1b948f7ad154e6b4ca1598707 Mon Sep 17 00:00:00 2001 From: Denis Mishin Date: Thu, 30 Jun 2022 10:52:45 -0400 Subject: [PATCH] add metrics aggregation (#3452) --- config/config.go | 27 ++- config/config_source.go | 6 +- config/metrics.go | 42 +++- internal/telemetry/metrics/providers.go | 222 ++++++++++++++----- internal/telemetry/metrics/providers_test.go | 3 +- pkg/cmd/pomerium/pomerium.go | 6 +- {internal => pkg}/netutil/netutil.go | 0 7 files changed, 229 insertions(+), 77 deletions(-) rename {internal => pkg}/netutil/netutil.go (100%) diff --git a/config/config.go b/config/config.go index 0b5c6d3db..42716b45a 100644 --- a/config/config.go +++ b/config/config.go @@ -4,9 +4,12 @@ import ( "crypto/tls" "github.com/pomerium/pomerium/internal/hashutil" - "github.com/pomerium/pomerium/internal/netutil" + "github.com/pomerium/pomerium/internal/telemetry/metrics" ) +// MetricsScrapeEndpoint defines additional metrics endpoints that would be scraped and exposed by pomerium +type MetricsScrapeEndpoint metrics.ScrapeEndpoint + // Config holds pomerium configuration options. type Config struct { Options *Options @@ -23,12 +26,21 @@ type Config struct { MetricsPort string // DebugPort is the port the debug listener is running on. DebugPort string + + // MetricsScrapeEndpoints additional metrics endpoints to scrape and provide part of metrics + MetricsScrapeEndpoints []MetricsScrapeEndpoint } // Clone creates a clone of the config. func (cfg *Config) Clone() *Config { newOptions := new(Options) - *newOptions = *cfg.Options + if cfg.Options != nil { + *newOptions = *cfg.Options + } + + endpoints := make([]MetricsScrapeEndpoint, len(cfg.MetricsScrapeEndpoints)) + _ = copy(endpoints, cfg.MetricsScrapeEndpoints) + return &Config{ Options: newOptions, AutoCertificates: cfg.AutoCertificates, @@ -39,6 +51,8 @@ func (cfg *Config) Clone() *Config { OutboundPort: cfg.OutboundPort, MetricsPort: cfg.MetricsPort, DebugPort: cfg.DebugPort, + + MetricsScrapeEndpoints: endpoints, } } @@ -61,17 +75,10 @@ func (cfg *Config) Checksum() uint64 { } // AllocatePorts populates -func (cfg *Config) AllocatePorts() error { - ports, err := netutil.AllocatePorts(5) - if err != nil { - return err - } - +func (cfg *Config) AllocatePorts(ports [5]string) { cfg.GRPCPort = ports[0] cfg.HTTPPort = ports[1] cfg.OutboundPort = ports[2] cfg.MetricsPort = ports[3] cfg.DebugPort = ports[4] - - return nil } diff --git a/config/config_source.go b/config/config_source.go index fd1d72bb1..25c3290d7 100644 --- a/config/config_source.go +++ b/config/config_source.go @@ -13,6 +13,7 @@ import ( "github.com/pomerium/pomerium/internal/fileutil" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/telemetry/metrics" + "github.com/pomerium/pomerium/pkg/netutil" ) // A ChangeListener is called when configuration changes. @@ -115,10 +116,13 @@ func NewFileOrEnvironmentSource( EnvoyVersion: envoyVersion, } - if err = cfg.AllocatePorts(); err != nil { + ports, err := netutil.AllocatePorts(5) + if err != nil { return nil, fmt.Errorf("allocating ports: %w", err) } + cfg.AllocatePorts(*(*[5]string)(ports)) + metrics.SetConfigInfo(ctx, cfg.Options.Services, "local", cfg.Checksum(), true) src := &FileOrEnvironmentSource{ diff --git a/config/metrics.go b/config/metrics.go index f671c2af9..d49756cac 100644 --- a/config/metrics.go +++ b/config/metrics.go @@ -5,7 +5,9 @@ import ( "net/http" "net/url" "os" + "reflect" "sync" + "time" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/middleware" @@ -15,6 +17,11 @@ import ( "github.com/rs/zerolog" ) +const ( + // defaultMetricsTimeout sets max time to collect and send aggregate pomerium metrics + defaultMetricsTimeout = time.Second * 30 +) + // A MetricsManager manages metrics for a given configuration. type MetricsManager struct { mu sync.RWMutex @@ -24,6 +31,7 @@ type MetricsManager struct { basicAuth string envoyAdminAddress string handler http.Handler + endpoints []MetricsScrapeEndpoint } // NewMetricsManager creates a new MetricsManager. @@ -80,10 +88,7 @@ func (mgr *MetricsManager) updateInfo(ctx context.Context, cfg *Config) { } func (mgr *MetricsManager) updateServer(ctx context.Context, cfg *Config) { - if cfg.Options.MetricsAddr == mgr.addr && - cfg.Options.MetricsBasicAuth == mgr.basicAuth && - cfg.Options.InstallationID == mgr.installationID && - cfg.Options.EnvoyAdminAddress == mgr.envoyAdminAddress { + if mgr.configUnchanged(cfg) { return } @@ -98,13 +103,12 @@ func (mgr *MetricsManager) updateServer(ctx context.Context, cfg *Config) { return } - envoyURL, err := url.Parse("http://" + cfg.Options.EnvoyAdminAddress) - if err != nil { - log.Error(ctx).Err(err).Msg("metrics: invalid envoy admin address, disabling") - return - } - - handler, err := metrics.PrometheusHandler(envoyURL, mgr.installationID) + mgr.endpoints = append(cfg.MetricsScrapeEndpoints, + MetricsScrapeEndpoint{ + Name: "envoy", + URL: url.URL{Scheme: "http", Host: cfg.Options.EnvoyAdminAddress, Path: "/stats/prometheus"}, + }) + handler, err := metrics.PrometheusHandler(toInternalEndpoints(mgr.endpoints), mgr.installationID, defaultMetricsTimeout) if err != nil { log.Error(ctx).Err(err).Msg("metrics: failed to create prometheus handler") return @@ -116,3 +120,19 @@ func (mgr *MetricsManager) updateServer(ctx context.Context, cfg *Config) { mgr.handler = handler } + +func (mgr *MetricsManager) configUnchanged(cfg *Config) bool { + return cfg.Options.MetricsAddr == mgr.addr && + cfg.Options.MetricsBasicAuth == mgr.basicAuth && + cfg.Options.InstallationID == mgr.installationID && + cfg.Options.EnvoyAdminAddress == mgr.envoyAdminAddress && + reflect.DeepEqual(mgr.endpoints, cfg.MetricsScrapeEndpoints) +} + +func toInternalEndpoints(src []MetricsScrapeEndpoint) []metrics.ScrapeEndpoint { + dst := make([]metrics.ScrapeEndpoint, 0, len(src)) + for _, e := range src { + dst = append(dst, metrics.ScrapeEndpoint(e)) + } + return dst +} diff --git a/internal/telemetry/metrics/providers.go b/internal/telemetry/metrics/providers.go index 867403a90..3d7fa9809 100644 --- a/internal/telemetry/metrics/providers.go +++ b/internal/telemetry/metrics/providers.go @@ -1,29 +1,47 @@ package metrics import ( + "context" + "errors" "fmt" "io" "net/http" "net/http/httptest" "net/url" "os" + "strings" "sync" + "time" ocprom "contrib.go.opencensus.io/exporter/prometheus" + "github.com/hashicorp/go-multierror" prom "github.com/prometheus/client_golang/prometheus" io_prometheus_client "github.com/prometheus/client_model/go" "github.com/prometheus/common/expfmt" "go.opencensus.io/stats/view" "google.golang.org/protobuf/proto" + "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/pkg/metrics" - - log "github.com/pomerium/pomerium/internal/log" ) +// ScrapeEndpoint external endpoints to scrape and decorate +type ScrapeEndpoint struct { + // Name is the logical name of the endpoint + Name string + // URL of the endpoint to scrape that must output a prometheus-style metrics + URL url.URL + // Labels to append to each metric records + Labels map[string]string +} + +func (e *ScrapeEndpoint) String() string { + return fmt.Sprintf("%s(%s)", e.Name, e.URL.String()) +} + // PrometheusHandler creates an exporter that exports stats to Prometheus // and returns a handler suitable for exporting metrics. -func PrometheusHandler(envoyURL *url.URL, installationID string) (http.Handler, error) { +func PrometheusHandler(endpoints []ScrapeEndpoint, installationID string, timeout time.Duration) (http.Handler, error) { exporter, err := getGlobalExporter() if err != nil { return nil, err @@ -31,12 +49,7 @@ func PrometheusHandler(envoyURL *url.URL, installationID string) (http.Handler, mux := http.NewServeMux() - envoyMetricsURL, err := envoyURL.Parse("/stats/prometheus") - if err != nil { - return nil, fmt.Errorf("telemetry/metrics: invalid proxy URL: %w", err) - } - - mux.Handle("/metrics", newProxyMetricsHandler(exporter, *envoyMetricsURL, installationID)) + mux.Handle("/metrics", newProxyMetricsHandler(exporter, endpoints, installationID, timeout)) return mux, nil } @@ -79,54 +92,73 @@ func registerDefaultViews() error { } // newProxyMetricsHandler creates a subrequest to the envoy control plane for metrics and -// combines them with our own -func newProxyMetricsHandler(exporter *ocprom.Exporter, envoyURL url.URL, installationID string) http.HandlerFunc { - hostname, err := os.Hostname() - if err != nil { - hostname = "__none__" - } - extraLabels := []*io_prometheus_client.LabelPair{{ - Name: proto.String(metrics.InstallationIDLabel), - Value: proto.String(installationID), - }, { - Name: proto.String(metrics.HostnameLabel), - Value: proto.String(hostname), - }} - +// combines them with internal envoy-provided +func newProxyMetricsHandler(exporter *ocprom.Exporter, endpoints []ScrapeEndpoint, installationID string, timeout time.Duration) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - // Ensure we don't get entangled with compression from ocprom - r.Header.Del("Accept-Encoding") + ctx, cancel := context.WithTimeout(r.Context(), timeout) + defer cancel() - rec := httptest.NewRecorder() - exporter.ServeHTTP(rec, r) - - err := writeMetricsWithLabels(w, rec.Body, extraLabels) - if err != nil { - log.Error(r.Context()).Err(err).Send() - return - } - - req, err := http.NewRequestWithContext(r.Context(), "GET", envoyURL.String(), nil) - if err != nil { - log.Error(r.Context()).Err(err).Msg("telemetry/metrics: failed to create request for envoy") - return - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - log.Error(r.Context()).Err(err).Msg("telemetry/metrics: fail to fetch proxy metrics") - return - } - defer resp.Body.Close() - - err = writeMetricsWithLabels(w, resp.Body, extraLabels) - if err != nil { - log.Error(r.Context()).Err(err).Send() - return + labels := getCommonLabels(installationID) + if err := writeMetricsMux(ctx, w, append( + scrapeEndpoints(endpoints, labels), + ocExport("pomerium", exporter, r, labels)), + ); err != nil { + log.Error(ctx).Msg("responding to metrics request") } } } +type promProducerResult struct { + name string + src io.ReadCloser + labels []*io_prometheus_client.LabelPair + err error +} + +// promProducerFn returns a reader containing prometheus-style metrics and additional labels to add to each record +type promProducerFn func(context.Context) promProducerResult + +// writeMetricsMux runs producers concurrently and pipes output to destination yet avoiding data interleaving +func writeMetricsMux(ctx context.Context, w io.Writer, producers []promProducerFn) error { + results := make(chan promProducerResult) + + for _, p := range producers { + go func(fn promProducerFn) { + results <- fn(ctx) + }(p) + } + + var errs *multierror.Error +loop_producers: + for i := 0; i < len(producers); i++ { + select { + case <-ctx.Done(): + err := fmt.Errorf("processed %d metric producers out of %d: %w", i, len(producers), ctx.Err()) + errs = multierror.Append(errs, err, writePrometheusComment(w, err.Error())) + break loop_producers + case res := <-results: + if err := writeMetricsResult(w, res); err != nil { + errs = multierror.Append(errs, fmt.Errorf("%s: %w", res.name, err)) + } + } + } + + return errs.ErrorOrNil() +} + +func writeMetricsResult(w io.Writer, res promProducerResult) error { + if res.err != nil { + return fmt.Errorf("fetch: %w", res.err) + } + if err := writeMetricsWithLabels(w, res.src, res.labels); err != nil { + return fmt.Errorf("%s: write: %w", res.name, err) + } + if err := res.src.Close(); err != nil { + return fmt.Errorf("%s: close: %w", res.name, err) + } + return nil +} + func writeMetricsWithLabels(w io.Writer, r io.Reader, extra []*io_prometheus_client.LabelPair) error { var parser expfmt.TextParser ms, err := parser.TextToMetricFamilies(r) @@ -146,3 +178,91 @@ func writeMetricsWithLabels(w io.Writer, r io.Reader, extra []*io_prometheus_cli return nil } + +func writePrometheusComment(w io.Writer, txt string) error { + lines := strings.Split(txt, "\n") + for _, line := range lines { + if _, err := w.Write([]byte(fmt.Sprintf("# %s\n", line))); err != nil { + return fmt.Errorf("write prometheus comment: %w", err) + } + } + return nil +} + +func ocExport(name string, exporter *ocprom.Exporter, r *http.Request, labels []*io_prometheus_client.LabelPair) promProducerFn { + return func(context.Context) promProducerResult { + // Ensure we don't get entangled with compression from ocprom + r.Header.Del("Accept-Encoding") + + rec := httptest.NewRecorder() + exporter.ServeHTTP(rec, r) + + if rec.Code/100 != 2 { + return promProducerResult{name: name, err: errors.New(rec.Result().Status)} + } + + return promProducerResult{ + name: name, + src: rec.Result().Body, + labels: labels, + } + } +} +func scrapeEndpoints(endpoints []ScrapeEndpoint, labels []*io_prometheus_client.LabelPair) []promProducerFn { + out := make([]promProducerFn, 0, len(endpoints)) + for _, endpoint := range endpoints { + out = append(out, scrapeEndpoint(endpoint, labels)) + } + return out +} + +func scrapeEndpoint(endpoint ScrapeEndpoint, labels []*io_prometheus_client.LabelPair) promProducerFn { + return func(ctx context.Context) promProducerResult { + name := fmt.Sprintf("%s %s", endpoint.Name, endpoint.URL.String()) + + req, err := http.NewRequestWithContext(ctx, "GET", endpoint.URL.String(), nil) + if err != nil { + return promProducerResult{name: name, err: fmt.Errorf("make request: %w", err)} + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return promProducerResult{name: name, err: fmt.Errorf("request: %w", err)} + } + + if resp.StatusCode/100 != 2 { + return promProducerResult{name: name, err: errors.New(resp.Status)} + } + + return promProducerResult{ + name: name, + src: resp.Body, + labels: append(toPrometheusLabels(endpoint.Labels), labels...), + } + } +} + +func getCommonLabels(installationID string) []*io_prometheus_client.LabelPair { + hostname, err := os.Hostname() + if err != nil { + hostname = "__none__" + } + return []*io_prometheus_client.LabelPair{{ + Name: proto.String(metrics.InstallationIDLabel), + Value: proto.String(installationID), + }, { + Name: proto.String(metrics.HostnameLabel), + Value: proto.String(hostname), + }} +} + +func toPrometheusLabels(labels map[string]string) []*io_prometheus_client.LabelPair { + out := make([]*io_prometheus_client.LabelPair, 0, len(labels)) + for k, v := range labels { + out = append(out, &io_prometheus_client.LabelPair{ + Name: proto.String(k), + Value: proto.String(v), + }) + } + return out +} diff --git a/internal/telemetry/metrics/providers_test.go b/internal/telemetry/metrics/providers_test.go index 65ac5b672..be68547e6 100644 --- a/internal/telemetry/metrics/providers_test.go +++ b/internal/telemetry/metrics/providers_test.go @@ -7,6 +7,7 @@ import ( "net/url" "regexp" "testing" + "time" ) func newEnvoyMetricsHandler() http.HandlerFunc { @@ -28,7 +29,7 @@ envoy_server_initialization_time_ms_bucket{le="1000"} 1 } func getMetrics(t *testing.T, envoyURL *url.URL) []byte { - h, err := PrometheusHandler(envoyURL, "test_installation_id") + h, err := PrometheusHandler([]ScrapeEndpoint{{Name: "envoy", URL: *envoyURL}}, "test_installation_id", time.Second*20) if err != nil { t.Fatal(err) } diff --git a/pkg/cmd/pomerium/pomerium.go b/pkg/cmd/pomerium/pomerium.go index d56084ca9..571a74870 100644 --- a/pkg/cmd/pomerium/pomerium.go +++ b/pkg/cmd/pomerium/pomerium.go @@ -176,7 +176,7 @@ func setupAuthorize(ctx context.Context, src config.Source, controlPlane *contro } envoy_service_auth_v3.RegisterAuthorizationServer(controlPlane.GRPCServer, svc) - log.Info(context.TODO()).Msg("enabled authorize service") + log.Info(ctx).Msg("enabled authorize service") src.OnConfigChange(ctx, svc.OnConfigChange) svc.OnConfigChange(ctx, src.GetConfig()) return svc, nil @@ -192,7 +192,7 @@ func setupDataBroker(ctx context.Context, return nil, fmt.Errorf("error creating databroker service: %w", err) } svc.Register(controlPlane.GRPCServer) - log.Info(context.TODO()).Msg("enabled databroker service") + log.Info(ctx).Msg("enabled databroker service") src.OnConfigChange(ctx, svc.OnConfigChange) svc.OnConfigChange(ctx, src.GetConfig()) return svc, nil @@ -219,7 +219,7 @@ func setupProxy(ctx context.Context, src config.Source, controlPlane *controlpla return fmt.Errorf("error adding proxy service to control plane: %w", err) } - log.Info(context.TODO()).Msg("enabled proxy service") + log.Info(ctx).Msg("enabled proxy service") src.OnConfigChange(ctx, svc.OnConfigChange) svc.OnConfigChange(ctx, src.GetConfig()) diff --git a/internal/netutil/netutil.go b/pkg/netutil/netutil.go similarity index 100% rename from internal/netutil/netutil.go rename to pkg/netutil/netutil.go