pomerium/internal/telemetry/prometheus/stream.go
2024-12-03 11:17:34 -05:00

91 lines
1.7 KiB
Go

package prometheus
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"iter"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
type metricFamilyStream struct {
reader io.Reader
scanner *bufio.Scanner
buffer bytes.Buffer
parser expfmt.TextParser
}
func NewMetricFamilyStream(reader io.Reader) iter.Seq2[*dto.MetricFamily, error] {
mfs := &metricFamilyStream{
reader: reader,
scanner: bufio.NewScanner(reader),
}
return func(yield func(*dto.MetricFamily, error) bool) {
for {
m, err := mfs.Next()
if errors.Is(err, io.EOF) {
return
}
if err != nil {
yield(nil, err)
return
} else if !yield(m, nil) {
return
}
}
}
}
func (mfs *metricFamilyStream) Next() (*dto.MetricFamily, error) {
var afterHeader bool
for mfs.scanner.Scan() {
line := mfs.scanner.Bytes()
if len(line) == 0 {
continue
}
if line[0] == '#' {
if afterHeader {
result, err := mfs.parseMetricFamilyBlock(&mfs.buffer)
mfs.buffer.Reset()
mfs.buffer.Write(line)
mfs.buffer.WriteRune('\n')
return result, err
}
} else {
afterHeader = true
}
mfs.buffer.Write(line)
mfs.buffer.WriteRune('\n')
}
if err := mfs.scanner.Err(); err != nil {
return nil, err
}
if mfs.buffer.Len() == 0 {
return nil, io.EOF
}
result, err := mfs.parseMetricFamilyBlock(&mfs.buffer)
mfs.buffer.Reset()
return result, err
}
func (mfs *metricFamilyStream) parseMetricFamilyBlock(r io.Reader) (*dto.MetricFamily, error) {
families, err := mfs.parser.TextToMetricFamilies(r)
if err != nil {
return nil, err
}
if len(families) > 1 {
return nil, fmt.Errorf("parse error: multiple metric families")
}
for _, mf := range families {
return mf, nil
}
return nil, io.EOF
}