telemetry: Refactor GRPC Server Handler (#756)

* Refactor GRPC server stats handler location
This commit is contained in:
Travis Groth 2020-05-22 13:36:55 -04:00 committed by GitHub
parent e2a7149c36
commit ca5f68e371
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 106 additions and 21 deletions

View file

@ -61,6 +61,9 @@ func Run(ctx context.Context, configFile string) error {
_, grpcPort, _ := net.SplitHostPort(controlPlane.GRPCListener.Addr().String()) _, grpcPort, _ := net.SplitHostPort(controlPlane.GRPCListener.Addr().String())
_, httpPort, _ := net.SplitHostPort(controlPlane.HTTPListener.Addr().String()) _, httpPort, _ := net.SplitHostPort(controlPlane.HTTPListener.Addr().String())
log.Info().Str("port", grpcPort).Msg("gRPC server started")
log.Info().Str("port", httpPort).Msg("HTTP server started")
// create envoy server // create envoy server
envoyServer, err := envoy.NewServer(opt, grpcPort, httpPort) envoyServer, err := envoy.NewServer(opt, grpcPort, httpPort)
if err != nil { if err != nil {

View file

@ -14,7 +14,7 @@ import (
"github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/metrics" "github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/requestid" "github.com/pomerium/pomerium/internal/telemetry/requestid"
) )
@ -61,7 +61,7 @@ func NewServer() (*Server, error) {
return nil, err return nil, err
} }
srv.GRPCServer = grpc.NewServer( srv.GRPCServer = grpc.NewServer(
grpc.StatsHandler(metrics.NewGRPCServerStatsHandler("control_plane")), grpc.StatsHandler(telemetry.NewGRPCServerStatsHandler("control_plane")),
grpc.UnaryInterceptor(requestid.UnaryServerInterceptor()), grpc.UnaryInterceptor(requestid.UnaryServerInterceptor()),
grpc.StreamInterceptor(requestid.StreamServerInterceptor()), grpc.StreamInterceptor(requestid.StreamServerInterceptor()),
) )

View file

@ -0,0 +1,2 @@
// Package telemetry contains metrics and tracing constructs
package telemetry

View file

@ -0,0 +1,39 @@
package telemetry
import (
"context"
"go.opencensus.io/plugin/ocgrpc"
grpcstats "google.golang.org/grpc/stats"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
)
type tagRPCHandler interface {
TagRPC(context.Context, *grpcstats.RPCTagInfo) context.Context
}
// GRPCServerStatsHandler provides a grpc stats.Handler for metrics and tracing for a pomerium service
type GRPCServerStatsHandler struct {
service string
metricsHandler tagRPCHandler
grpcstats.Handler
}
// TagRPC implements grpc.stats.Handler and adds metrics and tracing metadata to the context of a given RPC
func (h *GRPCServerStatsHandler) TagRPC(ctx context.Context, tagInfo *grpcstats.RPCTagInfo) context.Context {
metricCtx := h.metricsHandler.TagRPC(ctx, tagInfo)
handledCtx := h.Handler.TagRPC(metricCtx, tagInfo)
return handledCtx
}
// NewGRPCServerStatsHandler creates a new GRPCServerStatsHandler for a pomerium service
func NewGRPCServerStatsHandler(service string) grpcstats.Handler {
return &GRPCServerStatsHandler{
service: service,
Handler: &ocgrpc.ServerHandler{},
metricsHandler: metrics.NewGRPCServerMetricsHandler(service),
}
}

View file

@ -0,0 +1,37 @@
package telemetry
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"go.opencensus.io/plugin/ocgrpc"
grpcstats "google.golang.org/grpc/stats"
)
type mockTagHandler struct {
called bool
}
type mockCtxTag string
func (m *mockTagHandler) TagRPC(ctx context.Context, tagInfo *grpcstats.RPCTagInfo) context.Context {
m.called = true
return context.WithValue(ctx, mockCtxTag("added"), "true")
}
func Test_GRPCServerStatsHandler(t *testing.T) {
metricsHandler := &mockTagHandler{}
h := &GRPCServerStatsHandler{
metricsHandler: metricsHandler,
Handler: &ocgrpc.ServerHandler{},
}
ctx := context.WithValue(context.Background(), mockCtxTag("original"), "true")
ctx = h.TagRPC(ctx, &grpcstats.RPCTagInfo{})
assert.True(t, metricsHandler.called)
assert.Equal(t, ctx.Value(mockCtxTag("added")), "true")
assert.Equal(t, ctx.Value(mockCtxTag("original")), "true")
}

View file

@ -4,12 +4,13 @@ import (
"context" "context"
"strings" "strings"
"github.com/pomerium/pomerium/internal/log"
"go.opencensus.io/plugin/ocgrpc" "go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/stats/view" "go.opencensus.io/stats/view"
"go.opencensus.io/tag" "go.opencensus.io/tag"
"google.golang.org/grpc" "google.golang.org/grpc"
grpcstats "google.golang.org/grpc/stats" grpcstats "google.golang.org/grpc/stats"
"github.com/pomerium/pomerium/internal/log"
) )
// GRPC Views // GRPC Views
@ -110,6 +111,9 @@ var (
// GRPCClientInterceptor creates a UnaryClientInterceptor which updates the RPC // GRPCClientInterceptor creates a UnaryClientInterceptor which updates the RPC
// context with metric tag metadata // context with metric tag metadata
//
// TODO: This handler will NOT currently propagate B3 headers to upstream servers. See
// GRPCServerStatsHandler for changes required
func GRPCClientInterceptor(service string) grpc.UnaryClientInterceptor { func GRPCClientInterceptor(service string) grpc.UnaryClientInterceptor {
return func( return func(
ctx context.Context, ctx context.Context,
@ -147,17 +151,18 @@ func GRPCClientInterceptor(service string) grpc.UnaryClientInterceptor {
} }
// GRPCServerStatsHandler provides a grpc stats.Handler for a pomerium service to add tags and track // GRPCServerMetricsHandler implements a telemetry tagRPCHandler methods for metrics
// metrics to server side calls type GRPCServerMetricsHandler struct {
type GRPCServerStatsHandler struct {
service string service string
grpcstats.Handler
} }
// TagRPC implements grpc.stats.Handler and adds tags to the context of a given RPC // NewGRPCServerMetricsHandler creates a new GRPCServerStatsHandler for a pomerium service
func (h *GRPCServerStatsHandler) TagRPC(ctx context.Context, tagInfo *grpcstats.RPCTagInfo) context.Context { func NewGRPCServerMetricsHandler(service string) *GRPCServerMetricsHandler {
return &GRPCServerMetricsHandler{service: service}
}
handledCtx := h.Handler.TagRPC(ctx, tagInfo) // TagRPC handles adding any metrics related values to the incoming context
func (h *GRPCServerMetricsHandler) TagRPC(ctx context.Context, tagInfo *grpcstats.RPCTagInfo) context.Context {
// Split the method into parts for better slicing // Split the method into parts for better slicing
rpcInfo := strings.SplitN(tagInfo.FullMethodName, "/", 3) rpcInfo := strings.SplitN(tagInfo.FullMethodName, "/", 3)
@ -169,21 +174,16 @@ func (h *GRPCServerStatsHandler) TagRPC(ctx context.Context, tagInfo *grpcstats.
} }
taggedCtx, tagErr := tag.New( taggedCtx, tagErr := tag.New(
handledCtx, ctx,
tag.Upsert(TagKeyService, h.service), tag.Upsert(TagKeyService, h.service),
tag.Upsert(TagKeyGRPCMethod, rpcMethod), tag.Upsert(TagKeyGRPCMethod, rpcMethod),
tag.Upsert(TagKeyGRPCService, rpcService), tag.Upsert(TagKeyGRPCService, rpcService),
) )
if tagErr != nil { if tagErr != nil {
log.Warn().Err(tagErr).Str("context", "GRPCServerStatsHandler").Msg("telemetry/metrics: failed to create context") log.Warn().Err(tagErr).Str("context", "GRPCServerStatsHandler").Msg("telemetry/metrics: failed to create context")
return handledCtx return ctx
} }
return taggedCtx return taggedCtx
} }
// NewGRPCServerStatsHandler creates a new GRPCServerStatsHandler for a pomerium service
func NewGRPCServerStatsHandler(service string) grpcstats.Handler {
return &GRPCServerStatsHandler{service: service, Handler: &ocgrpc.ServerHandler{}}
}

View file

@ -11,6 +11,8 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
var statsHandler = &ocgrpc.ServerHandler{}
type testProto struct { type testProto struct {
message string message string
} }
@ -118,15 +120,17 @@ func Test_GRPCClientInterceptor(t *testing.T) {
} }
} }
func mockServerRPCHandle(statsHandler stats.Handler, method string, errorCode error) { func mockServerRPCHandle(metricsHandler *GRPCServerMetricsHandler, method string, errorCode error) {
message := "hello" message := "hello"
ctx := statsHandler.TagRPC(context.Background(), &stats.RPCTagInfo{FullMethodName: method}) ctx := statsHandler.TagRPC(context.Background(), &stats.RPCTagInfo{FullMethodName: method})
ctx = metricsHandler.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
statsHandler.HandleRPC(ctx, &stats.InPayload{Client: false, Length: len(message)}) statsHandler.HandleRPC(ctx, &stats.InPayload{Client: false, Length: len(message)})
statsHandler.HandleRPC(ctx, &stats.OutPayload{Client: false, Length: len(message)}) statsHandler.HandleRPC(ctx, &stats.OutPayload{Client: false, Length: len(message)})
statsHandler.HandleRPC(ctx, &stats.End{Client: false, Error: errorCode}) statsHandler.HandleRPC(ctx, &stats.End{Client: false, Error: errorCode})
} }
func Test_GRPCServerStatsHandler(t *testing.T) { func Test_GRPCServerMetricsHandler(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
method string method string
@ -171,8 +175,8 @@ func Test_GRPCServerStatsHandler(t *testing.T) {
view.Unregister(GRPCServerViews...) view.Unregister(GRPCServerViews...)
view.Register(GRPCServerViews...) view.Register(GRPCServerViews...)
statsHandler := NewGRPCServerStatsHandler("test_service") metricsHandler := NewGRPCServerMetricsHandler("test_service")
mockServerRPCHandle(statsHandler, tt.method, tt.errorCode) mockServerRPCHandle(metricsHandler, tt.method, tt.errorCode)
testDataRetrieval(GRPCServerResponseSizeView, t, tt.wantgrpcServerResponseSize) testDataRetrieval(GRPCServerResponseSizeView, t, tt.wantgrpcServerResponseSize)
testDataRetrieval(GRPCServerRequestDurationView, t, tt.wantgrpcServerRequestDuration) testDataRetrieval(GRPCServerRequestDurationView, t, tt.wantgrpcServerRequestDuration)