merge main, wire with rest of funcs

This commit is contained in:
Denis Mishin 2024-06-12 22:34:45 -04:00
parent 74c5bfbbd8
commit c945b08bcf
4 changed files with 83 additions and 24 deletions

View file

@ -8,7 +8,6 @@ import (
"time"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@ -63,6 +62,12 @@ func WithIncludeLabels(labels ...string) ProducerOption {
}
}
func WithScrapeURL(scrapeURL string) ProducerOption {
return func(cfg *producerConfig) {
cfg.scrapeURL = scrapeURL
}
}
func (cfg *producerConfig) Validate() error {
if cfg.client == nil {
return fmt.Errorf("HTTP client is required")
@ -89,31 +94,37 @@ func newProducerConfig(opts ...ProducerOption) (*producerConfig, error) {
return cfg, nil
}
type producer struct {
type Producer struct {
producerConfig atomic.Value
}
func NewProducer(opts ...ProducerOption) (metric.Producer, error) {
func NewProducer(opts ...ProducerOption) (*Producer, error) {
cfg, err := newProducerConfig(opts...)
if err != nil {
return nil, err
}
p := new(producer)
p := new(Producer)
p.setConfig(cfg)
return p, nil
}
func (p *producer) SetConfig(opts ...ProducerOption) {
func (p *Producer) SetConfig(opts ...ProducerOption) error {
cfg, err := newProducerConfig(opts...)
if err != nil {
panic(err)
return err
}
p.setConfig(cfg)
return nil
}
func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
func (p *Producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
cfg := p.loadConfig()
if len(cfg.metrics) == 0 {
return nil, nil
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, cfg.scrapeURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
@ -135,11 +146,11 @@ func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, erro
}, nil
}
func (p *producer) setConfig(cfg *producerConfig) {
func (p *Producer) setConfig(cfg *producerConfig) {
p.producerConfig.Store(cfg)
}
func (p *producer) loadConfig() *producerConfig {
func (p *Producer) loadConfig() *producerConfig {
return p.producerConfig.Load().(*producerConfig)
}

View file

@ -13,12 +13,10 @@ import (
sdk "github.com/pomerium/pomerium/internal/zero/api"
"github.com/pomerium/pomerium/internal/zero/bootstrap"
"github.com/pomerium/pomerium/internal/zero/bootstrap/writers"
connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
"github.com/pomerium/pomerium/internal/zero/reconciler"
"github.com/pomerium/pomerium/internal/zero/telemetry/reporter"
"github.com/pomerium/pomerium/pkg/cmd/pomerium"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/zero/connect"
)
// Run runs Pomerium is managed mode using the provided token.
@ -140,13 +138,6 @@ func (c *controller) runZeroControlLoop(ctx context.Context) error {
}
defer c.shutdownTelemetry(ctx)
err = c.api.Watch(ctx, connect_mux.WithOnTelemetryRequested(func(ctx context.Context, _ *connect.TelemetryRequest) {
c.telemetryReporter.CollectAndExportMetrics(ctx)
}))
if err != nil {
return fmt.Errorf("watch telemetry: %w", err)
}
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return r.Run(ctx,

View file

@ -3,35 +3,83 @@ package controller
import (
"context"
"fmt"
"net"
"net/url"
"time"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/sdk/instrumentation"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/prometheus"
connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
"github.com/pomerium/pomerium/internal/zero/healthcheck"
"github.com/pomerium/pomerium/internal/zero/telemetry/reporter"
"github.com/pomerium/pomerium/internal/zero/telemetry/sessions"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/health"
"github.com/pomerium/pomerium/pkg/zero/connect"
)
const (
producerSessionAnalytics = "session-analytics"
producerEnvoy = "envoy"
)
func (c *controller) initTelemetry(ctx context.Context, clientProvider func() (databroker.DataBrokerServiceClient, error)) error {
sessionMetricProducer := sessions.NewProducer(instrumentation.Scope{}, clientProvider)
startTime := time.Now()
sessionMetricProducer := sessions.NewProducer(instrumentation.Scope{Name: "cluster"}, clientProvider)
envoyMetricProducer, err := prometheus.NewProducer(c.buildEnvoyMetricProducerOptions(nil, nil, startTime)...)
if err != nil {
return fmt.Errorf("error creating envoy metric producer: %w", err)
}
r, err := reporter.New(ctx, c.api.GetTelemetryConn(),
reporter.WithProducer(producerSessionAnalytics, sessionMetricProducer),
reporter.WithProducer(producerEnvoy, envoyMetricProducer),
)
if err != nil {
return fmt.Errorf("error creating telemetry metrics reporter: %w", err)
}
err = c.api.Watch(ctx, connect_mux.WithOnTelemetryRequested(func(ctx context.Context, req *connect.TelemetryRequest) {
sessionMetricProducer.SetEnabled(req.GetSessionAnalytics() != nil)
if envoyMetricRequest := req.GetEnvoyMetrics(); envoyMetricRequest != nil {
opts := c.buildEnvoyMetricProducerOptions(envoyMetricRequest.GetMetrics(), envoyMetricRequest.GetLabels(), startTime)
err := envoyMetricProducer.SetConfig(opts...)
if err != nil {
log.Warn(ctx).Err(err).Msg("failed to set envoy metric producer options")
}
} else {
_ = envoyMetricProducer.SetConfig(c.buildEnvoyMetricProducerOptions(nil, nil, startTime)...)
}
c.telemetryReporter.CollectAndExportMetrics(ctx)
}))
if err != nil {
return fmt.Errorf("watch telemetry: %w", err)
}
c.telemetryReporter = r
return nil
}
func (c *controller) buildEnvoyMetricProducerOptions(metrics, labels []string, startTime time.Time) []prometheus.ProducerOption {
return []prometheus.ProducerOption{
prometheus.WithIncludeMetrics(metrics...),
prometheus.WithIncludeLabels(labels...),
prometheus.WithScope(instrumentation.Scope{Name: "envoy"}),
prometheus.WithScrapeURL((&url.URL{
Scheme: "http",
Host: net.JoinHostPort("localhost", c.bootstrapConfig.GetConfig().OutboundPort),
Path: "/envoy/stats/prometheus",
}).String()),
prometheus.WithStartTime(startTime),
}
}
func (c *controller) shutdownTelemetry(ctx context.Context) {
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), c.cfg.shutdownTimeout)
defer cancel()

View file

@ -3,32 +3,37 @@ package sessions
import (
"context"
"fmt"
"sync/atomic"
"time"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"golang.org/x/sync/errgroup"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)
type producer struct {
type Producer struct {
scope instrumentation.Scope
clientProvider func() (databroker.DataBrokerServiceClient, error)
enabled atomic.Bool
}
func NewProducer(
scope instrumentation.Scope,
clientProvider func() (databroker.DataBrokerServiceClient, error),
) metric.Producer {
return &producer{
) *Producer {
return &Producer{
clientProvider: clientProvider,
scope: scope,
}
}
func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
func (p *Producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
if !p.enabled.Load() {
return nil, nil
}
client, err := p.clientProvider()
if err != nil {
return nil, fmt.Errorf("error getting client: %w", err)
@ -73,3 +78,7 @@ func (p *producer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, erro
},
}, nil
}
func (p *Producer) SetEnabled(enabled bool) {
p.enabled.Store(enabled)
}