Merge branch 'wasaga/telemetry-refactor-reporter' into wasaga/telemetry-converter-1

This commit is contained in:
Denis Mishin 2024-05-30 16:41:05 -04:00
commit 7f80c0a8d6
3 changed files with 72 additions and 19 deletions

View file

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"time" "time"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
@ -13,7 +14,7 @@ import (
connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux" connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
"github.com/pomerium/pomerium/internal/zero/grpcconn" "github.com/pomerium/pomerium/internal/zero/grpcconn"
"github.com/pomerium/pomerium/internal/zero/healthcheck" "github.com/pomerium/pomerium/internal/zero/healthcheck"
metrics_reporter "github.com/pomerium/pomerium/internal/zero/telemetry/reporter" "github.com/pomerium/pomerium/internal/zero/telemetry/reporter"
token_api "github.com/pomerium/pomerium/internal/zero/token" token_api "github.com/pomerium/pomerium/internal/zero/token"
"github.com/pomerium/pomerium/pkg/fanout" "github.com/pomerium/pomerium/pkg/fanout"
cluster_api "github.com/pomerium/pomerium/pkg/zero/cluster" cluster_api "github.com/pomerium/pomerium/pkg/zero/cluster"
@ -25,6 +26,7 @@ type API struct {
cfg *config cfg *config
cluster cluster_api.ClientWithResponsesInterface cluster cluster_api.ClientWithResponsesInterface
telemetryConn *grpc.ClientConn telemetryConn *grpc.ClientConn
reporter *reporter.Reporter
mux *connect_mux.Mux mux *connect_mux.Mux
downloadURLCache *cluster_api.URLCache downloadURLCache *cluster_api.URLCache
tokenFn func(ctx context.Context, ttl time.Duration) (string, error) tokenFn func(ctx context.Context, ttl time.Duration) (string, error)
@ -84,19 +86,29 @@ func NewAPI(ctx context.Context, opts ...Option) (*API, error) {
return nil, fmt.Errorf("error creating OTEL exporter grpc client: %w", err) return nil, fmt.Errorf("error creating OTEL exporter grpc client: %w", err)
} }
reporter, err := reporter.New(ctx, telemetryGRPCConn)
if err != nil {
return nil, fmt.Errorf("error creating metrics reporter: %w", err)
}
return &API{ return &API{
cfg: cfg, cfg: cfg,
cluster: clusterClient, cluster: clusterClient,
mux: connect_mux.New(connect_api.NewConnectClient(connectGRPCConn)), mux: connect_mux.New(connect_api.NewConnectClient(connectGRPCConn)),
telemetryConn: telemetryGRPCConn, telemetryConn: telemetryGRPCConn,
reporter: reporter,
downloadURLCache: cluster_api.NewURLCache(), downloadURLCache: cluster_api.NewURLCache(),
tokenFn: tokenCache.GetToken, tokenFn: tokenCache.GetToken,
}, nil }, nil
} }
// ReportMetrics runs metrics reporting to the cloud // ReportPeriodicMetrics runs periodic metrics collection and reporting to the cloud
func (api *API) ReportMetrics(ctx context.Context, opts ...metrics_reporter.Option) error { func (api *API) ReportPeriodicMetrics(ctx context.Context, opts ...reporter.Option) error {
return metrics_reporter.Run(ctx, api.telemetryConn, opts...) return api.reporter.RunPeriodicMetricReporter(ctx, opts...)
}
func (api *API) ReportMetrics(ctx context.Context, metrics []metricdata.Metrics) error {
return api.reporter.ReportMetrics(ctx, metrics)
} }
// ReportHealthChecks runs health check reporting to the cloud // ReportHealthChecks runs health check reporting to the cloud

View file

@ -143,7 +143,7 @@ func (c *controller) runMetricsReporterLeased(ctx context.Context, client databr
return c.Str("service", "zero-reporter") return c.Str("service", "zero-reporter")
}) })
return c.api.ReportMetrics(ctx, return c.api.ReportPeriodicMetrics(ctx,
reporter.WithCollectInterval(time.Hour), reporter.WithCollectInterval(time.Hour),
reporter.WithMetrics(analytics.Metrics(func() databroker.DataBrokerServiceClient { return client })...), reporter.WithMetrics(analytics.Metrics(func() databroker.DataBrokerServiceClient { return client })...),
) )

View file

@ -4,10 +4,14 @@ package reporter
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"time" "time"
"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/otel/attribute"
export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
metric_sdk "go.opentelemetry.io/otel/sdk/metric" metric_sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0" semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -16,28 +20,51 @@ import (
"github.com/pomerium/pomerium/internal/version" "github.com/pomerium/pomerium/internal/version"
) )
// Run starts loop that pushes metrics via OTEL protocol until ctx is canceled type Reporter struct {
func Run( exporter *export_grpc.Exporter
resource *resource.Resource
}
func New(ctx context.Context, conn *grpc.ClientConn) (*Reporter, error) {
exporter, err := export_grpc.New(ctx, export_grpc.WithGRPCConn(conn))
if err != nil {
return nil, fmt.Errorf("starting OTEL exporter: %w", err)
}
return &Reporter{
exporter: exporter,
resource: getResource(),
}, nil
}
func (r *Reporter) ReportMetrics(ctx context.Context, metrics []metricdata.Metrics) error {
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = 0
req := &metricdata.ResourceMetrics{
Resource: r.resource,
ScopeMetrics: []metricdata.ScopeMetrics{
{Metrics: metrics},
},
}
return backoff.RetryNotify(func() error {
return r.exporter.Export(ctx, req)
}, backoff.WithContext(bo, ctx), func(err error, d time.Duration) {
log.Ctx(ctx).Warn().Err(err).Str("retry_in", d.String()).Msg("error exporting metrics")
})
}
// RunPeriodicMetricReporter starts loop that pushes metrics collected periodically via OTEL protocol until ctx is canceled
func (r *Reporter) RunPeriodicMetricReporter(
ctx context.Context, ctx context.Context,
conn *grpc.ClientConn,
opts ...Option, opts ...Option,
) error { ) error {
cfg := getConfig(opts...) cfg := getConfig(opts...)
exporter, err := export_grpc.New(ctx, export_grpc.WithGRPCConn(conn))
if err != nil {
return fmt.Errorf("starting OTEL exporter: %w", err)
}
defer shutdown(exporter.Shutdown, cfg.shutdownTimeout)
provider := metric_sdk.NewMeterProvider( provider := metric_sdk.NewMeterProvider(
metric_sdk.WithResource(resource.NewSchemaless( metric_sdk.WithResource(r.resource),
semconv.ServiceNameKey.String("pomerium-managed-core"),
semconv.ServiceVersionKey.String(version.FullVersion()),
)),
metric_sdk.WithReader( metric_sdk.WithReader(
metric_sdk.NewPeriodicReader( metric_sdk.NewPeriodicReader(
exporter, r.exporter,
metric_sdk.WithInterval(cfg.collectInterval), metric_sdk.WithInterval(cfg.collectInterval),
))) )))
defer shutdown(provider.Shutdown, cfg.shutdownTimeout) defer shutdown(provider.Shutdown, cfg.shutdownTimeout)
@ -59,3 +86,17 @@ func shutdown(fn func(ctx context.Context) error, timeout time.Duration) {
defer cancel() defer cancel()
_ = fn(ctx) _ = fn(ctx)
} }
func getResource() *resource.Resource {
attr := []attribute.KeyValue{
semconv.ServiceNameKey.String("pomerium-managed-core"),
semconv.ServiceVersionKey.String(version.FullVersion()),
}
hostname, err := os.Hostname()
if err == nil {
attr = append(attr, semconv.HostNameKey.String(hostname))
}
return resource.NewSchemaless(attr...)
}