diff --git a/config/metrics.go b/config/metrics.go index 39746dfb3..5b03a230c 100644 --- a/config/metrics.go +++ b/config/metrics.go @@ -15,6 +15,7 @@ import ( "github.com/pomerium/pomerium/internal/middleware" "github.com/pomerium/pomerium/internal/telemetry" "github.com/pomerium/pomerium/internal/telemetry/metrics" + metrics_const "github.com/pomerium/pomerium/pkg/metrics" ) const ( @@ -96,12 +97,17 @@ func (mgr *MetricsManager) updateServer(ctx context.Context, cfg *Config) { return } + var labels map[string]string + if cfg.Options.IsRuntimeFlagSet(RuntimeFlagAddExtraMetricsLabels) { + labels = getCommonLabels(mgr.installationID) + } + mgr.endpoints = append(cfg.MetricsScrapeEndpoints, MetricsScrapeEndpoint{ Name: "envoy", URL: url.URL{Scheme: "http", Host: cfg.Options.MetricsAddr, Path: "/metrics/envoy"}, }) - handler, err := metrics.PrometheusHandler(toInternalEndpoints(mgr.endpoints), mgr.installationID, defaultMetricsTimeout) + handler, err := metrics.PrometheusHandler(toInternalEndpoints(mgr.endpoints), defaultMetricsTimeout, labels) if err != nil { log.Ctx(ctx).Error().Err(err).Msg("metrics: failed to create prometheus handler") return @@ -128,3 +134,17 @@ func toInternalEndpoints(src []MetricsScrapeEndpoint) []metrics.ScrapeEndpoint { } return dst } + +func getCommonLabels(installationID string) map[string]string { + hostname, err := os.Hostname() + if err != nil { + hostname = "__none__" + } + m := map[string]string{ + metrics_const.HostnameLabel: hostname, + } + if installationID != "" { + m[metrics_const.InstallationIDLabel] = installationID + } + return m +} diff --git a/config/runtime_flags.go b/config/runtime_flags.go index fe01550e6..8ba41cc81 100644 --- a/config/runtime_flags.go +++ b/config/runtime_flags.go @@ -25,6 +25,9 @@ var ( // is deprecated pending removal in a future release, but this flag allows a temporary // opt-out from the deprecation. RuntimeFlagPomeriumJWTEndpoint = runtimeFlag("pomerium_jwt_endpoint", false) + + // RuntimeFlagAddExtraMetricsLabels enables adding extra labels to metrics (host and installation id) + RuntimeFlagAddExtraMetricsLabels = runtimeFlag("add_extra_metrics_labels", true) ) // RuntimeFlag is a runtime flag that can flip on/off certain features diff --git a/internal/telemetry/metrics/bench_test.go b/internal/telemetry/metrics/bench_test.go new file mode 100644 index 000000000..c2383f64d --- /dev/null +++ b/internal/telemetry/metrics/bench_test.go @@ -0,0 +1,115 @@ +package metrics_test + +import ( + "fmt" + "io" + "net/http" + "sort" + "testing" + "time" + + "github.com/pomerium/pomerium/config" + "github.com/pomerium/pomerium/internal/testenv" + "github.com/pomerium/pomerium/internal/testenv/snippets" + "github.com/pomerium/pomerium/internal/testenv/upstreams" + "github.com/stretchr/testify/assert" +) + +func TestScrapeMetricsEndpoint(t *testing.T) { + t.Skip("this test is for profiling purposes only") + + env := testenv.New(t, testenv.WithTraceDebugFlags(testenv.StandardTraceDebugFlags)) + upstream := upstreams.HTTP(nil) + upstream.Handle("/test", func(w http.ResponseWriter, _ *http.Request) { + w.Write([]byte("OK")) + }) + + routes := []testenv.Route{} + for i := range 10 { + routes = append(routes, upstream.Route(). + From(env.SubdomainURL(fmt.Sprintf("test-%d", i))). + Policy(func(p *config.Policy) { p.AllowPublicUnauthenticatedAccess = true })) + } + env.AddUpstream(upstream) + env.Start() + snippets.WaitStartupComplete(env) + + for _, r := range routes { + resp, err := upstream.Get(r, upstreams.Path("/test")) + assert.NoError(t, err) + data, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + resp.Body.Close() + assert.Equal(t, "OK", string(data)) + } + + metricsURL := fmt.Sprintf("http://localhost:%d/metrics", env.Ports().Metrics.Value()) + + client := &http.Client{ + Timeout: 10 * time.Second, + } + + var durations []time.Duration + var totalBytes int64 + var errors int + + pct := 0 + niter := 200 + for i := 0; i < niter; i++ { + pct = i * 100 / niter + if pct%10 == 0 { + t.Log(pct, "%") + } + + start := time.Now() + resp, err := client.Get(metricsURL) + elapsed := time.Since(start) + + if err != nil { + t.Logf("Request %d failed: %v", i, err) + errors++ + continue + } + + nb, err := io.Copy(io.Discard, resp.Body) + if err != nil { + resp.Body.Close() + errors++ + continue + } + resp.Body.Close() + + durations = append(durations, elapsed) + totalBytes += nb + } + + if len(durations) > 0 { + sort.Slice(durations, func(i, j int) bool { + return durations[i] < durations[j] + }) + + var total time.Duration + for _, d := range durations { + total += d + } + + t.Logf("Metrics scraping statistics:") + t.Logf(" Successful requests: %d", len(durations)) + t.Logf(" Failed requests: %d", errors) + t.Logf(" Total bytes: %d", totalBytes) + t.Logf(" Avg bytes per request: %.2f", float64(totalBytes)/float64(len(durations))) + t.Logf(" Min: %v", durations[0]) + t.Logf(" Max: %v", durations[len(durations)-1]) + t.Logf(" Avg: %v", total/time.Duration(len(durations))) + t.Logf(" p50: %v", durations[len(durations)*50/100]) + t.Logf(" p90: %v", durations[len(durations)*90/100]) + t.Logf(" p95: %v", durations[len(durations)*95/100]) + t.Logf(" p99: %v", durations[len(durations)*99/100]) + } else { + t.Logf("No successful requests made") + } + + t.Logf("metrics endpoint: %s", metricsURL) + + env.Stop() +} diff --git a/internal/telemetry/metrics/providers.go b/internal/telemetry/metrics/providers.go index 3a783cb0f..bca3b16d5 100644 --- a/internal/telemetry/metrics/providers.go +++ b/internal/telemetry/metrics/providers.go @@ -10,7 +10,6 @@ import ( "net/http" "net/http/httptest" "net/url" - "os" "strings" "sync" "time" @@ -22,7 +21,6 @@ import ( "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/telemetry/prometheus" - "github.com/pomerium/pomerium/pkg/metrics" ) // EnvoyMetricsPath is the path on the metrics listener that retrieves envoy metrics. @@ -44,7 +42,7 @@ func (e *ScrapeEndpoint) String() string { // PrometheusHandler creates an exporter that exports stats to Prometheus // and returns a handler suitable for exporting metrics. -func PrometheusHandler(endpoints []ScrapeEndpoint, installationID string, timeout time.Duration) (http.Handler, error) { +func PrometheusHandler(endpoints []ScrapeEndpoint, timeout time.Duration, labels map[string]string) (http.Handler, error) { exporter, err := getGlobalExporter() if err != nil { return nil, err @@ -52,7 +50,7 @@ func PrometheusHandler(endpoints []ScrapeEndpoint, installationID string, timeou mux := http.NewServeMux() - mux.Handle("/metrics", newProxyMetricsHandler(exporter, endpoints, installationID, timeout)) + mux.Handle("/metrics", newProxyMetricsHandler(exporter, endpoints, timeout, labels)) return mux, nil } @@ -96,12 +94,16 @@ func registerDefaultViews() error { // newProxyMetricsHandler creates a subrequest to the envoy control plane for metrics and // combines them with internal envoy-provided -func newProxyMetricsHandler(exporter *ocprom.Exporter, endpoints []ScrapeEndpoint, installationID string, timeout time.Duration) http.HandlerFunc { +func newProxyMetricsHandler( + exporter *ocprom.Exporter, + endpoints []ScrapeEndpoint, + timeout time.Duration, + labels map[string]string, +) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), timeout) defer cancel() - labels := getCommonLabels(installationID) if err := writeMetricsMux(ctx, w, append( scrapeEndpoints(endpoints, labels), ocExport("pomerium", exporter, r, labels)), @@ -156,7 +158,7 @@ func writeMetricsResult(w io.Writer, res promProducerResult) error { return fmt.Errorf("fetch: %w", res.err) } return errors.Join( - prometheus.Export(w, prometheus.AddLabels(prometheus.NewMetricFamilyStream(res.src), res.labels)), + prometheus.RelabelTextStream(w, res.src, res.labels), res.src.Close(), ) } @@ -229,17 +231,3 @@ func scrapeEndpoint(endpoint ScrapeEndpoint, extra map[string]string) promProduc } } } - -func getCommonLabels(installationID string) map[string]string { - hostname, err := os.Hostname() - if err != nil { - hostname = "__none__" - } - m := map[string]string{ - metrics.HostnameLabel: hostname, - } - if installationID != "" { - m[metrics.InstallationIDLabel] = installationID - } - return m -} diff --git a/internal/telemetry/metrics/providers_test.go b/internal/telemetry/metrics/providers_test.go index d11eab792..86d3ad0a1 100644 --- a/internal/telemetry/metrics/providers_test.go +++ b/internal/telemetry/metrics/providers_test.go @@ -29,7 +29,7 @@ envoy_server_initialization_time_ms_bucket{le="1000"} 1 } func getMetrics(t *testing.T, envoyURL *url.URL, header http.Header) []byte { - h, err := PrometheusHandler([]ScrapeEndpoint{{Name: "envoy", URL: *envoyURL}}, "test_installation_id", time.Second*20) + h, err := PrometheusHandler([]ScrapeEndpoint{{Name: "envoy", URL: *envoyURL}}, time.Second*20, nil) if err != nil { t.Fatal(err) } diff --git a/internal/telemetry/prometheus/relabel_text_stream.go b/internal/telemetry/prometheus/relabel_text_stream.go new file mode 100644 index 000000000..65e51c039 --- /dev/null +++ b/internal/telemetry/prometheus/relabel_text_stream.go @@ -0,0 +1,103 @@ +package prometheus + +import ( + "bufio" + "bytes" + "errors" + "io" + "maps" + "slices" + "strings" +) + +func writeMulti(dst io.Writer, b ...[]byte) error { + for _, buf := range b { + if _, err := dst.Write(buf); err != nil { + return err + } + } + return nil +} + +// RelabelTextStream relabels a prometheus text stream by adding additional labels to each metric. +func RelabelTextStream(dst io.Writer, src io.Reader, addLabels map[string]string) error { + if len(addLabels) == 0 { + _, err := io.Copy(dst, src) + return err + } + + var labelsBuilder strings.Builder + for _, k := range slices.Sorted(maps.Keys(addLabels)) { + v := addLabels[k] + if labelsBuilder.Len() > 0 { + labelsBuilder.WriteByte(',') + } + labelsBuilder.WriteString(k) + labelsBuilder.WriteString("=\"") + labelsBuilder.WriteString(v) + labelsBuilder.WriteString("\"") + } + addedLabels := []byte(labelsBuilder.String()) + + r := bufio.NewReader(src) + + for { + line, err := r.ReadSlice('\n') + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return err + } + + if len(line) == 0 || line[0] == '#' { + if _, err := dst.Write(line); err != nil { + return err + } + continue + } + + spaceIdx := bytes.IndexByte(line, ' ') + if spaceIdx == -1 { + if _, err := dst.Write(line); err != nil { + return err + } + continue + } + + metricWithLabels := line[:spaceIdx] + value := line[spaceIdx:] + + openBraceIdx := bytes.IndexByte(metricWithLabels, '{') + if openBraceIdx == -1 { // no labels + if err := writeMulti(dst, metricWithLabels, []byte("{"), addedLabels, []byte("}"), value); err != nil { + return err + } + continue + } + + metricName := metricWithLabels[:openBraceIdx] + + closeBraceIdx := bytes.LastIndexByte(metricWithLabels, '}') + if closeBraceIdx == -1 || closeBraceIdx <= openBraceIdx { + if _, err := dst.Write(line); err != nil { + return err + } + continue + } + + existingLabels := metricWithLabels[openBraceIdx+1 : closeBraceIdx] + + if len(existingLabels) > 0 { + if err := writeMulti(dst, metricName, []byte("{"), existingLabels, []byte(","), addedLabels, []byte("}"), value); err != nil { + return err + } + } else { + if err := writeMulti(dst, metricName, []byte("{"), addedLabels, []byte("}"), value); err != nil { + return err + } + } + } + + return nil +} diff --git a/internal/telemetry/prometheus/relabel_text_stream_test.go b/internal/telemetry/prometheus/relabel_text_stream_test.go new file mode 100644 index 000000000..c63d433c3 --- /dev/null +++ b/internal/telemetry/prometheus/relabel_text_stream_test.go @@ -0,0 +1,141 @@ +package prometheus_test + +import ( + "bytes" + "io" + "strings" + "testing" + + "github.com/pomerium/pomerium/internal/telemetry/prometheus" + "github.com/stretchr/testify/require" +) + +// RepeatingReader repeats reading from the beginning after EOF for a specified number of times +type RepeatingReader struct { + reader *bytes.Reader + resets int + maxResets int +} + +// NewRepeatingReader creates a new reader that will reset up to maxResets times +func NewRepeatingReader(data []byte, maxResets int) *RepeatingReader { + return &RepeatingReader{ + reader: bytes.NewReader(data), + resets: 0, + maxResets: maxResets, + } +} + +// Read implements io.Reader +func (r *RepeatingReader) Read(p []byte) (n int, err error) { + n, err = r.reader.Read(p) + if err == io.EOF && r.resets < r.maxResets { + r.reader.Seek(0, io.SeekStart) + r.resets++ + return r.Read(p) + } + return +} + +func BenchmarkRelabelTestStream(b *testing.B) { + addLabels := map[string]string{"instance": "localhost:9090", "installation_id": "12345-67890-12345-67890"} + src := []byte(` +# TYPE envoy_cluster_upstream_cx_total counter +envoy_cluster_upstream_cx_total{service="pomerium-proxy",envoy_cluster_name="route-1",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb"} 2 +envoy_cluster_upstream_cx_total{service="pomerium-proxy",envoy_cluster_name="route-2",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb"} 3 +`) + + b.Run("RelabelTextStream", func(b *testing.B) { + inputReader := NewRepeatingReader(src, b.N) + err := prometheus.RelabelTextStream(io.Discard, inputReader, addLabels) + require.NoError(b, err) + }) + b.Run("Previous", func(b *testing.B) { + inputReader := NewRepeatingReader(src, b.N) + err := prometheus.Export(io.Discard, prometheus.AddLabels(prometheus.NewMetricFamilyStream(inputReader), addLabels)) + require.NoError(b, err) + }) +} + +func TestRelabelTextStream(t *testing.T) { + testCases := []struct { + name string + input string + addLabels map[string]string + expected string + }{ + { + name: "empty input", + input: "", + addLabels: map[string]string{"instance": "localhost:9090"}, + expected: "", + }, + { + name: "no labels to add", + input: "metric 42\n", + addLabels: map[string]string{}, + expected: "metric 42\n", + }, + { + name: "comment lines", + input: "# HELP metric_name Help text\n# TYPE metric_name counter\n", + addLabels: map[string]string{"instance": "localhost:9090"}, + expected: "# HELP metric_name Help text\n# TYPE metric_name counter\n", + }, + { + name: "metric without labels", + input: "http_requests_total 42\n", + addLabels: map[string]string{"instance": "localhost:9090"}, + expected: "http_requests_total{instance=\"localhost:9090\"} 42\n", + }, + { + name: "metric with existing labels", + input: "http_requests_total{method=\"GET\"} 42\n", + addLabels: map[string]string{"instance": "localhost:9090"}, + expected: "http_requests_total{method=\"GET\",instance=\"localhost:9090\"} 42\n", + }, + { + name: "multiple labels to add", + input: "http_requests_total 42\n", + addLabels: map[string]string{"instance": "localhost:9090", "job": "prometheus"}, + expected: "http_requests_total{instance=\"localhost:9090\",job=\"prometheus\"} 42\n", + }, + { + name: "malformed metric (no space)", + input: "invalid_metric\n", + addLabels: map[string]string{"instance": "localhost:9090"}, + expected: "invalid_metric\n", + }, + { + name: "malformed metric (no closing brace)", + input: "invalid_metric{label=\"value\" 42\n", + addLabels: map[string]string{"instance": "localhost:9090"}, + expected: "invalid_metric{label=\"value\" 42\n", + }, + { + name: "empty labels", + input: "http_requests_total{} 42\n", + addLabels: map[string]string{"instance": "localhost:9090"}, + expected: "http_requests_total{instance=\"localhost:9090\"} 42\n", + }, + { + name: "multiple metrics", + input: "metric1 10\nmetric2{label=\"value\"} 20\n# COMMENT\nmetric3 30\n", + addLabels: map[string]string{"instance": "localhost:9090"}, + expected: "metric1{instance=\"localhost:9090\"} 10\nmetric2{label=\"value\",instance=\"localhost:9090\"} 20\n# COMMENT\nmetric3{instance=\"localhost:9090\"} 30\n", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + inputReader := strings.NewReader(tc.input) + outputBuffer := &bytes.Buffer{} + + err := prometheus.RelabelTextStream(outputBuffer, inputReader, tc.addLabels) + require.NoError(t, err) + + actual := outputBuffer.String() + require.Equal(t, tc.expected, actual) + }) + } +}