telemetry: add prometheus streaming converter to OTLP (#5132)

This commit is contained in:
Denis Mishin 2024-06-10 15:39:09 -04:00 committed by GitHub
parent 990517a89e
commit 2b1dcf7355
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 514 additions and 0 deletions

View file

@ -0,0 +1,166 @@
package prometheus
import (
"errors"
"fmt"
"io"
"math"
"time"
dto "github.com/prometheus/client_model/go"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// FilterFn decides whether a metric is kept and under what name.
// It receives the original prometheus metric name and returns the
// (possibly rewritten) name to use, plus true if the metric should
// be included in the output, or false to drop it entirely.
type FilterFn func(metricname string) (string, bool)
// RelabelFn maps a prometheus label name to the attribute name to emit.
// It returns the new attribute name and true if the attribute should be
// included; labels for which it returns false are dropped.
// (The parameter is a label name, not a metric name — renamed accordingly;
// function-type parameter names are documentary only.)
type RelabelFn func(label string) (string, bool)
// ToOTLP converts a prometheus text-format metric stream into OTLP metrics.
// filter selects (and may rename) metric families; relabel selects (and may
// rename) labels into attributes. startTime and now become the cumulative
// start and observation timestamps on every data point. Per-metric
// conversion failures are collected and joined into the returned error
// while successfully converted metrics are still returned.
func ToOTLP(
	src io.Reader,
	filter FilterFn,
	relabel RelabelFn,
	startTime time.Time,
	now time.Time,
) ([]metricdata.Metrics, error) {
	families := NewMetricFamilyStream(src)

	var out []metricdata.Metrics
	var errs []error
	for {
		fam, err := families.Next()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			// a stream-level parse error is fatal, unlike per-metric errors
			return nil, err
		}

		newName, keep := filter(fam.GetName())
		if !keep {
			continue
		}

		for _, sample := range fam.GetMetric() {
			agg, convErr := convertMetric(sample, fam.GetType(), relabel, startTime, now)
			if convErr != nil {
				// remember the failure but keep converting the rest
				errs = append(errs, fmt.Errorf("%s: %w", fam.GetName(), convErr))
				continue
			}
			out = append(out, metricdata.Metrics{
				Data:        agg,
				Description: fam.GetHelp(),
				Name:        newName,
				Unit:        fam.GetUnit(),
			})
		}
	}
	return out, errors.Join(errs...)
}
// convertMetric converts a single prometheus metric sample into the
// equivalent OTLP aggregation, stamping startTime/endTime onto the data
// point. Counters map to monotonic cumulative sums, gauges to gauges,
// and histograms to cumulative histograms; labels are converted to
// attributes via relabel. SUMMARY and UNTYPED metrics are not supported
// and yield an error.
func convertMetric(
	src *dto.Metric,
	typ dto.MetricType,
	relabel RelabelFn,
	startTime time.Time,
	endTime time.Time,
) (metricdata.Aggregation, error) {
	attr := convertLabels(src.GetLabel(), relabel)
	switch typ {
	case dto.MetricType_COUNTER:
		return metricdata.Sum[float64]{
			IsMonotonic: true,
			Temporality: metricdata.CumulativeTemporality,
			DataPoints: []metricdata.DataPoint[float64]{
				{
					Attributes: attr,
					StartTime:  startTime,
					Time:       endTime,
					Value:      src.GetCounter().GetValue(),
				},
			},
		}, nil
	case dto.MetricType_GAUGE:
		return metricdata.Gauge[float64]{
			DataPoints: []metricdata.DataPoint[float64]{
				{
					Attributes: attr,
					StartTime:  startTime,
					Time:       endTime,
					Value:      src.GetGauge().GetValue(),
				},
			},
		}, nil
	case dto.MetricType_HISTOGRAM:
		histogram := src.GetHistogram()
		bucket := histogram.GetBucket()
		return metricdata.Histogram[float64]{
			Temporality: metricdata.CumulativeTemporality,
			DataPoints: []metricdata.HistogramDataPoint[float64]{
				{
					Attributes:   attr,
					StartTime:    startTime,
					Time:         endTime,
					Count:        histogram.GetSampleCount(),
					Sum:          histogram.GetSampleSum(),
					Bounds:       convertBucketBounds(bucket),
					BucketCounts: convertBucketCounts(bucket),
				},
			},
		}, nil
	default:
		// SUMMARY and UNTYPED are known prometheus types with no conversion
		// here; report them as unsupported rather than "unknown".
		return nil, fmt.Errorf("unsupported metric type: %s", typ)
	}
}
// convertBucketBounds extracts the explicit upper bounds from prometheus
// histogram buckets. Any +Inf bound (conventionally the last bucket) is
// skipped because OTLP adds the +Inf bucket implicitly.
func convertBucketBounds(
	bucket []*dto.Bucket,
) []float64 {
	out := make([]float64, 0, len(bucket))
	for _, entry := range bucket {
		upper := entry.GetUpperBound()
		if math.IsInf(upper, 1) {
			continue
		}
		out = append(out, upper)
	}
	return out
}
// convertBucketCounts turns prometheus cumulative bucket counts into
// per-bucket counts, as OTLP expects: each output element is the number
// of samples that fell into that bucket alone.
func convertBucketCounts(
	bucket []*dto.Bucket,
) []uint64 {
	out := make([]uint64, 0, len(bucket))
	prev := uint64(0)
	for _, entry := range bucket {
		cumulative := entry.GetCumulativeCount()
		// difference from the previous cumulative value = this bucket's count
		out = append(out, cumulative-prev)
		prev = cumulative
	}
	return out
}
// convertLabels converts prometheus label pairs into an OTel attribute set,
// applying relabel to each label name; labels rejected by relabel are dropped.
func convertLabels(
	src []*dto.LabelPair,
	relabel RelabelFn,
) attribute.Set {
	attrs := make([]attribute.KeyValue, 0, len(src))
	for _, pair := range src {
		name, keep := relabel(pair.GetName())
		if !keep {
			continue
		}
		attrs = append(attrs, attribute.String(name, pair.GetValue()))
	}
	return attribute.NewSet(attrs...)
}

View file

@ -0,0 +1,137 @@
package prometheus_test
import (
"embed"
"path"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"github.com/pomerium/pomerium/internal/telemetry/prometheus"
)
//go:embed testdata
var testdata embed.FS
// TestConvert exercises the prometheus -> OTLP conversion end to end
// against the text-format fixtures embedded under testdata/.
func TestConvert(t *testing.T) {
	t.Parallel()
	start, err := time.Parse(time.RFC3339, "2024-05-29T00:00:01Z")
	require.NoError(t, err)
	// end is the observation timestamp and must come after start
	// (was "2021-05-29T01:00:00Z", three years before start — a typo).
	end, err := time.Parse(time.RFC3339, "2024-05-29T01:00:00Z")
	require.NoError(t, err)
	cases := []struct {
		name string
		want []metricdata.Metrics
	}{
		{
			"histogram",
			[]metricdata.Metrics{
				{
					Name: "envoy_cluster_upstream_rq_time",
					Data: metricdata.Histogram[float64]{
						Temporality: metricdata.CumulativeTemporality,
						DataPoints: []metricdata.HistogramDataPoint[float64]{
							{
								Attributes: attribute.NewSet(
									attribute.String("pomerium_route_id", "metrics-00083"),
								),
								StartTime: start,
								Time:      end,
								Count:     2,
								Sum:       4.1,
								// the +Inf bound is implicit in OTLP, hence one more
								// bucket count (20) than explicit bounds (19)
								Bounds:       []float64{0.5, 1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000, 60000, 300000, 600000, 1800000, 3600000},
								BucketCounts: []uint64{0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
							},
						},
					},
				},
			},
		},
		{
			"counter",
			[]metricdata.Metrics{
				{
					Name: "envoy_cluster_upstream_cx_total",
					Data: metricdata.Sum[float64]{
						IsMonotonic: true,
						Temporality: metricdata.CumulativeTemporality,
						DataPoints: []metricdata.DataPoint[float64]{
							{
								Attributes: attribute.NewSet(
									attribute.String("pomerium_route_id", "route-1"),
								),
								Value:     2,
								StartTime: start,
								Time:      end,
							},
						},
					},
				},
				{
					Name: "envoy_cluster_upstream_cx_total",
					Data: metricdata.Sum[float64]{
						IsMonotonic: true,
						Temporality: metricdata.CumulativeTemporality,
						DataPoints: []metricdata.DataPoint[float64]{
							{
								Attributes: attribute.NewSet(
									attribute.String("pomerium_route_id", "route-2"),
								),
								Value:     3,
								StartTime: start,
								Time:      end,
							},
						},
					},
				},
			},
		},
	}
	for _, tc := range cases {
		tc := tc // shadow: subtests run in parallel (needed pre-Go 1.22)
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			fd, err := testdata.Open(path.Join("testdata", tc.name+".txt"))
			require.NoError(t, err)
			defer fd.Close()
			got, err := prometheus.ToOTLP(fd,
				func(name string) (string, bool) {
					return name, true
				}, func(label string) (string, bool) {
					if label == "envoy_cluster_name" {
						return "pomerium_route_id", true
					}
					return "", false
				}, start, end)
			require.NoError(t, err)
			assert.Empty(t, cmp.Diff(tc.want, got, cmpopts.IgnoreUnexported(
				attribute.Set{},
				metricdata.Extrema[float64]{},
			),
				cmp.Comparer(compareAttributeSets),
			))
		})
	}
}
// compareAttributeSets reports whether two attribute sets hold exactly the
// same key/value pairs, regardless of internal ordering.
func compareAttributeSets(x, y attribute.Set) bool {
	if x.Len() != y.Len() {
		return false
	}
	// same size, so x ⊆ y implies equality
	for _, kv := range x.ToSlice() {
		v, ok := y.Value(kv.Key)
		if !ok {
			return false
		}
		if v != kv.Value {
			return false
		}
	}
	return true
}

View file

@ -0,0 +1,72 @@
package prometheus
import (
"bufio"
"fmt"
"io"
"strings"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
// MetricFamilyStream incrementally parses a prometheus text-format metric
// stream, yielding one metric family at a time via Next, so the whole
// exposition never has to be held in memory at once.
type MetricFamilyStream struct {
	reader  io.Reader       // underlying source (reads actually go through scanner)
	scanner *bufio.Scanner  // line-by-line reader over reader
	buffer  strings.Builder // raw lines of the current, not-yet-parsed family block; persists across Next calls
}
// NewMetricFamilyStream returns a MetricFamilyStream that reads prometheus
// text-format metrics line by line from reader.
func NewMetricFamilyStream(reader io.Reader) *MetricFamilyStream {
	mfs := new(MetricFamilyStream)
	mfs.reader = reader
	mfs.scanner = bufio.NewScanner(reader)
	return mfs
}
// Next returns the next metric family from the stream, or io.EOF when the
// input is exhausted. It accumulates raw lines in mfs.buffer until it sees
// the header of the following family, then hands the completed block to the
// expfmt text parser. The buffer persists across calls so the upcoming
// family's header line, already consumed from the scanner, is not lost.
func (mfs *MetricFamilyStream) Next() (*dto.MetricFamily, error) {
	// afterHeader becomes true once a sample (non-'#') line has been seen;
	// the next '#' line after that marks the start of a new family.
	var afterHeader bool
	var block *strings.Reader
	for block == nil && mfs.scanner.Scan() {
		line := mfs.scanner.Text()
		if line == "" {
			continue
		}
		if line[0] == '#' {
			if afterHeader {
				// comment line after samples: the buffered block is one
				// complete family — snapshot it for parsing below
				block = strings.NewReader(mfs.buffer.String())
				mfs.buffer.Reset()
			}
		} else {
			afterHeader = true
		}
		// the current line (possibly the next family's header) is always
		// appended, to be consumed either now or by a later call
		mfs.buffer.WriteString(line)
		mfs.buffer.WriteString("\n")
	}
	if block == nil {
		// scanner exhausted (or failed): flush whatever remains buffered
		if err := mfs.scanner.Err(); err != nil {
			return nil, err
		}
		if mfs.buffer.Len() == 0 {
			return nil, io.EOF
		}
		block = strings.NewReader(mfs.buffer.String())
		mfs.buffer.Reset()
	}
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(block)
	if err != nil {
		return nil, err
	}
	// each block should contain exactly one family by construction
	if len(families) > 1 {
		return nil, fmt.Errorf("parse error: multiple metric families")
	}
	// families has at most one entry; return it if present
	for _, mf := range families {
		return mf, nil
	}
	// NOTE(review): an empty parse result (e.g. a block containing only
	// comment lines) reports EOF even if more input remains — confirm that
	// this is the intended behavior.
	return nil, io.EOF
}

View file

@ -0,0 +1,113 @@
package prometheus_test
import (
"errors"
"io"
"strings"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
"github.com/pomerium/pomerium/internal/telemetry/prometheus"
)
// TestMetricFamilyStream checks that MetricFamilyStream splits prometheus
// text-format input into individual metric families and parses each one.
func TestMetricFamilyStream(t *testing.T) {
	tests := []struct {
		name     string
		input    string   // raw prometheus text-format exposition
		expected []*dto.MetricFamily
		wantErr  bool
	}{
		{
			name: "single metric family",
			input: `
# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027 1395066363000
`,
			expected: []*dto.MetricFamily{
				{
					Name: proto.String("http_requests_total"),
					Help: proto.String("The total number of HTTP requests."),
					Type: dto.MetricType_COUNTER.Enum(),
					Metric: []*dto.Metric{
						{
							Label: []*dto.LabelPair{
								{Name: proto.String("method"), Value: proto.String("post")},
								{Name: proto.String("code"), Value: proto.String("200")},
							},
							Counter:     &dto.Counter{Value: proto.Float64(1027)},
							TimestampMs: proto.Int64(1395066363000),
						},
					},
				},
			},
			wantErr: false,
		},
		{
			// two families back to back: the second '# TYPE' header must
			// terminate the first family's block
			name: "multiple metric families",
			input: `
# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027 1395066363000
# TYPE cpu_seconds_total counter
cpu_seconds_total 12345.6
`,
			expected: []*dto.MetricFamily{
				{
					Name: proto.String("http_requests_total"),
					Type: dto.MetricType_COUNTER.Enum(),
					Metric: []*dto.Metric{
						{
							Label: []*dto.LabelPair{
								{Name: proto.String("method"), Value: proto.String("post")},
								{Name: proto.String("code"), Value: proto.String("200")},
							},
							Counter:     &dto.Counter{Value: proto.Float64(1027)},
							TimestampMs: proto.Int64(1395066363000),
						},
					},
				},
				{
					Name: proto.String("cpu_seconds_total"),
					Type: dto.MetricType_COUNTER.Enum(),
					Metric: []*dto.Metric{
						{
							Counter: &dto.Counter{Value: proto.Float64(12345.6)},
						},
					},
				},
			},
			wantErr: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			reader := strings.NewReader(tt.input)
			metricStream := prometheus.NewMetricFamilyStream(reader)
			var got []*dto.MetricFamily
			// drain the stream until EOF, collecting each parsed family
			for {
				mf, err := metricStream.Next()
				if errors.Is(err, io.EOF) {
					break
				}
				if (err != nil) != tt.wantErr {
					t.Errorf("MetricFamilyStream.Next() error = %v, wantErr %v", err, tt.wantErr)
					return
				}
				if err == nil {
					got = append(got, mf)
				}
			}
			// protocmp.Transform handles proto message comparison semantics
			diff := cmp.Diff(tt.expected, got, protocmp.Transform(), cmpopts.IgnoreUnexported(dto.MetricFamily{}, dto.Metric{}, dto.LabelPair{}, dto.Counter{}))
			require.Empty(t, diff)
		})
	}
}

View file

@ -0,0 +1,3 @@
# TYPE envoy_cluster_upstream_cx_total counter
envoy_cluster_upstream_cx_total{service="pomerium-proxy",envoy_cluster_name="route-1",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb"} 2
envoy_cluster_upstream_cx_total{service="pomerium-proxy",envoy_cluster_name="route-2",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb"} 3

View file

@ -0,0 +1,23 @@
# TYPE envoy_cluster_upstream_rq_time histogram
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="0.5"} 0
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="1"} 0
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="5"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="10"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="25"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="50"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="100"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="250"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="500"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="1000"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="2500"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="5000"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="10000"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="30000"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="60000"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="300000"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="600000"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="1.8e+06"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="3.6e+06"} 2
envoy_cluster_upstream_rq_time_bucket{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb",le="+Inf"} 2
envoy_cluster_upstream_rq_time_sum{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb"} 4.1
envoy_cluster_upstream_rq_time_count{service="pomerium-proxy",envoy_cluster_name="metrics-00083",installation_id="aecd6525-9eaa-448d-93d9-6363c04b1ccb",hostname="pomerium-proxy-55589cc5f-fjhsb"} 2