core/metrics: improve memory usage (#5364)

This commit is contained in:
Denis Mishin 2024-12-03 11:17:34 -05:00 committed by GitHub
parent 1a448708fa
commit 699679bc57
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 3978 additions and 101 deletions

View file

@ -1,10 +1,12 @@
package metrics package metrics
import ( import (
"bufio"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"maps"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
@ -16,12 +18,10 @@ import (
ocprom "contrib.go.opencensus.io/exporter/prometheus" ocprom "contrib.go.opencensus.io/exporter/prometheus"
"github.com/hashicorp/go-multierror" "github.com/hashicorp/go-multierror"
prom "github.com/prometheus/client_golang/prometheus" prom "github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.opencensus.io/stats/view" "go.opencensus.io/stats/view"
"google.golang.org/protobuf/proto"
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/prometheus"
"github.com/pomerium/pomerium/pkg/metrics" "github.com/pomerium/pomerium/pkg/metrics"
) )
@ -114,7 +114,7 @@ func newProxyMetricsHandler(exporter *ocprom.Exporter, endpoints []ScrapeEndpoin
type promProducerResult struct { type promProducerResult struct {
name string name string
src io.ReadCloser src io.ReadCloser
labels []*io_prometheus_client.LabelPair labels map[string]string
err error err error
} }
@ -124,6 +124,7 @@ type promProducerFn func(context.Context) promProducerResult
// writeMetricsMux runs producers concurrently and pipes output to destination yet avoiding data interleaving // writeMetricsMux runs producers concurrently and pipes output to destination yet avoiding data interleaving
func writeMetricsMux(ctx context.Context, w io.Writer, producers []promProducerFn) error { func writeMetricsMux(ctx context.Context, w io.Writer, producers []promProducerFn) error {
results := make(chan promProducerResult) results := make(chan promProducerResult)
w = bufio.NewWriter(w)
for _, p := range producers { for _, p := range producers {
go func(fn promProducerFn) { go func(fn promProducerFn) {
@ -153,33 +154,10 @@ func writeMetricsResult(w io.Writer, res promProducerResult) error {
if res.err != nil { if res.err != nil {
return fmt.Errorf("fetch: %w", res.err) return fmt.Errorf("fetch: %w", res.err)
} }
if err := writeMetricsWithLabels(w, res.src, res.labels); err != nil { return errors.Join(
return fmt.Errorf("%s: write: %w", res.name, err) prometheus.Export(w, prometheus.AddLabels(prometheus.NewMetricFamilyStream(res.src), res.labels)),
} res.src.Close(),
if err := res.src.Close(); err != nil { )
return fmt.Errorf("%s: close: %w", res.name, err)
}
return nil
}
func writeMetricsWithLabels(w io.Writer, r io.Reader, extra []*io_prometheus_client.LabelPair) error {
var parser expfmt.TextParser
ms, err := parser.TextToMetricFamilies(r)
if err != nil {
return fmt.Errorf("telemetry/metric: failed to read prometheus metrics: %w", err)
}
for _, m := range ms {
for _, mm := range m.Metric {
mm.Label = append(mm.Label, extra...)
}
_, err = expfmt.MetricFamilyToText(w, m)
if err != nil {
return fmt.Errorf("telemetry/metric: failed to write prometheus metrics: %w", err)
}
}
return nil
} }
func writePrometheusComment(w io.Writer, txt string) error { func writePrometheusComment(w io.Writer, txt string) error {
@ -192,7 +170,7 @@ func writePrometheusComment(w io.Writer, txt string) error {
return nil return nil
} }
func ocExport(name string, exporter *ocprom.Exporter, r *http.Request, labels []*io_prometheus_client.LabelPair) promProducerFn { func ocExport(name string, exporter *ocprom.Exporter, r *http.Request, labels map[string]string) promProducerFn {
return func(context.Context) promProducerResult { return func(context.Context) promProducerResult {
// Ensure we don't get entangled with compression from ocprom // Ensure we don't get entangled with compression from ocprom
r.Header.Del("Accept-Encoding") r.Header.Del("Accept-Encoding")
@ -214,7 +192,7 @@ func ocExport(name string, exporter *ocprom.Exporter, r *http.Request, labels []
} }
} }
func scrapeEndpoints(endpoints []ScrapeEndpoint, labels []*io_prometheus_client.LabelPair) []promProducerFn { func scrapeEndpoints(endpoints []ScrapeEndpoint, labels map[string]string) []promProducerFn {
out := make([]promProducerFn, 0, len(endpoints)) out := make([]promProducerFn, 0, len(endpoints))
for _, endpoint := range endpoints { for _, endpoint := range endpoints {
out = append(out, scrapeEndpoint(endpoint, labels)) out = append(out, scrapeEndpoint(endpoint, labels))
@ -222,7 +200,7 @@ func scrapeEndpoints(endpoints []ScrapeEndpoint, labels []*io_prometheus_client.
return out return out
} }
func scrapeEndpoint(endpoint ScrapeEndpoint, labels []*io_prometheus_client.LabelPair) promProducerFn { func scrapeEndpoint(endpoint ScrapeEndpoint, extra map[string]string) promProducerFn {
return func(ctx context.Context) promProducerResult { return func(ctx context.Context) promProducerResult {
name := fmt.Sprintf("%s %s", endpoint.Name, endpoint.URL.String()) name := fmt.Sprintf("%s %s", endpoint.Name, endpoint.URL.String())
@ -240,35 +218,27 @@ func scrapeEndpoint(endpoint ScrapeEndpoint, labels []*io_prometheus_client.Labe
return promProducerResult{name: name, err: errors.New(resp.Status)} return promProducerResult{name: name, err: errors.New(resp.Status)}
} }
labels := make(map[string]string, len(endpoint.Labels)+len(extra))
maps.Copy(labels, extra)
maps.Copy(labels, endpoint.Labels)
return promProducerResult{ return promProducerResult{
name: name, name: name,
src: resp.Body, src: resp.Body,
labels: append(toPrometheusLabels(endpoint.Labels), labels...), labels: labels,
} }
} }
} }
func getCommonLabels(installationID string) []*io_prometheus_client.LabelPair { func getCommonLabels(installationID string) map[string]string {
hostname, err := os.Hostname() hostname, err := os.Hostname()
if err != nil { if err != nil {
hostname = "__none__" hostname = "__none__"
} }
return []*io_prometheus_client.LabelPair{{ m := map[string]string{
Name: proto.String(metrics.InstallationIDLabel), metrics.HostnameLabel: hostname,
Value: proto.String(installationID),
}, {
Name: proto.String(metrics.HostnameLabel),
Value: proto.String(hostname),
}}
}
func toPrometheusLabels(labels map[string]string) []*io_prometheus_client.LabelPair {
out := make([]*io_prometheus_client.LabelPair, 0, len(labels))
for k, v := range labels {
out = append(out, &io_prometheus_client.LabelPair{
Name: proto.String(k),
Value: proto.String(v),
})
} }
return out if installationID != "" {
m[metrics.InstallationIDLabel] = installationID
}
return m
} }

View file

@ -30,14 +30,9 @@ func ToOTLP(
startTime time.Time, startTime time.Time,
now time.Time, now time.Time,
) ([]metricdata.Metrics, error) { ) ([]metricdata.Metrics, error) {
stream := NewMetricFamilyStream(src)
var metrics []metricdata.Metrics var metrics []metricdata.Metrics
var conversionErrors []error var conversionErrors []error
for { for family, err := range NewMetricFamilyStream(src) {
family, err := stream.Next()
if errors.Is(err, io.EOF) {
break
}
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -0,0 +1,30 @@
package prometheus
import (
"io"
"iter"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
// Export drains src and writes each metric family it yields to w in text
// format. It returns the first error encountered, whether that error was
// yielded by src or produced while writing to w; nil means the sequence was
// consumed to the end.
func Export(
	w io.Writer,
	src iter.Seq2[*dto.MetricFamily, error],
) error {
	for family, srcErr := range src {
		if srcErr != nil {
			return srcErr
		}
		writeErr := exportMetricFamily(w, family)
		if writeErr != nil {
			return writeErr
		}
	}
	return nil
}
// exportMetricFamily writes a single metric family to w through
// expfmt.MetricFamilyToText, discarding the byte count that call reports.
func exportMetricFamily(w io.Writer, mf *dto.MetricFamily) error {
	if _, err := expfmt.MetricFamilyToText(w, mf); err != nil {
		return err
	}
	return nil
}

View file

@ -0,0 +1,116 @@
package prometheus_test
import (
"bytes"
_ "embed"
"io"
"iter"
"testing"
"github.com/google/go-cmp/cmp"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"github.com/pomerium/pomerium/internal/telemetry/prometheus"
)
//go:embed testdata/large.txt
var exportTestData []byte
// BenchmarkExport measures the whole parse -> add-labels -> export pipeline
// over the embedded testdata/large.txt fixture, writing to io.Discard.
//
// The pipeline runs once per b.N iteration. Running it a single time outside
// the loop would leave the reported ns/op and allocs/op unrelated to the
// actual cost of an export.
func BenchmarkExport(b *testing.B) {
	labels := map[string]string{
		"installation_id": "abc1231-1231-1231-1231-1231",
		"hostname":        "ec2-1231-1231-1231-1231-1231.us-west-2.compute.amazonaws.com",
	}
	r := bytes.NewReader(exportTestData)

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Rewind so every iteration parses the complete fixture.
		r.Reset(exportTestData)
		err := prometheus.Export(io.Discard,
			prometheus.AddLabels(prometheus.NewMetricFamilyStream(r), labels))
		require.NoError(b, err)
	}
}
// TestExport feeds in-memory metric families through Export and compares the
// rendered text against the expected output for each case.
func TestExport(t *testing.T) {
	// seqOf adapts a slice of metric families into a sequence that never
	// yields an error.
	seqOf := func(families []*dto.MetricFamily) iter.Seq2[*dto.MetricFamily, error] {
		return func(yield func(*dto.MetricFamily, error) bool) {
			for i := range families {
				if !yield(families[i], nil) {
					return
				}
			}
		}
	}

	// postOK builds the labelled http_requests_total sample used by both cases.
	postOK := func() *dto.Metric {
		return &dto.Metric{
			Label: []*dto.LabelPair{
				{Name: proto.String("method"), Value: proto.String("post")},
				{Name: proto.String("code"), Value: proto.String("200")},
			},
			Counter:     &dto.Counter{Value: proto.Float64(1027)},
			TimestampMs: proto.Int64(1395066363000),
		}
	}

	cases := []struct {
		name     string
		input    []*dto.MetricFamily
		expected string
	}{
		{
			name: "single metric family",
			input: []*dto.MetricFamily{{
				Name:   proto.String("http_requests_total"),
				Help:   proto.String("The total number of HTTP requests."),
				Type:   dto.MetricType_COUNTER.Enum(),
				Metric: []*dto.Metric{postOK()},
			}},
			expected: `# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027 1395066363000
`,
		},
		{
			name: "multiple metric families",
			input: []*dto.MetricFamily{
				{
					Name:   proto.String("http_requests_total"),
					Type:   dto.MetricType_COUNTER.Enum(),
					Metric: []*dto.Metric{postOK()},
				},
				{
					Name: proto.String("cpu_seconds_total"),
					Type: dto.MetricType_COUNTER.Enum(),
					Metric: []*dto.Metric{{
						Counter: &dto.Counter{Value: proto.Float64(12345.6)},
					}},
				},
			},
			expected: `# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027 1395066363000
# TYPE cpu_seconds_total counter
cpu_seconds_total 12345.6
`,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			var out bytes.Buffer
			require.NoError(t, prometheus.Export(&out, seqOf(tc.input)))

			got := out.String()
			t.Log(got)
			require.Empty(t, cmp.Diff(tc.expected, got))
		})
	}
}

View file

@ -0,0 +1,36 @@
package prometheus
import (
	"iter"
	"slices"

	dto "github.com/prometheus/client_model/go"
)
// AddLabels returns a sequence that yields the metric families from src after
// appending every entry of addLabels, as a label pair, to each metric of each
// family.
//
// The extra pairs are appended in ascending order of label name so the result
// is stable between calls; ranging over the map directly would emit them in a
// different order each time. The pairs are built once, and the same
// *dto.LabelPair values are shared by every metric they are attached to.
// Metric families are modified in place. Existing labels are not inspected, so
// a name already present on a metric is appended again rather than replaced.
//
// An error yielded by src is forwarded once with a nil family, after which the
// sequence ends.
func AddLabels(
	src iter.Seq2[*dto.MetricFamily, error],
	addLabels map[string]string,
) iter.Seq2[*dto.MetricFamily, error] {
	names := make([]string, 0, len(addLabels))
	for name := range addLabels {
		names = append(names, name)
	}
	slices.Sort(names)

	extra := make([]*dto.LabelPair, 0, len(names))
	for _, name := range names {
		// LabelPair stores pointers to its strings, so give each pair its
		// own variables to point at.
		name, value := name, addLabels[name]
		extra = append(extra, &dto.LabelPair{
			Name:  &name,
			Value: &value,
		})
	}

	return func(yield func(*dto.MetricFamily, error) bool) {
		for mf, err := range src {
			if err != nil {
				yield(nil, err)
				return
			}
			for _, metric := range mf.Metric {
				metric.Label = append(metric.Label, extra...)
			}
			if !yield(mf, nil) {
				return
			}
		}
	}
}

View file

@ -0,0 +1,104 @@
package prometheus_test
import (
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
"github.com/pomerium/pomerium/internal/telemetry/prometheus"
)
// TestAddLabels parses Prometheus text input, runs it through AddLabels, and
// checks that the extra label ends up on every metric of every family.
func TestAddLabels(t *testing.T) {
	// pair builds a single label pair.
	pair := func(name, value string) *dto.LabelPair {
		return &dto.LabelPair{Name: proto.String(name), Value: proto.String(value)}
	}

	// postOK builds the http_requests_total sample, extra label included,
	// that both cases expect.
	postOK := func() *dto.Metric {
		return &dto.Metric{
			Label: []*dto.LabelPair{
				pair("method", "post"),
				pair("code", "200"),
				pair("key", "value"),
			},
			Counter:     &dto.Counter{Value: proto.Float64(1027)},
			TimestampMs: proto.Int64(1395066363000),
		}
	}

	cases := []struct {
		name      string
		input     string
		addLabels map[string]string
		expected  []*dto.MetricFamily
	}{
		{
			name: "single metric family",
			input: `
# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027 1395066363000
`,
			addLabels: map[string]string{"key": "value"},
			expected: []*dto.MetricFamily{{
				Name:   proto.String("http_requests_total"),
				Help:   proto.String("The total number of HTTP requests."),
				Type:   dto.MetricType_COUNTER.Enum(),
				Metric: []*dto.Metric{postOK()},
			}},
		},
		{
			name: "multiple metric families",
			input: `
# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027 1395066363000
# TYPE cpu_seconds_total counter
cpu_seconds_total 12345.6
`,
			addLabels: map[string]string{"key": "value"},
			expected: []*dto.MetricFamily{
				{
					Name:   proto.String("http_requests_total"),
					Type:   dto.MetricType_COUNTER.Enum(),
					Metric: []*dto.Metric{postOK()},
				},
				{
					Name: proto.String("cpu_seconds_total"),
					Type: dto.MetricType_COUNTER.Enum(),
					Metric: []*dto.Metric{{
						Counter: &dto.Counter{Value: proto.Float64(12345.6)},
						Label:   []*dto.LabelPair{pair("key", "value")},
					}},
				},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			stream := prometheus.NewMetricFamilyStream(strings.NewReader(tc.input))
			got, err := collect(prometheus.AddLabels(stream, tc.addLabels))
			require.NoError(t, err)

			diff := cmp.Diff(
				tc.expected,
				got,
				protocmp.Transform(),
				cmpopts.IgnoreUnexported(dto.MetricFamily{}, dto.Metric{}, dto.LabelPair{}, dto.Counter{}),
			)
			require.Empty(t, diff)
		})
	}
}

View file

@ -2,61 +2,80 @@ package prometheus
import ( import (
"bufio" "bufio"
"bytes"
"errors"
"fmt" "fmt"
"io" "io"
"strings" "iter"
dto "github.com/prometheus/client_model/go" dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
) )
type MetricFamilyStream struct { type metricFamilyStream struct {
reader io.Reader reader io.Reader
scanner *bufio.Scanner scanner *bufio.Scanner
buffer strings.Builder buffer bytes.Buffer
parser expfmt.TextParser
} }
func NewMetricFamilyStream(reader io.Reader) *MetricFamilyStream { func NewMetricFamilyStream(reader io.Reader) iter.Seq2[*dto.MetricFamily, error] {
return &MetricFamilyStream{ mfs := &metricFamilyStream{
reader: reader, reader: reader,
scanner: bufio.NewScanner(reader), scanner: bufio.NewScanner(reader),
} }
return func(yield func(*dto.MetricFamily, error) bool) {
for {
m, err := mfs.Next()
if errors.Is(err, io.EOF) {
return
}
if err != nil {
yield(nil, err)
return
} else if !yield(m, nil) {
return
}
}
}
} }
func (mfs *MetricFamilyStream) Next() (*dto.MetricFamily, error) { func (mfs *metricFamilyStream) Next() (*dto.MetricFamily, error) {
var afterHeader bool var afterHeader bool
var block *strings.Reader for mfs.scanner.Scan() {
for block == nil && mfs.scanner.Scan() { line := mfs.scanner.Bytes()
line := mfs.scanner.Text() if len(line) == 0 {
if line == "" {
continue continue
} }
if line[0] == '#' { if line[0] == '#' {
if afterHeader { if afterHeader {
block = strings.NewReader(mfs.buffer.String()) result, err := mfs.parseMetricFamilyBlock(&mfs.buffer)
mfs.buffer.Reset() mfs.buffer.Reset()
mfs.buffer.Write(line)
mfs.buffer.WriteRune('\n')
return result, err
} }
} else { } else {
afterHeader = true afterHeader = true
} }
mfs.buffer.WriteString(line) mfs.buffer.Write(line)
mfs.buffer.WriteString("\n") mfs.buffer.WriteRune('\n')
} }
if block == nil { if err := mfs.scanner.Err(); err != nil {
if err := mfs.scanner.Err(); err != nil { return nil, err
return nil, err
}
if mfs.buffer.Len() == 0 {
return nil, io.EOF
}
block = strings.NewReader(mfs.buffer.String())
mfs.buffer.Reset()
} }
if mfs.buffer.Len() == 0 {
return nil, io.EOF
}
result, err := mfs.parseMetricFamilyBlock(&mfs.buffer)
mfs.buffer.Reset()
return result, err
}
var parser expfmt.TextParser func (mfs *metricFamilyStream) parseMetricFamilyBlock(r io.Reader) (*dto.MetricFamily, error) {
families, err := parser.TextToMetricFamilies(block) families, err := mfs.parser.TextToMetricFamilies(r)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -1,8 +1,7 @@
package prometheus_test package prometheus_test
import ( import (
"errors" "iter"
"io"
"strings" "strings"
"testing" "testing"
@ -16,6 +15,17 @@ import (
"github.com/pomerium/pomerium/internal/telemetry/prometheus" "github.com/pomerium/pomerium/internal/telemetry/prometheus"
) )
func collect[T any](src iter.Seq2[T, error]) ([]T, error) {
var out []T
for v, err := range src {
if err != nil {
return nil, err
}
out = append(out, v)
}
return out, nil
}
func TestMetricFamilyStream(t *testing.T) { func TestMetricFamilyStream(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@ -89,23 +99,12 @@ cpu_seconds_total 12345.6
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
reader := strings.NewReader(tt.input) reader := strings.NewReader(tt.input)
metricStream := prometheus.NewMetricFamilyStream(reader) got, err := collect(prometheus.NewMetricFamilyStream(reader))
if tt.wantErr {
var got []*dto.MetricFamily require.Error(t, err)
for { return
mf, err := metricStream.Next()
if errors.Is(err, io.EOF) {
break
}
if (err != nil) != tt.wantErr {
t.Errorf("MetricFamilyStream.Next() error = %v, wantErr %v", err, tt.wantErr)
return
}
if err == nil {
got = append(got, mf)
}
} }
require.NoError(t, err)
diff := cmp.Diff(tt.expected, got, protocmp.Transform(), cmpopts.IgnoreUnexported(dto.MetricFamily{}, dto.Metric{}, dto.LabelPair{}, dto.Counter{})) diff := cmp.Diff(tt.expected, got, protocmp.Transform(), cmpopts.IgnoreUnexported(dto.MetricFamily{}, dto.Metric{}, dto.LabelPair{}, dto.Counter{}))
require.Empty(t, diff) require.Empty(t, diff)
}) })

File diff suppressed because it is too large Load diff