mirror of
https://github.com/pomerium/pomerium.git
synced 2025-04-30 10:56:28 +02:00
metrics: reduce gc pressure (#5531)
metrics: reduce gc pressure (#5530) Co-authored-by: Denis Mishin <dmishin@pomerium.com>
This commit is contained in:
parent
98a5779d77
commit
a078f93986
7 changed files with 393 additions and 23 deletions
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/pomerium/pomerium/internal/middleware"
|
"github.com/pomerium/pomerium/internal/middleware"
|
||||||
"github.com/pomerium/pomerium/internal/telemetry"
|
"github.com/pomerium/pomerium/internal/telemetry"
|
||||||
"github.com/pomerium/pomerium/internal/telemetry/metrics"
|
"github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||||
|
metrics_const "github.com/pomerium/pomerium/pkg/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -96,12 +97,17 @@ func (mgr *MetricsManager) updateServer(ctx context.Context, cfg *Config) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var labels map[string]string
|
||||||
|
if cfg.Options.IsRuntimeFlagSet(RuntimeFlagAddExtraMetricsLabels) {
|
||||||
|
labels = getCommonLabels(mgr.installationID)
|
||||||
|
}
|
||||||
|
|
||||||
mgr.endpoints = append(cfg.MetricsScrapeEndpoints,
|
mgr.endpoints = append(cfg.MetricsScrapeEndpoints,
|
||||||
MetricsScrapeEndpoint{
|
MetricsScrapeEndpoint{
|
||||||
Name: "envoy",
|
Name: "envoy",
|
||||||
URL: url.URL{Scheme: "http", Host: cfg.Options.MetricsAddr, Path: "/metrics/envoy"},
|
URL: url.URL{Scheme: "http", Host: cfg.Options.MetricsAddr, Path: "/metrics/envoy"},
|
||||||
})
|
})
|
||||||
handler, err := metrics.PrometheusHandler(toInternalEndpoints(mgr.endpoints), mgr.installationID, defaultMetricsTimeout)
|
handler, err := metrics.PrometheusHandler(toInternalEndpoints(mgr.endpoints), defaultMetricsTimeout, labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Ctx(ctx).Error().Err(err).Msg("metrics: failed to create prometheus handler")
|
log.Ctx(ctx).Error().Err(err).Msg("metrics: failed to create prometheus handler")
|
||||||
return
|
return
|
||||||
|
@ -128,3 +134,17 @@ func toInternalEndpoints(src []MetricsScrapeEndpoint) []metrics.ScrapeEndpoint {
|
||||||
}
|
}
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getCommonLabels(installationID string) map[string]string {
|
||||||
|
hostname, err := os.Hostname()
|
||||||
|
if err != nil {
|
||||||
|
hostname = "__none__"
|
||||||
|
}
|
||||||
|
m := map[string]string{
|
||||||
|
metrics_const.HostnameLabel: hostname,
|
||||||
|
}
|
||||||
|
if installationID != "" {
|
||||||
|
m[metrics_const.InstallationIDLabel] = installationID
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
|
@ -25,6 +25,9 @@ var (
|
||||||
// is deprecated pending removal in a future release, but this flag allows a temporary
|
// is deprecated pending removal in a future release, but this flag allows a temporary
|
||||||
// opt-out from the deprecation.
|
// opt-out from the deprecation.
|
||||||
RuntimeFlagPomeriumJWTEndpoint = runtimeFlag("pomerium_jwt_endpoint", false)
|
RuntimeFlagPomeriumJWTEndpoint = runtimeFlag("pomerium_jwt_endpoint", false)
|
||||||
|
|
||||||
|
// RuntimeFlagAddExtraMetricsLabels enables adding extra labels to metrics (host and installation id)
|
||||||
|
RuntimeFlagAddExtraMetricsLabels = runtimeFlag("add_extra_metrics_labels", true)
|
||||||
)
|
)
|
||||||
|
|
||||||
// RuntimeFlag is a runtime flag that can flip on/off certain features
|
// RuntimeFlag is a runtime flag that can flip on/off certain features
|
||||||
|
|
115
internal/telemetry/metrics/bench_test.go
Normal file
115
internal/telemetry/metrics/bench_test.go
Normal file
|
@ -0,0 +1,115 @@
|
||||||
|
package metrics_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"sort"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/config"
|
||||||
|
"github.com/pomerium/pomerium/internal/testenv"
|
||||||
|
"github.com/pomerium/pomerium/internal/testenv/snippets"
|
||||||
|
"github.com/pomerium/pomerium/internal/testenv/upstreams"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestScrapeMetricsEndpoint(t *testing.T) {
|
||||||
|
t.Skip("this test is for profiling purposes only")
|
||||||
|
|
||||||
|
env := testenv.New(t, testenv.WithTraceDebugFlags(testenv.StandardTraceDebugFlags))
|
||||||
|
upstream := upstreams.HTTP(nil)
|
||||||
|
upstream.Handle("/test", func(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
w.Write([]byte("OK"))
|
||||||
|
})
|
||||||
|
|
||||||
|
routes := []testenv.Route{}
|
||||||
|
for i := range 10 {
|
||||||
|
routes = append(routes, upstream.Route().
|
||||||
|
From(env.SubdomainURL(fmt.Sprintf("test-%d", i))).
|
||||||
|
Policy(func(p *config.Policy) { p.AllowPublicUnauthenticatedAccess = true }))
|
||||||
|
}
|
||||||
|
env.AddUpstream(upstream)
|
||||||
|
env.Start()
|
||||||
|
snippets.WaitStartupComplete(env)
|
||||||
|
|
||||||
|
for _, r := range routes {
|
||||||
|
resp, err := upstream.Get(r, upstreams.Path("/test"))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
data, err := io.ReadAll(resp.Body)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
resp.Body.Close()
|
||||||
|
assert.Equal(t, "OK", string(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
metricsURL := fmt.Sprintf("http://localhost:%d/metrics", env.Ports().Metrics.Value())
|
||||||
|
|
||||||
|
client := &http.Client{
|
||||||
|
Timeout: 10 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
var durations []time.Duration
|
||||||
|
var totalBytes int64
|
||||||
|
var errors int
|
||||||
|
|
||||||
|
pct := 0
|
||||||
|
niter := 200
|
||||||
|
for i := 0; i < niter; i++ {
|
||||||
|
pct = i * 100 / niter
|
||||||
|
if pct%10 == 0 {
|
||||||
|
t.Log(pct, "%")
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
resp, err := client.Get(metricsURL)
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("Request %d failed: %v", i, err)
|
||||||
|
errors++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
nb, err := io.Copy(io.Discard, resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
resp.Body.Close()
|
||||||
|
errors++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
durations = append(durations, elapsed)
|
||||||
|
totalBytes += nb
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(durations) > 0 {
|
||||||
|
sort.Slice(durations, func(i, j int) bool {
|
||||||
|
return durations[i] < durations[j]
|
||||||
|
})
|
||||||
|
|
||||||
|
var total time.Duration
|
||||||
|
for _, d := range durations {
|
||||||
|
total += d
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("Metrics scraping statistics:")
|
||||||
|
t.Logf(" Successful requests: %d", len(durations))
|
||||||
|
t.Logf(" Failed requests: %d", errors)
|
||||||
|
t.Logf(" Total bytes: %d", totalBytes)
|
||||||
|
t.Logf(" Avg bytes per request: %.2f", float64(totalBytes)/float64(len(durations)))
|
||||||
|
t.Logf(" Min: %v", durations[0])
|
||||||
|
t.Logf(" Max: %v", durations[len(durations)-1])
|
||||||
|
t.Logf(" Avg: %v", total/time.Duration(len(durations)))
|
||||||
|
t.Logf(" p50: %v", durations[len(durations)*50/100])
|
||||||
|
t.Logf(" p90: %v", durations[len(durations)*90/100])
|
||||||
|
t.Logf(" p95: %v", durations[len(durations)*95/100])
|
||||||
|
t.Logf(" p99: %v", durations[len(durations)*99/100])
|
||||||
|
} else {
|
||||||
|
t.Logf("No successful requests made")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("metrics endpoint: %s", metricsURL)
|
||||||
|
|
||||||
|
env.Stop()
|
||||||
|
}
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -22,7 +21,6 @@ import (
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/internal/log"
|
"github.com/pomerium/pomerium/internal/log"
|
||||||
"github.com/pomerium/pomerium/internal/telemetry/prometheus"
|
"github.com/pomerium/pomerium/internal/telemetry/prometheus"
|
||||||
"github.com/pomerium/pomerium/pkg/metrics"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// EnvoyMetricsPath is the path on the metrics listener that retrieves envoy metrics.
|
// EnvoyMetricsPath is the path on the metrics listener that retrieves envoy metrics.
|
||||||
|
@ -44,7 +42,7 @@ func (e *ScrapeEndpoint) String() string {
|
||||||
|
|
||||||
// PrometheusHandler creates an exporter that exports stats to Prometheus
|
// PrometheusHandler creates an exporter that exports stats to Prometheus
|
||||||
// and returns a handler suitable for exporting metrics.
|
// and returns a handler suitable for exporting metrics.
|
||||||
func PrometheusHandler(endpoints []ScrapeEndpoint, installationID string, timeout time.Duration) (http.Handler, error) {
|
func PrometheusHandler(endpoints []ScrapeEndpoint, timeout time.Duration, labels map[string]string) (http.Handler, error) {
|
||||||
exporter, err := getGlobalExporter()
|
exporter, err := getGlobalExporter()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -52,7 +50,7 @@ func PrometheusHandler(endpoints []ScrapeEndpoint, installationID string, timeou
|
||||||
|
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
|
|
||||||
mux.Handle("/metrics", newProxyMetricsHandler(exporter, endpoints, installationID, timeout))
|
mux.Handle("/metrics", newProxyMetricsHandler(exporter, endpoints, timeout, labels))
|
||||||
return mux, nil
|
return mux, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,12 +94,16 @@ func registerDefaultViews() error {
|
||||||
|
|
||||||
// newProxyMetricsHandler creates a subrequest to the envoy control plane for metrics and
|
// newProxyMetricsHandler creates a subrequest to the envoy control plane for metrics and
|
||||||
// combines them with internal envoy-provided
|
// combines them with internal envoy-provided
|
||||||
func newProxyMetricsHandler(exporter *ocprom.Exporter, endpoints []ScrapeEndpoint, installationID string, timeout time.Duration) http.HandlerFunc {
|
func newProxyMetricsHandler(
|
||||||
|
exporter *ocprom.Exporter,
|
||||||
|
endpoints []ScrapeEndpoint,
|
||||||
|
timeout time.Duration,
|
||||||
|
labels map[string]string,
|
||||||
|
) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx, cancel := context.WithTimeout(r.Context(), timeout)
|
ctx, cancel := context.WithTimeout(r.Context(), timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
labels := getCommonLabels(installationID)
|
|
||||||
if err := writeMetricsMux(ctx, w, append(
|
if err := writeMetricsMux(ctx, w, append(
|
||||||
scrapeEndpoints(endpoints, labels),
|
scrapeEndpoints(endpoints, labels),
|
||||||
ocExport("pomerium", exporter, r, labels)),
|
ocExport("pomerium", exporter, r, labels)),
|
||||||
|
@ -156,7 +158,7 @@ func writeMetricsResult(w io.Writer, res promProducerResult) error {
|
||||||
return fmt.Errorf("fetch: %w", res.err)
|
return fmt.Errorf("fetch: %w", res.err)
|
||||||
}
|
}
|
||||||
return errors.Join(
|
return errors.Join(
|
||||||
prometheus.Export(w, prometheus.AddLabels(prometheus.NewMetricFamilyStream(res.src), res.labels)),
|
prometheus.RelabelTextStream(w, res.src, res.labels),
|
||||||
res.src.Close(),
|
res.src.Close(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -229,17 +231,3 @@ func scrapeEndpoint(endpoint ScrapeEndpoint, extra map[string]string) promProduc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getCommonLabels(installationID string) map[string]string {
|
|
||||||
hostname, err := os.Hostname()
|
|
||||||
if err != nil {
|
|
||||||
hostname = "__none__"
|
|
||||||
}
|
|
||||||
m := map[string]string{
|
|
||||||
metrics.HostnameLabel: hostname,
|
|
||||||
}
|
|
||||||
if installationID != "" {
|
|
||||||
m[metrics.InstallationIDLabel] = installationID
|
|
||||||
}
|
|
||||||
return m
|
|
||||||
}
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ envoy_server_initialization_time_ms_bucket{le="1000"} 1
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMetrics(t *testing.T, envoyURL *url.URL, header http.Header) []byte {
|
func getMetrics(t *testing.T, envoyURL *url.URL, header http.Header) []byte {
|
||||||
h, err := PrometheusHandler([]ScrapeEndpoint{{Name: "envoy", URL: *envoyURL}}, "test_installation_id", time.Second*20)
|
h, err := PrometheusHandler([]ScrapeEndpoint{{Name: "envoy", URL: *envoyURL}}, time.Second*20, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
103
internal/telemetry/prometheus/relabel_text_stream.go
Normal file
103
internal/telemetry/prometheus/relabel_text_stream.go
Normal file
|
@ -0,0 +1,103 @@
|
||||||
|
package prometheus
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"maps"
|
||||||
|
"slices"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
func writeMulti(dst io.Writer, b ...[]byte) error {
|
||||||
|
for _, buf := range b {
|
||||||
|
if _, err := dst.Write(buf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RelabelTextStream relabels a prometheus text stream by adding additional labels to each metric.
|
||||||
|
func RelabelTextStream(dst io.Writer, src io.Reader, addLabels map[string]string) error {
|
||||||
|
if len(addLabels) == 0 {
|
||||||
|
_, err := io.Copy(dst, src)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var labelsBuilder strings.Builder
|
||||||
|
for _, k := range slices.Sorted(maps.Keys(addLabels)) {
|
||||||
|
v := addLabels[k]
|
||||||
|
if labelsBuilder.Len() > 0 {
|
||||||
|
labelsBuilder.WriteByte(',')
|
||||||
|
}
|
||||||
|
labelsBuilder.WriteString(k)
|
||||||
|
labelsBuilder.WriteString("=\"")
|
||||||
|
labelsBuilder.WriteString(v)
|
||||||
|
labelsBuilder.WriteString("\"")
|
||||||
|
}
|
||||||
|
addedLabels := []byte(labelsBuilder.String())
|
||||||
|
|
||||||
|
r := bufio.NewReader(src)
|
||||||
|
|
||||||
|
for {
|
||||||
|
line, err := r.ReadSlice('\n')
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(line) == 0 || line[0] == '#' {
|
||||||
|
if _, err := dst.Write(line); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
spaceIdx := bytes.IndexByte(line, ' ')
|
||||||
|
if spaceIdx == -1 {
|
||||||
|
if _, err := dst.Write(line); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
metricWithLabels := line[:spaceIdx]
|
||||||
|
value := line[spaceIdx:]
|
||||||
|
|
||||||
|
openBraceIdx := bytes.IndexByte(metricWithLabels, '{')
|
||||||
|
if openBraceIdx == -1 { // no labels
|
||||||
|
if err := writeMulti(dst, metricWithLabels, []byte("{"), addedLabels, []byte("}"), value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
metricName := metricWithLabels[:openBraceIdx]
|
||||||
|
|
||||||
|
closeBraceIdx := bytes.LastIndexByte(metricWithLabels, '}')
|
||||||
|
if closeBraceIdx == -1 || closeBraceIdx <= openBraceIdx {
|
||||||
|
if _, err := dst.Write(line); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
existingLabels := metricWithLabels[openBraceIdx+1 : closeBraceIdx]
|
||||||
|
|
||||||
|
if len(existingLabels) > 0 {
|
||||||
|
if err := writeMulti(dst, metricName, []byte("{"), existingLabels, []byte(","), addedLabels, []byte("}"), value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := writeMulti(dst, metricName, []byte("{"), addedLabels, []byte("}"), value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
141
internal/telemetry/prometheus/relabel_text_stream_test.go
Normal file
141
internal/telemetry/prometheus/relabel_text_stream_test.go
Normal file
|
@ -0,0 +1,141 @@
|
||||||
|
package prometheus_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/internal/telemetry/prometheus"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RepeatingReader repeats reading from the beginning after EOF for a specified number of times
|
||||||
|
type RepeatingReader struct {
|
||||||
|
reader *bytes.Reader
|
||||||
|
resets int
|
||||||
|
maxResets int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRepeatingReader creates a new reader that will reset up to maxResets times
|
||||||
|
func NewRepeatingReader(data []byte, maxResets int) *RepeatingReader {
|
||||||
|
return &RepeatingReader{
|
||||||
|
reader: bytes.NewReader(data),
|
||||||
|
resets: 0,
|
||||||
|
maxResets: maxResets,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read implements io.Reader
|
||||||
|
func (r *RepeatingReader) Read(p []byte) (n int, err error) {
|
||||||
|
n, err = r.reader.Read(p)
|
||||||
|
if err == io.EOF && r.resets < r.maxResets {
|
||||||
|
r.reader.Seek(0, io.SeekStart)
|
||||||
|
r.resets++
|
||||||
|
return r.Read(p)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRelabelTestStream(b *testing.B) {
|
||||||
|
addLabels := map[string]string{"instance": "localhost:9090", "installation_id": "12345-67890-12345-67890"}
|
||||||
|
src := []byte(`
|
||||||
|
# TYPE envoy_cluster_upstream_cx_total counter
|
||||||
|
envoy_cluster_upstream_cx_total{service="pomerium-proxy",envoy_cluster_name="route-1",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb"} 2
|
||||||
|
envoy_cluster_upstream_cx_total{service="pomerium-proxy",envoy_cluster_name="route-2",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb"} 3
|
||||||
|
`)
|
||||||
|
|
||||||
|
b.Run("RelabelTextStream", func(b *testing.B) {
|
||||||
|
inputReader := NewRepeatingReader(src, b.N)
|
||||||
|
err := prometheus.RelabelTextStream(io.Discard, inputReader, addLabels)
|
||||||
|
require.NoError(b, err)
|
||||||
|
})
|
||||||
|
b.Run("Previous", func(b *testing.B) {
|
||||||
|
inputReader := NewRepeatingReader(src, b.N)
|
||||||
|
err := prometheus.Export(io.Discard, prometheus.AddLabels(prometheus.NewMetricFamilyStream(inputReader), addLabels))
|
||||||
|
require.NoError(b, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRelabelTextStream(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
input string
|
||||||
|
addLabels map[string]string
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty input",
|
||||||
|
input: "",
|
||||||
|
addLabels: map[string]string{"instance": "localhost:9090"},
|
||||||
|
expected: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no labels to add",
|
||||||
|
input: "metric 42\n",
|
||||||
|
addLabels: map[string]string{},
|
||||||
|
expected: "metric 42\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "comment lines",
|
||||||
|
input: "# HELP metric_name Help text\n# TYPE metric_name counter\n",
|
||||||
|
addLabels: map[string]string{"instance": "localhost:9090"},
|
||||||
|
expected: "# HELP metric_name Help text\n# TYPE metric_name counter\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "metric without labels",
|
||||||
|
input: "http_requests_total 42\n",
|
||||||
|
addLabels: map[string]string{"instance": "localhost:9090"},
|
||||||
|
expected: "http_requests_total{instance=\"localhost:9090\"} 42\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "metric with existing labels",
|
||||||
|
input: "http_requests_total{method=\"GET\"} 42\n",
|
||||||
|
addLabels: map[string]string{"instance": "localhost:9090"},
|
||||||
|
expected: "http_requests_total{method=\"GET\",instance=\"localhost:9090\"} 42\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple labels to add",
|
||||||
|
input: "http_requests_total 42\n",
|
||||||
|
addLabels: map[string]string{"instance": "localhost:9090", "job": "prometheus"},
|
||||||
|
expected: "http_requests_total{instance=\"localhost:9090\",job=\"prometheus\"} 42\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "malformed metric (no space)",
|
||||||
|
input: "invalid_metric\n",
|
||||||
|
addLabels: map[string]string{"instance": "localhost:9090"},
|
||||||
|
expected: "invalid_metric\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "malformed metric (no closing brace)",
|
||||||
|
input: "invalid_metric{label=\"value\" 42\n",
|
||||||
|
addLabels: map[string]string{"instance": "localhost:9090"},
|
||||||
|
expected: "invalid_metric{label=\"value\" 42\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty labels",
|
||||||
|
input: "http_requests_total{} 42\n",
|
||||||
|
addLabels: map[string]string{"instance": "localhost:9090"},
|
||||||
|
expected: "http_requests_total{instance=\"localhost:9090\"} 42\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple metrics",
|
||||||
|
input: "metric1 10\nmetric2{label=\"value\"} 20\n# COMMENT\nmetric3 30\n",
|
||||||
|
addLabels: map[string]string{"instance": "localhost:9090"},
|
||||||
|
expected: "metric1{instance=\"localhost:9090\"} 10\nmetric2{label=\"value\",instance=\"localhost:9090\"} 20\n# COMMENT\nmetric3{instance=\"localhost:9090\"} 30\n",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
inputReader := strings.NewReader(tc.input)
|
||||||
|
outputBuffer := &bytes.Buffer{}
|
||||||
|
|
||||||
|
err := prometheus.RelabelTextStream(outputBuffer, inputReader, tc.addLabels)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
actual := outputBuffer.String()
|
||||||
|
require.Equal(t, tc.expected, actual)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue