diff --git a/authenticate/authenticate.go b/authenticate/authenticate.go index 4c2b46b01..a8c2638d1 100644 --- a/authenticate/authenticate.go +++ b/authenticate/authenticate.go @@ -150,6 +150,7 @@ func New(opts config.Options) (*Authenticate, error) { RequestTimeout: opts.GRPCClientTimeout, ClientDNSRoundRobin: opts.GRPCClientDNSRoundRobin, WithInsecure: opts.GRPCInsecure, + ServiceName: opts.Services, }) if err != nil { return nil, err diff --git a/authorize/authorize.go b/authorize/authorize.go index 5da810057..0dcd9c32b 100644 --- a/authorize/authorize.go +++ b/authorize/authorize.go @@ -79,6 +79,7 @@ func New(opts config.Options) (*Authorize, error) { RequestTimeout: opts.GRPCClientTimeout, ClientDNSRoundRobin: opts.GRPCClientDNSRoundRobin, WithInsecure: opts.GRPCInsecure, + ServiceName: opts.Services, }) if err != nil { return nil, fmt.Errorf("authorize: error creating cache connection: %w", err) diff --git a/cache/cache.go b/cache/cache.go index 598742383..dcb02d0db 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -19,6 +19,7 @@ import ( "github.com/pomerium/pomerium/internal/grpc/user" "github.com/pomerium/pomerium/internal/identity" "github.com/pomerium/pomerium/internal/identity/manager" + "github.com/pomerium/pomerium/internal/telemetry" "github.com/pomerium/pomerium/internal/urlutil" ) @@ -52,9 +53,19 @@ func New(opts config.Options) (*Cache, error) { if err != nil { return nil, err } + + // No metrics handler because we have one in the control plane. Add one + // if we no longer register with that grpc Server localGRPCServer := grpc.NewServer() - localGRPCConnection, err := grpc.DialContext(context.Background(), localListener.Addr().String(), - grpc.WithInsecure()) + + clientStatsHandler := telemetry.NewGRPCClientStatsHandler(opts.Services) + clientDialOptions := clientStatsHandler.DialOptions(grpc.WithInsecure()) + + localGRPCConnection, err := grpc.DialContext( + context.Background(), + localListener.Addr().String(), + clientDialOptions..., + ) if err != nil { return nil, err } diff --git a/internal/grpc/client.go b/internal/grpc/client.go index 06f3b0cb7..2a43e3cde 100644 --- a/internal/grpc/client.go +++ b/internal/grpc/client.go @@ -13,14 +13,13 @@ import ( "strconv" "time" - "github.com/pomerium/pomerium/internal/log" - "github.com/pomerium/pomerium/internal/telemetry/metrics" - "github.com/pomerium/pomerium/internal/telemetry/requestid" - - "go.opencensus.io/plugin/ocgrpc" "google.golang.org/grpc" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/credentials" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/telemetry" + "github.com/pomerium/pomerium/internal/telemetry/requestid" ) const ( @@ -68,17 +67,19 @@ func NewGRPCClientConn(opts *Options) (*grpc.ClientConn, error) { connAddr = net.JoinHostPort(connAddr, strconv.Itoa(defaultGRPCInsecurePort)) } } + dialOptions := []grpc.DialOption{ grpc.WithChainUnaryInterceptor( requestid.UnaryClientInterceptor(), - metrics.GRPCClientInterceptor(opts.ServiceName), grpcTimeoutInterceptor(opts.RequestTimeout), ), grpc.WithStreamInterceptor(requestid.StreamClientInterceptor()), - grpc.WithStatsHandler(&ocgrpc.ClientHandler{}), grpc.WithDefaultCallOptions([]grpc.CallOption{grpc.WaitForReady(true)}...), } + clientStatsHandler := telemetry.NewGRPCClientStatsHandler(opts.ServiceName) + dialOptions = clientStatsHandler.DialOptions(dialOptions...) + if opts.WithInsecure { log.Info().Str("addr", connAddr).Msg("internal/grpc: grpc with insecure") dialOptions = append(dialOptions, grpc.WithInsecure()) diff --git a/internal/telemetry/grpc.go b/internal/telemetry/grpc.go index 24c05e742..b4b4258f2 100644 --- a/internal/telemetry/grpc.go +++ b/internal/telemetry/grpc.go @@ -4,6 +4,7 @@ import ( "context" "go.opencensus.io/plugin/ocgrpc" + "google.golang.org/grpc" grpcstats "google.golang.org/grpc/stats" "github.com/pomerium/pomerium/internal/telemetry/metrics" @@ -37,3 +38,30 @@ func NewGRPCServerStatsHandler(service string) grpcstats.Handler { metricsHandler: metrics.NewGRPCServerMetricsHandler(ServiceName(service)), } } + +// GRPCClientStatsHandler provides DialOptions for grpc clients to instrument network calls with +// both metrics and tracing +type GRPCClientStatsHandler struct { + UnaryInterceptor grpc.UnaryClientInterceptor + // TODO: we should have a streaming interceptor too + grpcstats.Handler +} + +// NewGRPCClientStatsHandler returns a new GRPCClientStatsHandler used to create +// telemetry related client DialOptions +func NewGRPCClientStatsHandler(service string) *GRPCClientStatsHandler { + return &GRPCClientStatsHandler{ + Handler: &ocgrpc.ClientHandler{}, + UnaryInterceptor: metrics.GRPCClientInterceptor(ServiceName(service)), + } +} + +// DialOptions returns telemetry related DialOptions appended to an optional existing list +// of DialOptions +func (h *GRPCClientStatsHandler) DialOptions(o ...grpc.DialOption) []grpc.DialOption { + o = append(o, + grpc.WithUnaryInterceptor(h.UnaryInterceptor), + grpc.WithStatsHandler(h.Handler), + ) + return o +} diff --git a/internal/telemetry/grpc_test.go b/internal/telemetry/grpc_test.go index d2e208f2b..54bdc8021 100644 --- a/internal/telemetry/grpc_test.go +++ b/internal/telemetry/grpc_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "go.opencensus.io/plugin/ocgrpc" + "google.golang.org/grpc" grpcstats "google.golang.org/grpc/stats" ) @@ -35,3 +36,27 @@ func Test_GRPCServerStatsHandler(t *testing.T) { assert.Equal(t, ctx.Value(mockCtxTag("added")), "true") assert.Equal(t, ctx.Value(mockCtxTag("original")), "true") } + +type mockDialOption struct { + name string + grpc.EmptyDialOption +} + +func Test_NewGRPCClientStatsHandler(t *testing.T) { + t.Parallel() + + h := NewGRPCClientStatsHandler("test") + + origOpts := []grpc.DialOption{ + mockDialOption{name: "one"}, + mockDialOption{name: "two"}, + } + + newOpts := h.DialOptions(origOpts...) + + for i := range origOpts { + assert.Contains(t, newOpts, origOpts[i]) + } + + assert.Greater(t, len(newOpts), len(origOpts)) +} diff --git a/proxy/proxy.go b/proxy/proxy.go index ebc5d4873..b2c84c32b 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -150,6 +150,7 @@ func New(opts config.Options) (*Proxy, error) { RequestTimeout: opts.GRPCClientTimeout, ClientDNSRoundRobin: opts.GRPCClientDNSRoundRobin, WithInsecure: opts.GRPCInsecure, + ServiceName: opts.Services, }) if err != nil { return nil, err