cache: add client telemetry (#975)

This commit is contained in:
Travis Groth 2020-06-22 18:18:44 -04:00 committed by GitHub
parent 24b523c043
commit 88a77c42bb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 77 additions and 9 deletions

View file

@ -150,6 +150,7 @@ func New(opts config.Options) (*Authenticate, error) {
RequestTimeout: opts.GRPCClientTimeout,
ClientDNSRoundRobin: opts.GRPCClientDNSRoundRobin,
WithInsecure: opts.GRPCInsecure,
ServiceName: opts.Services,
})
if err != nil {
return nil, err

View file

@ -79,6 +79,7 @@ func New(opts config.Options) (*Authorize, error) {
RequestTimeout: opts.GRPCClientTimeout,
ClientDNSRoundRobin: opts.GRPCClientDNSRoundRobin,
WithInsecure: opts.GRPCInsecure,
ServiceName: opts.Services,
})
if err != nil {
return nil, fmt.Errorf("authorize: error creating cache connection: %w", err)

15
cache/cache.go vendored
View file

@ -19,6 +19,7 @@ import (
"github.com/pomerium/pomerium/internal/grpc/user"
"github.com/pomerium/pomerium/internal/identity"
"github.com/pomerium/pomerium/internal/identity/manager"
"github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/urlutil"
)
@ -52,9 +53,19 @@ func New(opts config.Options) (*Cache, error) {
if err != nil {
return nil, err
}
// No metrics handler because we have one in the control plane. Add one
// if we no longer register with that grpc Server
localGRPCServer := grpc.NewServer()
localGRPCConnection, err := grpc.DialContext(context.Background(), localListener.Addr().String(),
grpc.WithInsecure())
clientStatsHandler := telemetry.NewGRPCClientStatsHandler(opts.Services)
clientDialOptions := clientStatsHandler.DialOptions(grpc.WithInsecure())
localGRPCConnection, err := grpc.DialContext(
context.Background(),
localListener.Addr().String(),
clientDialOptions...,
)
if err != nil {
return nil, err
}

View file

@ -13,14 +13,13 @@ import (
"strconv"
"time"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
"github.com/pomerium/pomerium/internal/telemetry/requestid"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/credentials"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/requestid"
)
const (
@ -68,17 +67,19 @@ func NewGRPCClientConn(opts *Options) (*grpc.ClientConn, error) {
connAddr = net.JoinHostPort(connAddr, strconv.Itoa(defaultGRPCInsecurePort))
}
}
dialOptions := []grpc.DialOption{
grpc.WithChainUnaryInterceptor(
requestid.UnaryClientInterceptor(),
metrics.GRPCClientInterceptor(opts.ServiceName),
grpcTimeoutInterceptor(opts.RequestTimeout),
),
grpc.WithStreamInterceptor(requestid.StreamClientInterceptor()),
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithDefaultCallOptions([]grpc.CallOption{grpc.WaitForReady(true)}...),
}
clientStatsHandler := telemetry.NewGRPCClientStatsHandler(opts.ServiceName)
dialOptions = clientStatsHandler.DialOptions(dialOptions...)
if opts.WithInsecure {
log.Info().Str("addr", connAddr).Msg("internal/grpc: grpc with insecure")
dialOptions = append(dialOptions, grpc.WithInsecure())

View file

@ -4,6 +4,7 @@ import (
"context"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
grpcstats "google.golang.org/grpc/stats"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
@ -37,3 +38,30 @@ func NewGRPCServerStatsHandler(service string) grpcstats.Handler {
metricsHandler: metrics.NewGRPCServerMetricsHandler(ServiceName(service)),
}
}
// GRPCClientStatsHandler provides DialOptions for grpc clients to instrument network calls with
// both metrics and tracing
type GRPCClientStatsHandler struct {
UnaryInterceptor grpc.UnaryClientInterceptor
// TODO: we should have a streaming interceptor too
grpcstats.Handler
}
// NewGRPCClientStatsHandler returns a new GRPCClientStatsHandler used to create
// telemetry related client DialOptions
func NewGRPCClientStatsHandler(service string) *GRPCClientStatsHandler {
return &GRPCClientStatsHandler{
Handler: &ocgrpc.ClientHandler{},
UnaryInterceptor: metrics.GRPCClientInterceptor(ServiceName(service)),
}
}
// DialOptions returns telemetry related DialOptions appended to an optional existing list
// of DialOptions
func (h *GRPCClientStatsHandler) DialOptions(o ...grpc.DialOption) []grpc.DialOption {
o = append(o,
grpc.WithUnaryInterceptor(h.UnaryInterceptor),
grpc.WithStatsHandler(h.Handler),
)
return o
}

View file

@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/assert"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
grpcstats "google.golang.org/grpc/stats"
)
@ -35,3 +36,27 @@ func Test_GRPCServerStatsHandler(t *testing.T) {
assert.Equal(t, ctx.Value(mockCtxTag("added")), "true")
assert.Equal(t, ctx.Value(mockCtxTag("original")), "true")
}
type mockDialOption struct {
name string
grpc.EmptyDialOption
}
func Test_NewGRPCClientStatsHandler(t *testing.T) {
t.Parallel()
h := NewGRPCClientStatsHandler("test")
origOpts := []grpc.DialOption{
mockDialOption{name: "one"},
mockDialOption{name: "two"},
}
newOpts := h.DialOptions(origOpts...)
for i := range origOpts {
assert.Contains(t, newOpts, origOpts[i])
}
assert.Greater(t, len(newOpts), len(origOpts))
}

View file

@ -150,6 +150,7 @@ func New(opts config.Options) (*Proxy, error) {
RequestTimeout: opts.GRPCClientTimeout,
ClientDNSRoundRobin: opts.GRPCClientDNSRoundRobin,
WithInsecure: opts.GRPCInsecure,
ServiceName: opts.Services,
})
if err != nil {
return nil, err