zero/telemetry: add reporter (#4855)

This commit is contained in:
Denis Mishin 2023-12-20 14:53:06 -05:00 committed by GitHub
parent 3adbc65d37
commit faa2a8652b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 425 additions and 203 deletions

View file

@ -10,7 +10,7 @@ import (
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)
// Collect collects metrics and reports them to the cloud
// Collect collects metrics and stores them in the databroker
func Collect(
ctx context.Context,
client databroker.DataBrokerServiceClient,

View file

@ -0,0 +1,47 @@
package analytics
import (
"context"
"go.opentelemetry.io/otel/metric"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)
// Metrics returns the registration functions for every metric to be exported.
// Each returned function registers one metric ("dau", then "mau") on the meter
// it is called with; clientProvider is handed through to the metric callbacks.
func Metrics(
	clientProvider func() databroker.DataBrokerServiceClient,
) []func(m metric.Meter) error {
	ids := [...]string{"dau", "mau"}

	registrations := make([]func(m metric.Meter) error, 0, len(ids))
	for _, id := range ids {
		registrations = append(registrations, registerMetric(id, clientProvider))
	}
	return registrations
}
// registerMetric builds a registration function for the metric named id.
// When invoked with a meter it creates an int64 observable gauge called id
// whose callback comes from metricCallback. The instrument handle is discarded;
// only the creation error (if any) is returned.
func registerMetric(
	id string,
	clientProvider func() databroker.DataBrokerServiceClient,
) func(m metric.Meter) error {
	return func(m metric.Meter) error {
		observe := metric.WithInt64Callback(metricCallback(id, clientProvider))
		if _, err := m.Int64ObservableGauge(id, observe); err != nil {
			return err
		}
		return nil
	}
}
// metricCallback returns the observation callback for the metric named id.
// On each invocation it asks clientProvider for a databroker client, loads the
// metric state with LoadMetricState and observes the state's Count.
// A load failure is logged and swallowed, so the callback always returns nil.
func metricCallback(
	id string,
	clientProvider func() databroker.DataBrokerServiceClient,
) metric.Int64Callback {
	return func(ctx context.Context, result metric.Int64Observer) error {
		client := clientProvider()
		state, err := LoadMetricState(ctx, client, id)
		if err == nil {
			result.Observe(int64(state.Count))
			return nil
		}

		// Returning an error would block export of other metrics according to
		// SDK design, so the failure is only logged.
		log.Ctx(ctx).Error().Err(err).Str("metric", id).Msg("error loading metric state")
		return nil
	}
}

View file

@ -4,9 +4,12 @@ package zero
import (
"context"
"fmt"
"time"
"github.com/pomerium/pomerium/internal/zero/apierror"
connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
"github.com/pomerium/pomerium/internal/zero/grpcconn"
"github.com/pomerium/pomerium/internal/zero/reporter"
token_api "github.com/pomerium/pomerium/internal/zero/token"
"github.com/pomerium/pomerium/pkg/fanout"
cluster_api "github.com/pomerium/pomerium/pkg/zero/cluster"
@ -19,8 +22,18 @@ type API struct {
cluster cluster_api.ClientWithResponsesInterface
mux *connect_mux.Mux
downloadURLCache *cluster_api.URLCache
tokenFn func(ctx context.Context, ttl time.Duration) (string, error)
}
const (
// minConnectTokenTTL is the ttl passed when fetching tokens for the connect gRPC connection.
// access tokens are only good for an hour,
// and they define the maximum connection time,
// so we want it to be as close to the max as possible for the streaming gRPC connection
minConnectTokenTTL = time.Minute * 55
// minTelemetryTokenTTL is the ttl passed when fetching tokens for the
// OTEL exporter connection created in Report.
minTelemetryTokenTTL = time.Minute * 5
)
// WatchOption defines which events to watch for
type WatchOption = connect_mux.WatchOption
@ -45,19 +58,33 @@ func NewAPI(ctx context.Context, opts ...Option) (*API, error) {
return nil, fmt.Errorf("error creating cluster client: %w", err)
}
connectClient, err := connect_api.NewAuthorizedConnectClient(ctx, cfg.connectAPIEndpoint, tokenCache.GetToken)
connectGRPCConn, err := grpcconn.New(ctx, cfg.connectAPIEndpoint, func(ctx context.Context) (string, error) {
return tokenCache.GetToken(ctx, minConnectTokenTTL)
})
if err != nil {
return nil, fmt.Errorf("error creating connect client: %w", err)
return nil, fmt.Errorf("error creating connect grpc client: %w", err)
}
return &API{
cfg: cfg,
cluster: clusterClient,
mux: connect_mux.New(connectClient),
mux: connect_mux.New(connect_api.NewConnectClient(connectGRPCConn)),
downloadURLCache: cluster_api.NewURLCache(),
tokenFn: tokenCache.GetToken,
}, nil
}
// Report runs metrics reporting to the cloud.
//
// It dials the configured OTEL endpoint, authenticating each RPC with a token
// fetched through api.tokenFn using minTelemetryTokenTTL, and then hands the
// connection to reporter.Run, whose result is returned.
//
// The connection is owned by this method: the OTLP exporter does not close a
// connection supplied by the caller, so it is closed here once reporter.Run
// returns.
func (api *API) Report(ctx context.Context, opts ...reporter.Option) error {
	conn, err := grpcconn.New(ctx, api.cfg.otelEndpoint, func(ctx context.Context) (string, error) {
		return api.tokenFn(ctx, minTelemetryTokenTTL)
	})
	if err != nil {
		return fmt.Errorf("error creating OTEL exporter grpc client: %w", err)
	}
	// Best-effort close: reporting has already stopped, so a close error is not actionable.
	defer func() { _ = conn.Close() }()

	return reporter.Run(ctx, conn, opts...)
}
// Connect connects to the connect API and allows watching for changes
func (api *API) Connect(ctx context.Context, opts ...fanout.Option) error {
return api.mux.Run(ctx, opts...)

View file

@ -12,6 +12,7 @@ type Option func(*config)
type config struct {
clusterAPIEndpoint string
connectAPIEndpoint string
otelEndpoint string
apiToken string
httpClient *http.Client
downloadURLCacheTTL time.Duration
@ -31,6 +32,13 @@ func WithConnectAPIEndpoint(endpoint string) Option {
}
}
// WithOTELEndpoint returns an Option that records endpoint as the OTEL API
// endpoint in the API configuration.
func WithOTELEndpoint(endpoint string) Option {
	setEndpoint := func(cfg *config) {
		cfg.otelEndpoint = endpoint
	}
	return setEndpoint
}
// WithAPIToken sets the API token
func WithAPIToken(token string) Option {
return func(cfg *config) {
@ -77,6 +85,9 @@ func (c *config) validate() error {
if c.connectAPIEndpoint == "" {
return fmt.Errorf("connect API endpoint is required")
}
if c.otelEndpoint == "" {
return fmt.Errorf("OTEL API endpoint is required")
}
if c.apiToken == "" {
return fmt.Errorf("API token is required")
}

View file

@ -38,6 +38,7 @@ func Run(ctx context.Context, configFile string) error {
controller.WithAPIToken(token),
controller.WithClusterAPIEndpoint(getClusterAPIEndpoint()),
controller.WithConnectAPIEndpoint(getConnectAPIEndpoint()),
controller.WithOTELAPIEndpoint(getOTELAPIEndpoint()),
controller.WithBootstrapConfigFileName(bootstrapConfigFileName),
)
}

View file

@ -43,3 +43,10 @@ func getClusterAPIEndpoint() string {
}
return "https://console.pomerium.app/cluster/v1"
}
// getOTELAPIEndpoint returns the OTEL API endpoint: the value of the
// POMERIUM_OTEL_ENDPOINT environment variable when it is set to a non-empty
// string, otherwise the built-in default.
func getOTELAPIEndpoint() string {
	const defaultEndpoint = "https://telemetry.pomerium.app"

	override := os.Getenv("POMERIUM_OTEL_ENDPOINT")
	if override == "" {
		return defaultEndpoint
	}
	return override
}

View file

@ -9,6 +9,7 @@ type controllerConfig struct {
apiToken string
clusterAPIEndpoint string
connectAPIEndpoint string
otelEndpoint string
tmpDir string
bootstrapConfigFileName string
@ -38,6 +39,13 @@ func WithConnectAPIEndpoint(endpoint string) Option {
}
}
// WithOTELAPIEndpoint returns an Option that stores endpoint as the OTEL API
// endpoint in the controller configuration.
func WithOTELAPIEndpoint(endpoint string) Option {
	return func(cfg *controllerConfig) {
		otel := endpoint
		cfg.otelEndpoint = otel
	}
}
// WithAPIToken sets the API token to use for authentication.
func WithAPIToken(token string) Option {
return func(c *controllerConfig) {

View file

@ -15,6 +15,7 @@ import (
sdk "github.com/pomerium/pomerium/internal/zero/api"
"github.com/pomerium/pomerium/internal/zero/bootstrap"
"github.com/pomerium/pomerium/internal/zero/reconciler"
"github.com/pomerium/pomerium/internal/zero/reporter"
"github.com/pomerium/pomerium/pkg/cmd/pomerium"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)
@ -46,6 +47,7 @@ func Run(ctx context.Context, opts ...Option) error {
eg.Go(func() error { return run(ctx, "zero-reconciler", c.runReconciler, src.WaitReady) })
eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) })
eg.Go(func() error { return run(ctx, "zero-analytics", c.runAnalytics, src.WaitReady) })
eg.Go(func() error { return run(ctx, "zero-reporter", c.runReporter, src.WaitReady) })
return eg.Wait()
}
@ -64,6 +66,7 @@ func (c *controller) initAPI(ctx context.Context) error {
sdk.WithClusterAPIEndpoint(c.cfg.clusterAPIEndpoint),
sdk.WithAPIToken(c.cfg.apiToken),
sdk.WithConnectAPIEndpoint(c.cfg.connectAPIEndpoint),
sdk.WithOTELEndpoint(c.cfg.otelEndpoint),
)
if err != nil {
return fmt.Errorf("error initializing cloud api: %w", err)
@ -122,7 +125,11 @@ func (c *controller) runReconciler(ctx context.Context) error {
}
func (c *controller) runAnalytics(ctx context.Context) error {
err := analytics.Collect(ctx, c.GetDataBrokerServiceClient(), time.Second*30)
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("service", "zero-analytics")
})
err := analytics.Collect(ctx, c.GetDataBrokerServiceClient(), time.Hour)
if err != nil && ctx.Err() == nil {
log.Ctx(ctx).Error().Err(err).Msg("error collecting analytics, disabling")
return nil
@ -130,3 +137,14 @@ func (c *controller) runAnalytics(ctx context.Context) error {
return err
}
// runReporter adds a "service"="zero-reporter" field to the context logger via
// log.WithContext and then runs cloud reporting through the API client, using
// an hourly collect interval and the analytics metrics bound to the
// controller's databroker client getter.
func (c *controller) runReporter(ctx context.Context) error {
	withService := func(lc zerolog.Context) zerolog.Context {
		return lc.Str("service", "zero-reporter")
	}
	ctx = log.WithContext(ctx, withService)

	metrics := analytics.Metrics(c.GetDataBrokerServiceClient)
	return c.api.Report(ctx,
		reporter.WithCollectInterval(time.Hour),
		reporter.WithMetrics(metrics...),
	)
}

View file

@ -0,0 +1,115 @@
// Package grpcconn provides a gRPC client with authentication
package grpcconn
import (
"context"
"fmt"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"github.com/pomerium/pomerium/internal/log"
)
// client holds what is needed to dial an authenticated gRPC connection.
// It is passed to grpc.WithPerRPCCredentials in getGRPCConn and supplies the
// per-RPC metadata through GetRequestMetadata / RequireTransportSecurity.
type client struct {
// config is the endpoint configuration produced by getConfig
config *config
// tokenProvider supplies the token placed in the "authorization" metadata
tokenProvider TokenProviderFn
}
// TokenProviderFn is a function that returns an authorization token.
// It is called from GetRequestMetadata with the RPC context, and the returned
// token is sent as "Bearer <token>" in the "authorization" metadata entry.
type TokenProviderFn func(ctx context.Context) (string, error)
// New creates a gRPC client connection to endpoint whose RPCs are
// authenticated with tokens obtained from tokenProvider.
//
// endpoint is parsed by getConfig; a parse failure is returned before any
// dialing is attempted. Otherwise the result of dialing is returned as is.
func New(
	ctx context.Context,
	endpoint string,
	tokenProvider TokenProviderFn,
) (*grpc.ClientConn, error) {
	cfg, err := getConfig(endpoint)
	if err != nil {
		return nil, err
	}

	authClient := &client{config: cfg, tokenProvider: tokenProvider}
	return authClient.getGRPCConn(ctx)
}
// getGRPCConn dials the configured connection URI.
//
// On top of the dial options from the config it installs c as per-RPC
// credentials, gzip as the default call compressor, and logging interceptors
// for unary and stream calls. After a successful dial a goroutine is started
// that logs connection state changes (see logConnectionState).
func (c *client) getGRPCConn(ctx context.Context) (*grpc.ClientConn, error) {
	logger := logging.LoggerFunc(interceptorLogger)

	dialOpts := c.config.GetDialOptions()
	dialOpts = append(dialOpts,
		grpc.WithPerRPCCredentials(c),
		grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")),
		grpc.WithChainUnaryInterceptor(logging.UnaryClientInterceptor(logger)),
		grpc.WithStreamInterceptor(logging.StreamClientInterceptor(logger)),
	)

	target := c.config.GetConnectionURI()
	conn, err := grpc.DialContext(ctx, target, dialOpts...)
	if err != nil {
		return nil, fmt.Errorf("error dialing grpc server: %w", err)
	}

	go c.logConnectionState(ctx, conn)
	return conn, nil
}
// GetRequestMetadata implements credentials.PerRPCCredentials.
// It asks the token provider for a token and returns it as a single
// "authorization" entry of the form "Bearer <token>". A token provider error
// is returned unchanged with nil metadata.
func (c *client) GetRequestMetadata(ctx context.Context, _ ...string) (map[string]string, error) {
	token, err := c.tokenProvider(ctx)
	if err != nil {
		return nil, err
	}

	md := make(map[string]string, 1)
	md["authorization"] = "Bearer " + token
	return md, nil
}
// RequireTransportSecurity implements credentials.PerRPCCredentials.
// It reports whatever the endpoint configuration says about TLS being required.
func (c *client) RequireTransportSecurity() bool {
	tlsRequired := c.config.RequireTLS()
	return tlsRequired
}
// logConnectionState logs the connection state at info level each time
// WaitForStateChange returns, and stops once ctx has an error or the
// connection reports Shutdown. The state starts at -1 (not one of the named
// connectivity states used here) so the first wait is against a placeholder
// value. Because the loop condition is only checked before waiting, one final
// log line is written after ctx ends.
func (c *client) logConnectionState(ctx context.Context, conn *grpc.ClientConn) {
	current := connectivity.State(-1)
	for {
		if ctx.Err() != nil || current == connectivity.Shutdown {
			return
		}

		_ = conn.WaitForStateChange(ctx, current)
		current = conn.GetState()

		log.Ctx(ctx).Info().
			Str("endpoint", c.config.connectionURI).
			Str("state", current.String()).
			Msg("grpc connection state")
	}
}
// interceptorLogger adapts the context logger to the signature expected by the
// gRPC logging interceptors: fields are attached to the logger and msg is
// written at the level matching lvl. It panics on a level it does not know.
func interceptorLogger(ctx context.Context, lvl logging.Level, msg string, fields ...any) {
	logger := log.Ctx(ctx).With().Fields(fields).Logger()

	switch lvl {
	case logging.LevelDebug, logging.LevelInfo:
		// NOTE(review): debug-level messages are emitted at info level here — confirm this is intentional.
		logger.Info().Msg(msg)
	case logging.LevelWarn:
		logger.Warn().Msg(msg)
	case logging.LevelError:
		logger.Error().Msg(msg)
	default:
		panic(fmt.Sprintf("unknown level %v", lvl))
	}
}

View file

@ -0,0 +1,47 @@
package grpcconn
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestConfig checks that getConfig turns http/https endpoints into the expected
// connection URI and TLS requirement, and rejects empty, scheme-less,
// host-less and path-carrying endpoints.
func TestConfig(t *testing.T) {
	t.Parallel()

	type testCase struct {
		endpoint      string
		connectionURI string
		requireTLS    bool
		expectError   bool
	}

	cases := []testCase{
		{endpoint: "http://localhost:8721", connectionURI: "dns:localhost:8721", requireTLS: false},
		{endpoint: "https://localhost:8721", connectionURI: "dns:localhost:8721", requireTLS: true},
		{endpoint: "http://localhost:8721/", connectionURI: "dns:localhost:8721", requireTLS: false},
		{endpoint: "https://localhost:8721/", connectionURI: "dns:localhost:8721", requireTLS: true},
		{endpoint: "http://localhost", connectionURI: "dns:localhost:80", requireTLS: false},
		{endpoint: "https://localhost", connectionURI: "dns:localhost:443", requireTLS: true},
		{endpoint: "", expectError: true},
		{endpoint: "http://", expectError: true},
		{endpoint: "https://", expectError: true},
		{endpoint: "localhost:8721", expectError: true},
		{endpoint: "http://localhost:8721/path", expectError: true},
		{endpoint: "https://localhost:8721/path", expectError: true},
	}

	for _, tc := range cases {
		tc := tc // capture per-iteration copy for the parallel subtest
		t.Run(tc.endpoint, func(t *testing.T) {
			t.Parallel()

			cfg, err := getConfig(tc.endpoint)
			if tc.expectError {
				require.Error(t, err)
				return
			}
			if !assert.NoError(t, err) {
				return
			}
			assert.Equal(t, tc.connectionURI, cfg.GetConnectionURI(), "connection uri")
			assert.Equal(t, tc.requireTLS, cfg.RequireTLS(), "require tls")
		})
	}
}

View file

@ -0,0 +1,121 @@
package grpcconn
import (
"crypto/tls"
"fmt"
"net"
"net/url"
"regexp"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
// config is the configuration for the gRPC client.
// It is populated by getConfig from an endpoint URL.
type config struct {
// connectionURI is the dial target built by parseEndpoint, in the form "dns:host:port"
connectionURI string
// requireTLS is whether TLS should be used or cleartext
requireTLS bool
// opts are additional options to pass to the gRPC client
opts []grpc.DialOption
}
// getConfig builds a config from an endpoint string, which has to be in URL format.
// The scheme must be http:// or https:// and decides whether TLS is used.
// If the port is not specified it is inferred from the scheme
// (80 for http, 443 for https). Parse failures are wrapped as "invalid endpoint".
func getConfig(endpoint string) (*config, error) {
	cfg := &config{}
	if err := cfg.parseEndpoint(endpoint); err != nil {
		return nil, fmt.Errorf("invalid endpoint: %w", err)
	}

	cfg.buildTLSOptions()
	return cfg, nil
}
// GetConnectionURI returns the stored connection string, which is meant to
// conform to https://github.com/grpc/grpc/blob/master/doc/naming.md
func (c *config) GetConnectionURI() string {
	uri := c.connectionURI
	return uri
}
// GetDialTimeout returns the timeout for the dial operation, currently a
// fixed one hour regardless of the config contents.
func (c *config) GetDialTimeout() time.Duration {
	const dialTimeout = time.Hour
	return dialTimeout
}
// RequireTLS reports whether the connection must use TLS (true) or
// cleartext (false), as decided from the endpoint scheme.
func (c *config) RequireTLS() bool {
	useTLS := c.requireTLS
	return useTLS
}
// GetDialOptions returns the dial options to pass to the gRPC client.
// The config's own slice is returned, not a copy.
func (c *config) GetDialOptions() []grpc.DialOption {
	dialOpts := c.opts
	return dialOpts
}
// buildTLSOptions appends the transport credentials dial option to c.opts:
// TLS with a minimum version of 1.2 when requireTLS is set, insecure
// (cleartext) credentials otherwise.
func (c *config) buildTLSOptions() {
	var creds credentials.TransportCredentials
	if c.requireTLS {
		creds = credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12})
	} else {
		creds = insecure.NewCredentials()
	}

	c.opts = append(c.opts, grpc.WithTransportCredentials(creds))
}
// parseEndpoint validates endpoint and fills in connectionURI and requireTLS.
//
// endpoint must be an http:// or https:// URL with a host and with no path
// other than an optional "/". A missing port defaults to 80 for http and 443
// for https. On any validation failure an error is returned and c is left
// unmodified.
//
// The connection URI is assembled with net.JoinHostPort from the URL's
// hostname, so a bracketed IPv6 literal keeps its brackets whether or not an
// explicit port was given.
func (c *config) parseEndpoint(endpoint string) error {
	u, err := url.Parse(endpoint)
	if err != nil {
		return fmt.Errorf("error parsing endpoint url: %w", err)
	}

	if u.Path != "" && u.Path != "/" {
		return fmt.Errorf("endpoint path is not supported: %s", u.Path)
	}

	// splitHostPort validates the authority and extracts the explicit port, if
	// any; the host itself is taken from u.Hostname() below.
	_, port, err := splitHostPort(u.Host)
	if err != nil {
		return fmt.Errorf("error splitting host and port: %w", err)
	}

	var requireTLS bool
	switch u.Scheme {
	case "http":
		if port == "" {
			port = "80"
		}
	case "https":
		requireTLS = true
		if port == "" {
			port = "443"
		}
	default:
		return fmt.Errorf("unsupported url scheme: %s", u.Scheme)
	}

	c.connectionURI = "dns:" + net.JoinHostPort(u.Hostname(), port)
	c.requireTLS = requireTLS

	return nil
}
// rePort matches a trailing ":<digits>" port suffix.
var rePort = regexp.MustCompile(`:(\d+)$`)

// splitHostPort splits hostport into host and port.
//
// When hostport does not end in ":<digits>" it is returned unchanged as the
// host with an empty port. Otherwise it is split with net.SplitHostPort; a
// split failure is returned wrapped (rather than being reported as an empty
// host), and an empty host is rejected. An empty hostport is an error.
func splitHostPort(hostport string) (host, port string, err error) {
	if hostport == "" {
		return "", "", fmt.Errorf("empty hostport")
	}
	if !rePort.MatchString(hostport) {
		return hostport, "", nil
	}

	host, port, err = net.SplitHostPort(hostport)
	if err != nil {
		return "", "", fmt.Errorf("invalid hostport %q: %w", hostport, err)
	}
	if host == "" {
		return "", "", fmt.Errorf("empty host")
	}
	// The port cannot be empty here: rePort only matches when digits follow
	// the last colon.
	return host, port, nil
}

View file

@ -0,0 +1,49 @@
package reporter
import (
"time"
"go.opentelemetry.io/otel/metric"
)
// config holds the reporter settings assembled by getConfig from Options.
type config struct {
// shutdownTimeout bounds each Shutdown call made when Run returns
shutdownTimeout time.Duration
// collectInterval is the interval given to the periodic metric reader
collectInterval time.Duration
// metrics are registration functions that Run invokes with the meter
metrics []func(metric.Meter) error
}
// Option is a functional option for configuring the reporter.
type Option func(*config)
// WithShutdownTimeout sets the timeout applied to each shutdown call the
// reporter makes when it stops.
func WithShutdownTimeout(timeout time.Duration) Option {
	apply := func(cfg *config) {
		cfg.shutdownTimeout = timeout
	}
	return apply
}
// WithCollectInterval sets the interval at which metrics are collected
// (it is passed to the periodic reader in Run).
func WithCollectInterval(interval time.Duration) Option {
	apply := func(cfg *config) {
		cfg.collectInterval = interval
	}
	return apply
}
// WithMetrics adds metric registration functions to be run against the meter.
// Repeated use accumulates: the functions are appended to any already set.
func WithMetrics(fns ...func(metric.Meter) error) Option {
	return func(cfg *config) {
		registered := cfg.metrics
		cfg.metrics = append(registered, fns...)
	}
}
// getConfig returns a config with the defaults applied first (5 second
// shutdown timeout, hourly collect interval) and then opts in the order given,
// so later options override earlier ones and the defaults.
func getConfig(opts ...Option) *config {
	cfg := &config{}

	WithShutdownTimeout(time.Second * 5)(cfg)
	WithCollectInterval(time.Hour)(cfg)

	for _, apply := range opts {
		apply(cfg)
	}
	return cfg
}

View file

@ -0,0 +1,61 @@
// Package reporter periodically submits metrics back to the cloud.
package reporter
import (
"context"
"fmt"
"time"
export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
metric_sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"google.golang.org/grpc"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/version"
)
// Run sets up OTEL metric export over conn and blocks until ctx is done.
//
// It creates an OTLP gRPC exporter on conn and a meter provider with a
// periodic reader using the configured collect interval, then runs every
// configured metric registration function against the provider's meter; a
// registration failure is logged and does not stop the others. When ctx is
// done the provider and then the exporter are shut down, each bounded by the
// configured shutdown timeout, and ctx.Err() is returned.
func Run(
	ctx context.Context,
	conn *grpc.ClientConn,
	opts ...Option,
) error {
	const serviceName = "pomerium-managed-core"

	cfg := getConfig(opts...)

	exporter, err := export_grpc.New(ctx, export_grpc.WithGRPCConn(conn))
	if err != nil {
		return fmt.Errorf("starting OTEL exporter: %w", err)
	}
	defer shutdown(exporter.Shutdown, cfg.shutdownTimeout)

	res := resource.NewSchemaless(
		semconv.ServiceNameKey.String(serviceName),
		semconv.ServiceVersionKey.String(version.FullVersion()),
	)
	reader := metric_sdk.NewPeriodicReader(
		exporter,
		metric_sdk.WithInterval(cfg.collectInterval),
	)
	provider := metric_sdk.NewMeterProvider(
		metric_sdk.WithResource(res),
		metric_sdk.WithReader(reader),
	)
	defer shutdown(provider.Shutdown, cfg.shutdownTimeout)

	meter := provider.Meter(serviceName)
	for _, register := range cfg.metrics {
		if regErr := register(meter); regErr != nil {
			log.Ctx(ctx).Error().Err(regErr).Msg("error registering metric")
		}
	}

	<-ctx.Done()
	return ctx.Err()
}
// shutdown calls fn with a fresh background context that expires after
// timeout. The context passed to Run is not reused, since it is already done
// by the time shutdown runs.
func shutdown(fn func(ctx context.Context) error, timeout time.Duration) {
	deadlineCtx, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	// Best-effort: the error returned by fn is intentionally discarded.
	_ = fn(deadlineCtx)
}