add metrics aggregation (#3452)

This commit is contained in:
Denis Mishin 2022-06-30 10:52:45 -04:00 committed by GitHub
parent 86625a4ddb
commit f67b33484b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 229 additions and 77 deletions

View file

@ -4,9 +4,12 @@ import (
"crypto/tls"
"github.com/pomerium/pomerium/internal/hashutil"
"github.com/pomerium/pomerium/internal/netutil"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
)
// MetricsScrapeEndpoint defines additional metrics endpoints that would be scraped and exposed by pomerium
type MetricsScrapeEndpoint metrics.ScrapeEndpoint
// Config holds pomerium configuration options.
type Config struct {
Options *Options
@ -23,12 +26,21 @@ type Config struct {
MetricsPort string
// DebugPort is the port the debug listener is running on.
DebugPort string
// MetricsScrapeEndpoints additional metrics endpoints to scrape and provide part of metrics
MetricsScrapeEndpoints []MetricsScrapeEndpoint
}
// Clone creates a clone of the config.
func (cfg *Config) Clone() *Config {
newOptions := new(Options)
*newOptions = *cfg.Options
if cfg.Options != nil {
*newOptions = *cfg.Options
}
endpoints := make([]MetricsScrapeEndpoint, len(cfg.MetricsScrapeEndpoints))
_ = copy(endpoints, cfg.MetricsScrapeEndpoints)
return &Config{
Options: newOptions,
AutoCertificates: cfg.AutoCertificates,
@ -39,6 +51,8 @@ func (cfg *Config) Clone() *Config {
OutboundPort: cfg.OutboundPort,
MetricsPort: cfg.MetricsPort,
DebugPort: cfg.DebugPort,
MetricsScrapeEndpoints: endpoints,
}
}
@ -61,17 +75,10 @@ func (cfg *Config) Checksum() uint64 {
}
// AllocatePorts populates
func (cfg *Config) AllocatePorts() error {
ports, err := netutil.AllocatePorts(5)
if err != nil {
return err
}
func (cfg *Config) AllocatePorts(ports [5]string) {
cfg.GRPCPort = ports[0]
cfg.HTTPPort = ports[1]
cfg.OutboundPort = ports[2]
cfg.MetricsPort = ports[3]
cfg.DebugPort = ports[4]
return nil
}

View file

@ -13,6 +13,7 @@ import (
"github.com/pomerium/pomerium/internal/fileutil"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
"github.com/pomerium/pomerium/pkg/netutil"
)
// A ChangeListener is called when configuration changes.
@ -115,10 +116,13 @@ func NewFileOrEnvironmentSource(
EnvoyVersion: envoyVersion,
}
if err = cfg.AllocatePorts(); err != nil {
ports, err := netutil.AllocatePorts(5)
if err != nil {
return nil, fmt.Errorf("allocating ports: %w", err)
}
cfg.AllocatePorts(*(*[5]string)(ports))
metrics.SetConfigInfo(ctx, cfg.Options.Services, "local", cfg.Checksum(), true)
src := &FileOrEnvironmentSource{

View file

@ -5,7 +5,9 @@ import (
"net/http"
"net/url"
"os"
"reflect"
"sync"
"time"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/middleware"
@ -15,6 +17,11 @@ import (
"github.com/rs/zerolog"
)
const (
// defaultMetricsTimeout sets max time to collect and send aggregate pomerium metrics
defaultMetricsTimeout = time.Second * 30
)
// A MetricsManager manages metrics for a given configuration.
type MetricsManager struct {
mu sync.RWMutex
@ -24,6 +31,7 @@ type MetricsManager struct {
basicAuth string
envoyAdminAddress string
handler http.Handler
endpoints []MetricsScrapeEndpoint
}
// NewMetricsManager creates a new MetricsManager.
@ -80,10 +88,7 @@ func (mgr *MetricsManager) updateInfo(ctx context.Context, cfg *Config) {
}
func (mgr *MetricsManager) updateServer(ctx context.Context, cfg *Config) {
if cfg.Options.MetricsAddr == mgr.addr &&
cfg.Options.MetricsBasicAuth == mgr.basicAuth &&
cfg.Options.InstallationID == mgr.installationID &&
cfg.Options.EnvoyAdminAddress == mgr.envoyAdminAddress {
if mgr.configUnchanged(cfg) {
return
}
@ -98,13 +103,12 @@ func (mgr *MetricsManager) updateServer(ctx context.Context, cfg *Config) {
return
}
envoyURL, err := url.Parse("http://" + cfg.Options.EnvoyAdminAddress)
if err != nil {
log.Error(ctx).Err(err).Msg("metrics: invalid envoy admin address, disabling")
return
}
handler, err := metrics.PrometheusHandler(envoyURL, mgr.installationID)
mgr.endpoints = append(cfg.MetricsScrapeEndpoints,
MetricsScrapeEndpoint{
Name: "envoy",
URL: url.URL{Scheme: "http", Host: cfg.Options.EnvoyAdminAddress, Path: "/stats/prometheus"},
})
handler, err := metrics.PrometheusHandler(toInternalEndpoints(mgr.endpoints), mgr.installationID, defaultMetricsTimeout)
if err != nil {
log.Error(ctx).Err(err).Msg("metrics: failed to create prometheus handler")
return
@ -116,3 +120,19 @@ func (mgr *MetricsManager) updateServer(ctx context.Context, cfg *Config) {
mgr.handler = handler
}
func (mgr *MetricsManager) configUnchanged(cfg *Config) bool {
return cfg.Options.MetricsAddr == mgr.addr &&
cfg.Options.MetricsBasicAuth == mgr.basicAuth &&
cfg.Options.InstallationID == mgr.installationID &&
cfg.Options.EnvoyAdminAddress == mgr.envoyAdminAddress &&
reflect.DeepEqual(mgr.endpoints, cfg.MetricsScrapeEndpoints)
}
func toInternalEndpoints(src []MetricsScrapeEndpoint) []metrics.ScrapeEndpoint {
dst := make([]metrics.ScrapeEndpoint, 0, len(src))
for _, e := range src {
dst = append(dst, metrics.ScrapeEndpoint(e))
}
return dst
}

View file

@ -1,29 +1,47 @@
package metrics
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"sync"
"time"
ocprom "contrib.go.opencensus.io/exporter/prometheus"
"github.com/hashicorp/go-multierror"
prom "github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.opencensus.io/stats/view"
"google.golang.org/protobuf/proto"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/metrics"
log "github.com/pomerium/pomerium/internal/log"
)
// ScrapeEndpoint external endpoints to scrape and decorate
type ScrapeEndpoint struct {
// Name is the logical name of the endpoint
Name string
// URL of the endpoint to scrape that must output a prometheus-style metrics
URL url.URL
// Labels to append to each metric records
Labels map[string]string
}
func (e *ScrapeEndpoint) String() string {
return fmt.Sprintf("%s(%s)", e.Name, e.URL.String())
}
// PrometheusHandler creates an exporter that exports stats to Prometheus
// and returns a handler suitable for exporting metrics.
func PrometheusHandler(envoyURL *url.URL, installationID string) (http.Handler, error) {
func PrometheusHandler(endpoints []ScrapeEndpoint, installationID string, timeout time.Duration) (http.Handler, error) {
exporter, err := getGlobalExporter()
if err != nil {
return nil, err
@ -31,12 +49,7 @@ func PrometheusHandler(envoyURL *url.URL, installationID string) (http.Handler,
mux := http.NewServeMux()
envoyMetricsURL, err := envoyURL.Parse("/stats/prometheus")
if err != nil {
return nil, fmt.Errorf("telemetry/metrics: invalid proxy URL: %w", err)
}
mux.Handle("/metrics", newProxyMetricsHandler(exporter, *envoyMetricsURL, installationID))
mux.Handle("/metrics", newProxyMetricsHandler(exporter, endpoints, installationID, timeout))
return mux, nil
}
@ -79,54 +92,73 @@ func registerDefaultViews() error {
}
// newProxyMetricsHandler creates a subrequest to the envoy control plane for metrics and
// combines them with our own
func newProxyMetricsHandler(exporter *ocprom.Exporter, envoyURL url.URL, installationID string) http.HandlerFunc {
hostname, err := os.Hostname()
if err != nil {
hostname = "__none__"
}
extraLabels := []*io_prometheus_client.LabelPair{{
Name: proto.String(metrics.InstallationIDLabel),
Value: proto.String(installationID),
}, {
Name: proto.String(metrics.HostnameLabel),
Value: proto.String(hostname),
}}
// combines them with internal envoy-provided
func newProxyMetricsHandler(exporter *ocprom.Exporter, endpoints []ScrapeEndpoint, installationID string, timeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Ensure we don't get entangled with compression from ocprom
r.Header.Del("Accept-Encoding")
ctx, cancel := context.WithTimeout(r.Context(), timeout)
defer cancel()
rec := httptest.NewRecorder()
exporter.ServeHTTP(rec, r)
err := writeMetricsWithLabels(w, rec.Body, extraLabels)
if err != nil {
log.Error(r.Context()).Err(err).Send()
return
}
req, err := http.NewRequestWithContext(r.Context(), "GET", envoyURL.String(), nil)
if err != nil {
log.Error(r.Context()).Err(err).Msg("telemetry/metrics: failed to create request for envoy")
return
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Error(r.Context()).Err(err).Msg("telemetry/metrics: fail to fetch proxy metrics")
return
}
defer resp.Body.Close()
err = writeMetricsWithLabels(w, resp.Body, extraLabels)
if err != nil {
log.Error(r.Context()).Err(err).Send()
return
labels := getCommonLabels(installationID)
if err := writeMetricsMux(ctx, w, append(
scrapeEndpoints(endpoints, labels),
ocExport("pomerium", exporter, r, labels)),
); err != nil {
log.Error(ctx).Msg("responding to metrics request")
}
}
}
type promProducerResult struct {
name string
src io.ReadCloser
labels []*io_prometheus_client.LabelPair
err error
}
// promProducerFn returns a reader containing prometheus-style metrics and additional labels to add to each record
type promProducerFn func(context.Context) promProducerResult
// writeMetricsMux runs producers concurrently and pipes output to destination yet avoiding data interleaving
func writeMetricsMux(ctx context.Context, w io.Writer, producers []promProducerFn) error {
results := make(chan promProducerResult)
for _, p := range producers {
go func(fn promProducerFn) {
results <- fn(ctx)
}(p)
}
var errs *multierror.Error
loop_producers:
for i := 0; i < len(producers); i++ {
select {
case <-ctx.Done():
err := fmt.Errorf("processed %d metric producers out of %d: %w", i, len(producers), ctx.Err())
errs = multierror.Append(errs, err, writePrometheusComment(w, err.Error()))
break loop_producers
case res := <-results:
if err := writeMetricsResult(w, res); err != nil {
errs = multierror.Append(errs, fmt.Errorf("%s: %w", res.name, err))
}
}
}
return errs.ErrorOrNil()
}
func writeMetricsResult(w io.Writer, res promProducerResult) error {
if res.err != nil {
return fmt.Errorf("fetch: %w", res.err)
}
if err := writeMetricsWithLabels(w, res.src, res.labels); err != nil {
return fmt.Errorf("%s: write: %w", res.name, err)
}
if err := res.src.Close(); err != nil {
return fmt.Errorf("%s: close: %w", res.name, err)
}
return nil
}
func writeMetricsWithLabels(w io.Writer, r io.Reader, extra []*io_prometheus_client.LabelPair) error {
var parser expfmt.TextParser
ms, err := parser.TextToMetricFamilies(r)
@ -146,3 +178,91 @@ func writeMetricsWithLabels(w io.Writer, r io.Reader, extra []*io_prometheus_cli
return nil
}
func writePrometheusComment(w io.Writer, txt string) error {
lines := strings.Split(txt, "\n")
for _, line := range lines {
if _, err := w.Write([]byte(fmt.Sprintf("# %s\n", line))); err != nil {
return fmt.Errorf("write prometheus comment: %w", err)
}
}
return nil
}
func ocExport(name string, exporter *ocprom.Exporter, r *http.Request, labels []*io_prometheus_client.LabelPair) promProducerFn {
return func(context.Context) promProducerResult {
// Ensure we don't get entangled with compression from ocprom
r.Header.Del("Accept-Encoding")
rec := httptest.NewRecorder()
exporter.ServeHTTP(rec, r)
if rec.Code/100 != 2 {
return promProducerResult{name: name, err: errors.New(rec.Result().Status)}
}
return promProducerResult{
name: name,
src: rec.Result().Body,
labels: labels,
}
}
}
func scrapeEndpoints(endpoints []ScrapeEndpoint, labels []*io_prometheus_client.LabelPair) []promProducerFn {
out := make([]promProducerFn, 0, len(endpoints))
for _, endpoint := range endpoints {
out = append(out, scrapeEndpoint(endpoint, labels))
}
return out
}
func scrapeEndpoint(endpoint ScrapeEndpoint, labels []*io_prometheus_client.LabelPair) promProducerFn {
return func(ctx context.Context) promProducerResult {
name := fmt.Sprintf("%s %s", endpoint.Name, endpoint.URL.String())
req, err := http.NewRequestWithContext(ctx, "GET", endpoint.URL.String(), nil)
if err != nil {
return promProducerResult{name: name, err: fmt.Errorf("make request: %w", err)}
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return promProducerResult{name: name, err: fmt.Errorf("request: %w", err)}
}
if resp.StatusCode/100 != 2 {
return promProducerResult{name: name, err: errors.New(resp.Status)}
}
return promProducerResult{
name: name,
src: resp.Body,
labels: append(toPrometheusLabels(endpoint.Labels), labels...),
}
}
}
func getCommonLabels(installationID string) []*io_prometheus_client.LabelPair {
hostname, err := os.Hostname()
if err != nil {
hostname = "__none__"
}
return []*io_prometheus_client.LabelPair{{
Name: proto.String(metrics.InstallationIDLabel),
Value: proto.String(installationID),
}, {
Name: proto.String(metrics.HostnameLabel),
Value: proto.String(hostname),
}}
}
func toPrometheusLabels(labels map[string]string) []*io_prometheus_client.LabelPair {
out := make([]*io_prometheus_client.LabelPair, 0, len(labels))
for k, v := range labels {
out = append(out, &io_prometheus_client.LabelPair{
Name: proto.String(k),
Value: proto.String(v),
})
}
return out
}

View file

@ -7,6 +7,7 @@ import (
"net/url"
"regexp"
"testing"
"time"
)
func newEnvoyMetricsHandler() http.HandlerFunc {
@ -28,7 +29,7 @@ envoy_server_initialization_time_ms_bucket{le="1000"} 1
}
func getMetrics(t *testing.T, envoyURL *url.URL) []byte {
h, err := PrometheusHandler(envoyURL, "test_installation_id")
h, err := PrometheusHandler([]ScrapeEndpoint{{Name: "envoy", URL: *envoyURL}}, "test_installation_id", time.Second*20)
if err != nil {
t.Fatal(err)
}

View file

@ -176,7 +176,7 @@ func setupAuthorize(ctx context.Context, src config.Source, controlPlane *contro
}
envoy_service_auth_v3.RegisterAuthorizationServer(controlPlane.GRPCServer, svc)
log.Info(context.TODO()).Msg("enabled authorize service")
log.Info(ctx).Msg("enabled authorize service")
src.OnConfigChange(ctx, svc.OnConfigChange)
svc.OnConfigChange(ctx, src.GetConfig())
return svc, nil
@ -192,7 +192,7 @@ func setupDataBroker(ctx context.Context,
return nil, fmt.Errorf("error creating databroker service: %w", err)
}
svc.Register(controlPlane.GRPCServer)
log.Info(context.TODO()).Msg("enabled databroker service")
log.Info(ctx).Msg("enabled databroker service")
src.OnConfigChange(ctx, svc.OnConfigChange)
svc.OnConfigChange(ctx, src.GetConfig())
return svc, nil
@ -219,7 +219,7 @@ func setupProxy(ctx context.Context, src config.Source, controlPlane *controlpla
return fmt.Errorf("error adding proxy service to control plane: %w", err)
}
log.Info(context.TODO()).Msg("enabled proxy service")
log.Info(ctx).Msg("enabled proxy service")
src.OnConfigChange(ctx, svc.OnConfigChange)
svc.OnConfigChange(ctx, src.GetConfig())