diff --git a/Makefile b/Makefile index 06f6c78ea..1515fa204 100644 --- a/Makefile +++ b/Makefile @@ -93,7 +93,7 @@ build-ui: yarn .PHONY: lint lint: ## Verifies `golint` passes. @echo "==> $@" - @golangci-lint run ./... + @go run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.55.2 run ./... --fix .PHONY: test test: get-envoy ## Runs the go tests. diff --git a/go.mod b/go.mod index ed7e78ace..ecff0c583 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.1 github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 + github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jackc/pgx/v5 v5.5.0 @@ -63,6 +64,11 @@ require ( github.com/volatiletech/null/v9 v9.0.0 github.com/yuin/gopher-lua v1.1.1 go.opencensus.io v0.24.0 + go.opentelemetry.io/otel v1.21.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 + go.opentelemetry.io/otel/metric v1.21.0 + go.opentelemetry.io/otel/sdk v1.21.0 + go.opentelemetry.io/otel/sdk/metric v1.21.0 go.uber.org/automaxprocs v1.5.3 go.uber.org/zap v1.26.0 golang.org/x/crypto v0.17.0 @@ -139,6 +145,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gorilla/securecookie v1.1.1 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/imdario/mergo v0.3.13 // indirect @@ -193,10 +200,8 @@ require ( github.com/yashtewari/glob-intersection v0.2.0 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect github.com/zeebo/blake3 v0.2.3 // indirect - go.opentelemetry.io/otel v1.21.0 // indirect - go.opentelemetry.io/otel/metric v1.21.0 // indirect - go.opentelemetry.io/otel/sdk v1.21.0 // indirect go.opentelemetry.io/otel/trace 
v1.21.0 // indirect + go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.14.0 // indirect diff --git a/go.sum b/go.sum index 19f0874ee..e7780d021 100644 --- a/go.sum +++ b/go.sum @@ -399,8 +399,9 @@ github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZH github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA= github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1 h1:HcUWd006luQPljE73d5sk+/VgYPGUReEVz2y1/qylwY= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.1/go.mod h1:w9Y7gY31krpLmrVU5ZPG9H7l9fZuRu5/3R3S3FMtVQ4= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= -github.com/grpc-ecosystem/grpc-gateway v1.9.0 h1:bM6ZAFZmc/wPFaRDi0d5L7hGEZEx/2u+Tmr2evNHDiI= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= @@ -623,8 +624,8 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal 
v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo= github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= @@ -757,6 +758,8 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo= go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 h1:jd0+5t/YynESZqsSyPz+7PAFdEop0dlN0+PkyHYo8oI= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0/go.mod h1:U707O40ee1FpQGyhvqnzmCJm1Wh6OX6GGBVn0E6Uyyk= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk= @@ -765,6 +768,8 @@ go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ3 go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= +go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0= +go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod 
h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= diff --git a/internal/zero/analytics/collector.go b/internal/zero/analytics/collector.go index 051985a3d..f31cbdf1d 100644 --- a/internal/zero/analytics/collector.go +++ b/internal/zero/analytics/collector.go @@ -10,7 +10,7 @@ import ( "github.com/pomerium/pomerium/pkg/grpc/databroker" ) -// Collect collects metrics and reports them to the cloud +// Collect collects metrics and stores them in the databroker func Collect( ctx context.Context, client databroker.DataBrokerServiceClient, diff --git a/internal/zero/analytics/metrics.go b/internal/zero/analytics/metrics.go new file mode 100644 index 000000000..de67dbcdd --- /dev/null +++ b/internal/zero/analytics/metrics.go @@ -0,0 +1,47 @@ +package analytics + +import ( + "context" + + "go.opentelemetry.io/otel/metric" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/pkg/grpc/databroker" +) + +// Metrics returns a list of metrics to be exported +func Metrics( + clientProvider func() databroker.DataBrokerServiceClient, +) []func(m metric.Meter) error { + return []func(m metric.Meter) error{ + registerMetric("dau", clientProvider), + registerMetric("mau", clientProvider), + } +} + +func registerMetric( + id string, + clientProvider func() databroker.DataBrokerServiceClient, +) func(m metric.Meter) error { + return func(m metric.Meter) error { + _, err := m.Int64ObservableGauge(id, + metric.WithInt64Callback(metricCallback(id, clientProvider)), + ) + return err + } +} + +func metricCallback( + id string, + clientProvider func() databroker.DataBrokerServiceClient, +) metric.Int64Callback { + return func(ctx context.Context, result metric.Int64Observer) error { + state, err := 
LoadMetricState(ctx, clientProvider(), id) + if err != nil { + log.Ctx(ctx).Error().Err(err).Str("metric", id).Msg("error loading metric state") + return nil // returning an error would block export of other metrics according to SDK design + } + result.Observe(int64(state.Count)) + return nil + } +} diff --git a/internal/zero/api/api.go b/internal/zero/api/api.go index 488c2c31f..673f596ed 100644 --- a/internal/zero/api/api.go +++ b/internal/zero/api/api.go @@ -4,9 +4,12 @@ package zero import ( "context" "fmt" + "time" "github.com/pomerium/pomerium/internal/zero/apierror" connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux" + "github.com/pomerium/pomerium/internal/zero/grpcconn" + "github.com/pomerium/pomerium/internal/zero/reporter" token_api "github.com/pomerium/pomerium/internal/zero/token" "github.com/pomerium/pomerium/pkg/fanout" cluster_api "github.com/pomerium/pomerium/pkg/zero/cluster" @@ -19,8 +22,18 @@ type API struct { cluster cluster_api.ClientWithResponsesInterface mux *connect_mux.Mux downloadURLCache *cluster_api.URLCache + tokenFn func(ctx context.Context, ttl time.Duration) (string, error) } +const ( + // access tokens are only good for an hour, + // and they define the maximum connection time, + // so we want it to be as close to the max as possible for the streaming gRPC connection + minConnectTokenTTL = time.Minute * 55 + + minTelemetryTokenTTL = time.Minute * 5 +) + // WatchOption defines which events to watch for type WatchOption = connect_mux.WatchOption @@ -45,19 +58,33 @@ func NewAPI(ctx context.Context, opts ...Option) (*API, error) { return nil, fmt.Errorf("error creating cluster client: %w", err) } - connectClient, err := connect_api.NewAuthorizedConnectClient(ctx, cfg.connectAPIEndpoint, tokenCache.GetToken) + connectGRPCConn, err := grpcconn.New(ctx, cfg.connectAPIEndpoint, func(ctx context.Context) (string, error) { + return tokenCache.GetToken(ctx, minConnectTokenTTL) + }) if err != nil { - return nil, 
fmt.Errorf("error creating connect client: %w", err) + return nil, fmt.Errorf("error creating connect grpc client: %w", err) } return &API{ cfg: cfg, cluster: clusterClient, - mux: connect_mux.New(connectClient), + mux: connect_mux.New(connect_api.NewConnectClient(connectGRPCConn)), downloadURLCache: cluster_api.NewURLCache(), + tokenFn: tokenCache.GetToken, }, nil } +// Report runs metrics reporting to the cloud +func (api *API) Report(ctx context.Context, opts ...reporter.Option) error { + conn, err := grpcconn.New(ctx, api.cfg.otelEndpoint, func(ctx context.Context) (string, error) { + return api.tokenFn(ctx, minTelemetryTokenTTL) + }) + if err != nil { + return fmt.Errorf("error creating OTEL exporter grpc client: %w", err) + } + return reporter.Run(ctx, conn, opts...) +} + // Connect connects to the connect API and allows watching for changes func (api *API) Connect(ctx context.Context, opts ...fanout.Option) error { return api.mux.Run(ctx, opts...) diff --git a/internal/zero/api/config.go b/internal/zero/api/config.go index a2bf2a71a..43e212e23 100644 --- a/internal/zero/api/config.go +++ b/internal/zero/api/config.go @@ -12,6 +12,7 @@ type Option func(*config) type config struct { clusterAPIEndpoint string connectAPIEndpoint string + otelEndpoint string apiToken string httpClient *http.Client downloadURLCacheTTL time.Duration @@ -31,6 +32,13 @@ func WithConnectAPIEndpoint(endpoint string) Option { } } +// WithOTELEndpoint sets the OTEL API endpoint +func WithOTELEndpoint(endpoint string) Option { + return func(cfg *config) { + cfg.otelEndpoint = endpoint + } +} + // WithAPIToken sets the API token func WithAPIToken(token string) Option { return func(cfg *config) { @@ -77,6 +85,9 @@ func (c *config) validate() error { if c.connectAPIEndpoint == "" { return fmt.Errorf("connect API endpoint is required") } + if c.otelEndpoint == "" { + return fmt.Errorf("OTEL API endpoint is required") + } if c.apiToken == "" { return fmt.Errorf("API token is required") } diff 
--git a/internal/zero/cmd/command.go b/internal/zero/cmd/command.go index 33818a3fd..b26bcad77 100644 --- a/internal/zero/cmd/command.go +++ b/internal/zero/cmd/command.go @@ -38,6 +38,7 @@ func Run(ctx context.Context, configFile string) error { controller.WithAPIToken(token), controller.WithClusterAPIEndpoint(getClusterAPIEndpoint()), controller.WithConnectAPIEndpoint(getConnectAPIEndpoint()), + controller.WithOTELAPIEndpoint(getOTELAPIEndpoint()), controller.WithBootstrapConfigFileName(bootstrapConfigFileName), ) } diff --git a/internal/zero/cmd/env.go b/internal/zero/cmd/env.go index 13f94e6d9..8f5077aa9 100644 --- a/internal/zero/cmd/env.go +++ b/internal/zero/cmd/env.go @@ -43,3 +43,10 @@ func getClusterAPIEndpoint() string { } return "https://console.pomerium.app/cluster/v1" } + +func getOTELAPIEndpoint() string { + if endpoint := os.Getenv("POMERIUM_OTEL_ENDPOINT"); endpoint != "" { + return endpoint + } + return "https://telemetry.pomerium.app" +} diff --git a/internal/zero/controller/config.go b/internal/zero/controller/config.go index 8aea41618..016bab07b 100644 --- a/internal/zero/controller/config.go +++ b/internal/zero/controller/config.go @@ -9,6 +9,7 @@ type controllerConfig struct { apiToken string clusterAPIEndpoint string connectAPIEndpoint string + otelEndpoint string tmpDir string bootstrapConfigFileName string @@ -38,6 +39,13 @@ func WithConnectAPIEndpoint(endpoint string) Option { } } +// WithOTELAPIEndpoint sets the endpoint to use for the OTEL API +func WithOTELAPIEndpoint(endpoint string) Option { + return func(c *controllerConfig) { + c.otelEndpoint = endpoint + } +} + // WithAPIToken sets the API token to use for authentication. 
func WithAPIToken(token string) Option { return func(c *controllerConfig) { diff --git a/internal/zero/controller/controller.go b/internal/zero/controller/controller.go index 9581f1ee6..00d797684 100644 --- a/internal/zero/controller/controller.go +++ b/internal/zero/controller/controller.go @@ -15,6 +15,7 @@ import ( sdk "github.com/pomerium/pomerium/internal/zero/api" "github.com/pomerium/pomerium/internal/zero/bootstrap" "github.com/pomerium/pomerium/internal/zero/reconciler" + "github.com/pomerium/pomerium/internal/zero/reporter" "github.com/pomerium/pomerium/pkg/cmd/pomerium" "github.com/pomerium/pomerium/pkg/grpc/databroker" ) @@ -46,6 +47,7 @@ func Run(ctx context.Context, opts ...Option) error { eg.Go(func() error { return run(ctx, "zero-reconciler", c.runReconciler, src.WaitReady) }) eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) }) eg.Go(func() error { return run(ctx, "zero-analytics", c.runAnalytics, src.WaitReady) }) + eg.Go(func() error { return run(ctx, "zero-reporter", c.runReporter, src.WaitReady) }) return eg.Wait() } @@ -64,6 +66,7 @@ func (c *controller) initAPI(ctx context.Context) error { sdk.WithClusterAPIEndpoint(c.cfg.clusterAPIEndpoint), sdk.WithAPIToken(c.cfg.apiToken), sdk.WithConnectAPIEndpoint(c.cfg.connectAPIEndpoint), + sdk.WithOTELEndpoint(c.cfg.otelEndpoint), ) if err != nil { return fmt.Errorf("error initializing cloud api: %w", err) @@ -122,7 +125,11 @@ func (c *controller) runReconciler(ctx context.Context) error { } func (c *controller) runAnalytics(ctx context.Context) error { - err := analytics.Collect(ctx, c.GetDataBrokerServiceClient(), time.Second*30) + ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { + return c.Str("service", "zero-analytics") + }) + + err := analytics.Collect(ctx, c.GetDataBrokerServiceClient(), time.Hour) if err != nil && ctx.Err() == nil { log.Ctx(ctx).Error().Err(err).Msg("error collecting analytics, disabling") return nil @@ -130,3 +137,14 @@ func (c 
*controller) runAnalytics(ctx context.Context) error { return err } + +func (c *controller) runReporter(ctx context.Context) error { + ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { + return c.Str("service", "zero-reporter") + }) + + return c.api.Report(ctx, + reporter.WithCollectInterval(time.Hour), + reporter.WithMetrics(analytics.Metrics(c.GetDataBrokerServiceClient)...), + ) +} diff --git a/internal/zero/grpcconn/client.go b/internal/zero/grpcconn/client.go new file mode 100644 index 000000000..795199e40 --- /dev/null +++ b/internal/zero/grpcconn/client.go @@ -0,0 +1,115 @@ +// Package grpcconn provides a gRPC client with authentication +package grpcconn + +import ( + "context" + "fmt" + + "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + + "github.com/pomerium/pomerium/internal/log" +) + +type client struct { + config *config + tokenProvider TokenProviderFn +} + +// TokenProviderFn is a function that returns an authorization token +type TokenProviderFn func(ctx context.Context) (string, error) + +// New creates a new gRPC client with authentication +func New( + ctx context.Context, + endpoint string, + tokenProvider TokenProviderFn, +) (*grpc.ClientConn, error) { + cfg, err := getConfig(endpoint) + if err != nil { + return nil, err + } + + cc := &client{ + tokenProvider: tokenProvider, + config: cfg, + } + + conn, err := cc.getGRPCConn(ctx) + if err != nil { + return nil, err + } + + return conn, err +} + +func (c *client) getGRPCConn(ctx context.Context) (*grpc.ClientConn, error) { + opts := append( + c.config.GetDialOptions(), + grpc.WithPerRPCCredentials(c), + grpc.WithDefaultCallOptions( + grpc.UseCompressor("gzip"), + ), + grpc.WithChainUnaryInterceptor( + logging.UnaryClientInterceptor(logging.LoggerFunc(interceptorLogger)), + ), + grpc.WithStreamInterceptor( + logging.StreamClientInterceptor(logging.LoggerFunc(interceptorLogger)), + ), + ) + + 
conn, err := grpc.DialContext(ctx, c.config.GetConnectionURI(), opts...) + if err != nil { + return nil, fmt.Errorf("error dialing grpc server: %w", err) + } + + go c.logConnectionState(ctx, conn) + + return conn, nil +} + +// GetRequestMetadata implements credentials.PerRPCCredentials +func (c *client) GetRequestMetadata(ctx context.Context, _ ...string) (map[string]string, error) { + token, err := c.tokenProvider(ctx) + if err != nil { + return nil, err + } + return map[string]string{ + "authorization": fmt.Sprintf("Bearer %s", token), + }, nil +} + +// RequireTransportSecurity implements credentials.PerRPCCredentials +func (c *client) RequireTransportSecurity() bool { + return c.config.RequireTLS() +} + +func (c *client) logConnectionState(ctx context.Context, conn *grpc.ClientConn) { + var state connectivity.State = -1 + for ctx.Err() == nil && state != connectivity.Shutdown { + _ = conn.WaitForStateChange(ctx, state) + state = conn.GetState() + log.Ctx(ctx).Info(). + Str("endpoint", c.config.connectionURI). + Str("state", state.String()). 
+ Msg("grpc connection state") + } +} + +func interceptorLogger(ctx context.Context, lvl logging.Level, msg string, fields ...any) { + l := log.Ctx(ctx).With().Fields(fields).Logger() + + switch lvl { + case logging.LevelDebug: + l.Info().Msg(msg) + case logging.LevelInfo: + l.Info().Msg(msg) + case logging.LevelWarn: + l.Warn().Msg(msg) + case logging.LevelError: + l.Error().Msg(msg) + default: + panic(fmt.Sprintf("unknown level %v", lvl)) + } +} diff --git a/internal/zero/grpcconn/client_test.go b/internal/zero/grpcconn/client_test.go new file mode 100644 index 000000000..ee648a481 --- /dev/null +++ b/internal/zero/grpcconn/client_test.go @@ -0,0 +1,47 @@ +package grpcconn + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConfig(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + endpoint string + connectionURI string + requireTLS bool + expectError bool + }{ + {"http://localhost:8721", "dns:localhost:8721", false, false}, + {"https://localhost:8721", "dns:localhost:8721", true, false}, + {"http://localhost:8721/", "dns:localhost:8721", false, false}, + {"https://localhost:8721/", "dns:localhost:8721", true, false}, + {"http://localhost", "dns:localhost:80", false, false}, + {"https://localhost", "dns:localhost:443", true, false}, + + {endpoint: "", expectError: true}, + {endpoint: "http://", expectError: true}, + {endpoint: "https://", expectError: true}, + {endpoint: "localhost:8721", expectError: true}, + {endpoint: "http://localhost:8721/path", expectError: true}, + {endpoint: "https://localhost:8721/path", expectError: true}, + } { + tc := tc + t.Run(tc.endpoint, func(t *testing.T) { + t.Parallel() + cfg, err := getConfig(tc.endpoint) + if tc.expectError { + require.Error(t, err) + return + } + if assert.NoError(t, err) { + assert.Equal(t, tc.connectionURI, cfg.GetConnectionURI(), "connection uri") + assert.Equal(t, tc.requireTLS, cfg.RequireTLS(), "require tls") + } + }) + 
} +} diff --git a/pkg/zero/connect/config.go b/internal/zero/grpcconn/config.go similarity index 84% rename from pkg/zero/connect/config.go rename to internal/zero/grpcconn/config.go index 9f32dd865..557a1f559 100644 --- a/pkg/zero/connect/config.go +++ b/internal/zero/grpcconn/config.go @@ -1,4 +1,4 @@ -package connect +package grpcconn import ( "crypto/tls" @@ -13,8 +13,8 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -// Config is the configuration for the gRPC client -type Config struct { +// config is the configuration for the gRPC client +type config struct { connectionURI string // requireTLS is whether TLS should be used or cleartext requireTLS bool @@ -25,8 +25,8 @@ type Config struct { // NewConfig returns a new Config from an endpoint string, that has to be in a URL format. // The endpoint can be either http:// or https:// that will be used to determine whether TLS should be used. // if port is not specified, it will be inferred from the scheme (80 for http, 443 for https). 
-func NewConfig(endpoint string) (*Config, error) { - c := new(Config) +func getConfig(endpoint string) (*config, error) { + c := new(config) err := c.parseEndpoint(endpoint) if err != nil { return nil, fmt.Errorf("invalid endpoint: %w", err) @@ -36,26 +36,26 @@ func NewConfig(endpoint string) (*Config, error) { } // GetConnectionURI returns connection string conforming to https://github.com/grpc/grpc/blob/master/doc/naming.md -func (c *Config) GetConnectionURI() string { +func (c *config) GetConnectionURI() string { return c.connectionURI } // GetDialTimeout returns the timeout for the dial operation -func (c *Config) GetDialTimeout() time.Duration { - return defaultDialTimeout +func (c *config) GetDialTimeout() time.Duration { + return time.Hour } // RequireTLS returns whether TLS should be used or cleartext -func (c *Config) RequireTLS() bool { +func (c *config) RequireTLS() bool { return c.requireTLS } // GetDialOptions returns the dial options to pass to the gRPC client -func (c *Config) GetDialOptions() []grpc.DialOption { +func (c *config) GetDialOptions() []grpc.DialOption { return c.opts } -func (c *Config) buildTLSOptions() { +func (c *config) buildTLSOptions() { creds := insecure.NewCredentials() if c.requireTLS { creds = credentials.NewTLS(&tls.Config{ @@ -65,7 +65,7 @@ func (c *Config) buildTLSOptions() { c.opts = append(c.opts, grpc.WithTransportCredentials(creds)) } -func (c *Config) parseEndpoint(endpoint string) error { +func (c *config) parseEndpoint(endpoint string) error { u, err := url.Parse(endpoint) if err != nil { return fmt.Errorf("error parsing endpoint url: %w", err) diff --git a/internal/zero/reporter/config.go b/internal/zero/reporter/config.go new file mode 100644 index 000000000..2ff991fe5 --- /dev/null +++ b/internal/zero/reporter/config.go @@ -0,0 +1,49 @@ +package reporter + +import ( + "time" + + "go.opentelemetry.io/otel/metric" +) + +type config struct { + shutdownTimeout time.Duration + collectInterval time.Duration + metrics 
[]func(metric.Meter) error +} + +// Option is a functional option for configuring the reporter package. +type Option func(*config) + +// WithShutdownTimeout sets the shutdown timeout to use for the reporter. +func WithShutdownTimeout(timeout time.Duration) Option { + return func(c *config) { + c.shutdownTimeout = timeout + } +} + +// WithCollectInterval sets the collect interval for metrics to be queried. +func WithCollectInterval(interval time.Duration) Option { + return func(c *config) { + c.collectInterval = interval + } +} + +// WithMetrics adds metrics to be collected +func WithMetrics(fns ...func(metric.Meter) error) Option { + return func(c *config) { + c.metrics = append(c.metrics, fns...) + } +} + +func getConfig(opts ...Option) *config { + c := new(config) + defaults := []Option{ + WithShutdownTimeout(time.Second * 5), + WithCollectInterval(time.Hour), + } + for _, opt := range append(defaults, opts...) { + opt(c) + } + return c +} diff --git a/internal/zero/reporter/config.go b/internal/zero/reporter/reporter.go new file mode 100644 index 000000000..21973941b --- /dev/null +++ b/internal/zero/reporter/reporter.go @@ -0,0 +1,61 @@ +// Package reporter periodically submits metrics back to the cloud. +package reporter + +import ( + "context" + "fmt" + "time" + + export_grpc "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + metric_sdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "google.golang.org/grpc" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/version" +) + +// Run starts a loop that pushes metrics via OTEL protocol until ctx is canceled +func Run( + ctx context.Context, + conn *grpc.ClientConn, + opts ...Option, +) error { + cfg := getConfig(opts...) 
+ + exporter, err := export_grpc.New(ctx, export_grpc.WithGRPCConn(conn)) + if err != nil { + return fmt.Errorf("starting OTEL exporter: %w", err) + } + defer shutdown(exporter.Shutdown, cfg.shutdownTimeout) + + provider := metric_sdk.NewMeterProvider( + metric_sdk.WithResource(resource.NewSchemaless( + semconv.ServiceNameKey.String("pomerium-managed-core"), + semconv.ServiceVersionKey.String(version.FullVersion()), + )), + metric_sdk.WithReader( + metric_sdk.NewPeriodicReader( + exporter, + metric_sdk.WithInterval(cfg.collectInterval), + ))) + defer shutdown(provider.Shutdown, cfg.shutdownTimeout) + + meter := provider.Meter("pomerium-managed-core") + for _, fn := range cfg.metrics { + err := fn(meter) + if err != nil { + log.Ctx(ctx).Error().Err(err).Msg("error registering metric") + } + } + + <-ctx.Done() + return ctx.Err() +} + +func shutdown(fn func(ctx context.Context) error, timeout time.Duration) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + _ = fn(ctx) +} diff --git a/pkg/zero/connect/client.go b/pkg/zero/connect/client.go deleted file mode 100644 index 73046b766..000000000 --- a/pkg/zero/connect/client.go +++ /dev/null @@ -1,83 +0,0 @@ -package connect - -import ( - "context" - "fmt" - "time" - - "google.golang.org/grpc" - grpc_backoff "google.golang.org/grpc/backoff" -) - -const ( - defaultDialTimeout = time.Hour -) - -type client struct { - config *Config - tokenProvider TokenProviderFn - minTokenTTL time.Duration -} - -// TokenProviderFn is a function that returns a token that is expected to be valid for at least minTTL -type TokenProviderFn func(ctx context.Context, minTTL time.Duration) (string, error) - -// NewAuthorizedConnectClient creates a new gRPC client for the connect service -func NewAuthorizedConnectClient( - ctx context.Context, - endpoint string, - tokenProvider TokenProviderFn, -) (ConnectClient, error) { - cfg, err := NewConfig(endpoint) - if err != nil { - return nil, err - } - - cc := 
&client{ - tokenProvider: tokenProvider, - config: cfg, - // streaming connection would reset based on token duration, - // so we need it be close to max duration 1hr - minTokenTTL: time.Minute * 55, - } - - grpcConn, err := cc.getGRPCConn(ctx) - if err != nil { - return nil, err - } - - return NewConnectClient(grpcConn), nil -} - -func (c *client) getGRPCConn(ctx context.Context) (*grpc.ClientConn, error) { - conn, err := grpc.DialContext(ctx, - c.config.GetConnectionURI(), - append(c.config.GetDialOptions(), - grpc.WithPerRPCCredentials(c), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: grpc_backoff.DefaultConfig, - // the MinConnectTimeout is confusing and is actually the max timeout as per grpc implementation - MinConnectTimeout: c.config.GetDialTimeout(), - }), - )...) - if err != nil { - return nil, fmt.Errorf("error dialing grpc server: %w", err) - } - return conn, nil -} - -// GetRequestMetadata implements credentials.PerRPCCredentials -func (c *client) GetRequestMetadata(ctx context.Context, _ ...string) (map[string]string, error) { - token, err := c.tokenProvider(ctx, c.minTokenTTL) - if err != nil { - return nil, err - } - return map[string]string{ - "authorization": fmt.Sprintf("Bearer %s", token), - }, nil -} - -// RequireTransportSecurity implements credentials.PerRPCCredentials -func (c *client) RequireTransportSecurity() bool { - return c.config.RequireTLS() -} diff --git a/pkg/zero/connect/client_test.go b/pkg/zero/connect/client_test.go deleted file mode 100644 index 9af739751..000000000 --- a/pkg/zero/connect/client_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package connect_test - -import ( - "context" - "os" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/pomerium/pomerium/internal/zero/token" - cluster_api "github.com/pomerium/pomerium/pkg/zero/cluster" - "github.com/pomerium/pomerium/pkg/zero/connect" -) - -func TestConfig(t *testing.T) { - t.Parallel() - - for _, tc 
:= range []struct { - endpoint string - connectionURI string - requireTLS bool - expectError bool - }{ - {"http://localhost:8721", "dns:localhost:8721", false, false}, - {"https://localhost:8721", "dns:localhost:8721", true, false}, - {"http://localhost:8721/", "dns:localhost:8721", false, false}, - {"https://localhost:8721/", "dns:localhost:8721", true, false}, - {"http://localhost", "dns:localhost:80", false, false}, - {"https://localhost", "dns:localhost:443", true, false}, - - {endpoint: "", expectError: true}, - {endpoint: "http://", expectError: true}, - {endpoint: "https://", expectError: true}, - {endpoint: "localhost:8721", expectError: true}, - {endpoint: "http://localhost:8721/path", expectError: true}, - {endpoint: "https://localhost:8721/path", expectError: true}, - } { - tc := tc - t.Run(tc.endpoint, func(t *testing.T) { - t.Parallel() - cfg, err := connect.NewConfig(tc.endpoint) - if tc.expectError { - require.Error(t, err) - return - } - if assert.NoError(t, err) { - assert.Equal(t, tc.connectionURI, cfg.GetConnectionURI(), "connection uri") - assert.Equal(t, tc.requireTLS, cfg.RequireTLS(), "require tls") - } - }) - } -} - -func TestConnectClient(t *testing.T) { - refreshToken := os.Getenv("CONNECT_CLUSTER_IDENTITY_TOKEN") - if refreshToken == "" { - t.Skip("CONNECT_CLUSTER_IDENTITY_TOKEN not set") - } - - connectServerEndpoint := os.Getenv("CONNECT_SERVER_ENDPOINT") - if connectServerEndpoint == "" { - connectServerEndpoint = "http://localhost:8721" - } - - clusterAPIEndpoint := os.Getenv("CLUSTER_API_ENDPOINT") - if clusterAPIEndpoint == "" { - clusterAPIEndpoint = "http://localhost:8720/cluster/v1" - } - - fetcher, err := cluster_api.NewTokenFetcher(clusterAPIEndpoint) - require.NoError(t, err, "error creating token fetcher") - - ctx := context.Background() - deadline, ok := t.Deadline() - if ok { - var cancel context.CancelFunc - ctx, cancel = context.WithDeadline(ctx, deadline.Add(-1*time.Second)) - t.Cleanup(cancel) - } - - tokenCache := 
token.NewCache(fetcher, refreshToken) - - connectClient, err := connect.NewAuthorizedConnectClient(ctx, connectServerEndpoint, tokenCache.GetToken) - require.NoError(t, err, "error creating connect client") - - stream, err := connectClient.Subscribe(ctx, &connect.SubscribeRequest{}) - require.NoError(t, err, "error subscribing") - - for { - msg, err := stream.Recv() - require.NoError(t, err, "error receiving message") - t.Log(msg) - } -}