diff --git a/internal/cmd/pomerium/pomerium.go b/internal/cmd/pomerium/pomerium.go index 8ca89e516..f2fbdb86a 100644 --- a/internal/cmd/pomerium/pomerium.go +++ b/internal/cmd/pomerium/pomerium.go @@ -61,6 +61,9 @@ func Run(ctx context.Context, configFile string) error { _, grpcPort, _ := net.SplitHostPort(controlPlane.GRPCListener.Addr().String()) _, httpPort, _ := net.SplitHostPort(controlPlane.HTTPListener.Addr().String()) + log.Info().Str("port", grpcPort).Msg("gRPC server started") + log.Info().Str("port", httpPort).Msg("HTTP server started") + // create envoy server envoyServer, err := envoy.NewServer(opt, grpcPort, httpPort) if err != nil { diff --git a/internal/controlplane/server.go b/internal/controlplane/server.go index 3a253c630..6a92fcbdd 100644 --- a/internal/controlplane/server.go +++ b/internal/controlplane/server.go @@ -14,7 +14,7 @@ import ( "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/log" - "github.com/pomerium/pomerium/internal/telemetry/metrics" + "github.com/pomerium/pomerium/internal/telemetry" "github.com/pomerium/pomerium/internal/telemetry/requestid" ) @@ -61,7 +61,7 @@ func NewServer() (*Server, error) { return nil, err } srv.GRPCServer = grpc.NewServer( - grpc.StatsHandler(metrics.NewGRPCServerStatsHandler("control_plane")), + grpc.StatsHandler(telemetry.NewGRPCServerStatsHandler("control_plane")), grpc.UnaryInterceptor(requestid.UnaryServerInterceptor()), grpc.StreamInterceptor(requestid.StreamServerInterceptor()), ) diff --git a/internal/telemetry/doc.go b/internal/telemetry/doc.go new file mode 100644 index 000000000..b6caaf0f0 --- /dev/null +++ b/internal/telemetry/doc.go @@ -0,0 +1,2 @@ +// Package telemetry contains metrics and tracing constructs +package telemetry diff --git a/internal/telemetry/grpc.go b/internal/telemetry/grpc.go new file mode 100644 index 000000000..a36966f39 --- /dev/null +++ b/internal/telemetry/grpc.go @@ -0,0 +1,39 @@ +package telemetry + +import ( + "context" + + "go.opencensus.io/plugin/ocgrpc" + grpcstats "google.golang.org/grpc/stats" + + "github.com/pomerium/pomerium/internal/telemetry/metrics" +) + +type tagRPCHandler interface { + TagRPC(context.Context, *grpcstats.RPCTagInfo) context.Context +} + +// GRPCServerStatsHandler provides a grpc stats.Handler for metrics and tracing for a pomerium service +type GRPCServerStatsHandler struct { + service string + metricsHandler tagRPCHandler + grpcstats.Handler +} + +// TagRPC implements grpc.stats.Handler and adds metrics and tracing metadata to the context of a given RPC +func (h *GRPCServerStatsHandler) TagRPC(ctx context.Context, tagInfo *grpcstats.RPCTagInfo) context.Context { + + metricCtx := h.metricsHandler.TagRPC(ctx, tagInfo) + handledCtx := h.Handler.TagRPC(metricCtx, tagInfo) + + return handledCtx +} + +// NewGRPCServerStatsHandler creates a new GRPCServerStatsHandler for a pomerium service +func NewGRPCServerStatsHandler(service string) grpcstats.Handler { + return &GRPCServerStatsHandler{ + service: service, + Handler: &ocgrpc.ServerHandler{}, + metricsHandler: metrics.NewGRPCServerMetricsHandler(service), + } +} diff --git a/internal/telemetry/grpc_test.go b/internal/telemetry/grpc_test.go new file mode 100644 index 000000000..d2e208f2b --- /dev/null +++ b/internal/telemetry/grpc_test.go @@ -0,0 +1,37 @@ +package telemetry + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opencensus.io/plugin/ocgrpc" + grpcstats "google.golang.org/grpc/stats" +) + +type mockTagHandler struct { + called bool +} + +type mockCtxTag string + +func (m *mockTagHandler) TagRPC(ctx context.Context, tagInfo *grpcstats.RPCTagInfo) context.Context { + m.called = true + return context.WithValue(ctx, mockCtxTag("added"), "true") +} + +func Test_GRPCServerStatsHandler(t *testing.T) { + + metricsHandler := &mockTagHandler{} + h := &GRPCServerStatsHandler{ + metricsHandler: metricsHandler, + Handler: &ocgrpc.ServerHandler{}, + } + + ctx := context.WithValue(context.Background(), mockCtxTag("original"), "true") + ctx = h.TagRPC(ctx, &grpcstats.RPCTagInfo{}) + + assert.True(t, metricsHandler.called) + assert.Equal(t, ctx.Value(mockCtxTag("added")), "true") + assert.Equal(t, ctx.Value(mockCtxTag("original")), "true") +} diff --git a/internal/telemetry/metrics/grpc.go b/internal/telemetry/metrics/grpc.go index dd69f97c5..c51e7b36a 100644 --- a/internal/telemetry/metrics/grpc.go +++ b/internal/telemetry/metrics/grpc.go @@ -4,12 +4,13 @@ import ( "context" "strings" - "github.com/pomerium/pomerium/internal/log" "go.opencensus.io/plugin/ocgrpc" "go.opencensus.io/stats/view" "go.opencensus.io/tag" "google.golang.org/grpc" grpcstats "google.golang.org/grpc/stats" + + "github.com/pomerium/pomerium/internal/log" ) // GRPC Views @@ -110,6 +111,9 @@ var ( // GRPCClientInterceptor creates a UnaryClientInterceptor which updates the RPC // context with metric tag metadata +// +// TODO: This handler will NOT currently propagate B3 headers to upstream servers. See +// GRPCServerStatsHandler for changes required func GRPCClientInterceptor(service string) grpc.UnaryClientInterceptor { return func( ctx context.Context, @@ -147,17 +151,18 @@ func GRPCClientInterceptor(service string) grpc.UnaryClientInterceptor { } -// GRPCServerStatsHandler provides a grpc stats.Handler for a pomerium service to add tags and track -// metrics to server side calls -type GRPCServerStatsHandler struct { +// GRPCServerMetricsHandler implements a telemetry tagRPCHandler methods for metrics +type GRPCServerMetricsHandler struct { service string - grpcstats.Handler } -// TagRPC implements grpc.stats.Handler and adds tags to the context of a given RPC -func (h *GRPCServerStatsHandler) TagRPC(ctx context.Context, tagInfo *grpcstats.RPCTagInfo) context.Context { +// NewGRPCServerMetricsHandler creates a new GRPCServerStatsHandler for a pomerium service +func NewGRPCServerMetricsHandler(service string) *GRPCServerMetricsHandler { + return &GRPCServerMetricsHandler{service: service} +} - handledCtx := h.Handler.TagRPC(ctx, tagInfo) +// TagRPC handles adding any metrics related values to the incoming context +func (h *GRPCServerMetricsHandler) TagRPC(ctx context.Context, tagInfo *grpcstats.RPCTagInfo) context.Context { // Split the method into parts for better slicing rpcInfo := strings.SplitN(tagInfo.FullMethodName, "/", 3) @@ -169,21 +174,16 @@ func (h *GRPCServerStatsHandler) TagRPC(ctx context.Context, tagInfo *grpcstats. } taggedCtx, tagErr := tag.New( - handledCtx, + ctx, tag.Upsert(TagKeyService, h.service), tag.Upsert(TagKeyGRPCMethod, rpcMethod), tag.Upsert(TagKeyGRPCService, rpcService), ) if tagErr != nil { log.Warn().Err(tagErr).Str("context", "GRPCServerStatsHandler").Msg("telemetry/metrics: failed to create context") - return handledCtx + return ctx } return taggedCtx } - -// NewGRPCServerStatsHandler creates a new GRPCServerStatsHandler for a pomerium service -func NewGRPCServerStatsHandler(service string) grpcstats.Handler { - return &GRPCServerStatsHandler{service: service, Handler: &ocgrpc.ServerHandler{}} -} diff --git a/internal/telemetry/metrics/grpc_test.go b/internal/telemetry/metrics/grpc_test.go index 7239d6291..ba229be91 100644 --- a/internal/telemetry/metrics/grpc_test.go +++ b/internal/telemetry/metrics/grpc_test.go @@ -11,6 +11,8 @@ import ( "google.golang.org/grpc/status" ) +var statsHandler = &ocgrpc.ServerHandler{} + type testProto struct { message string } @@ -118,15 +120,17 @@ func Test_GRPCClientInterceptor(t *testing.T) { } } -func mockServerRPCHandle(statsHandler stats.Handler, method string, errorCode error) { +func mockServerRPCHandle(metricsHandler *GRPCServerMetricsHandler, method string, errorCode error) { message := "hello" ctx := statsHandler.TagRPC(context.Background(), &stats.RPCTagInfo{FullMethodName: method}) + ctx = metricsHandler.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method}) + statsHandler.HandleRPC(ctx, &stats.InPayload{Client: false, Length: len(message)}) statsHandler.HandleRPC(ctx, &stats.OutPayload{Client: false, Length: len(message)}) statsHandler.HandleRPC(ctx, &stats.End{Client: false, Error: errorCode}) } -func Test_GRPCServerStatsHandler(t *testing.T) { +func Test_GRPCServerMetricsHandler(t *testing.T) { tests := []struct { name string method string @@ -171,8 +175,8 @@ func Test_GRPCServerStatsHandler(t *testing.T) { view.Unregister(GRPCServerViews...) view.Register(GRPCServerViews...) - statsHandler := NewGRPCServerStatsHandler("test_service") - mockServerRPCHandle(statsHandler, tt.method, tt.errorCode) + metricsHandler := NewGRPCServerMetricsHandler("test_service") + mockServerRPCHandle(metricsHandler, tt.method, tt.errorCode) testDataRetrieval(GRPCServerResponseSizeView, t, tt.wantgrpcServerResponseSize) testDataRetrieval(GRPCServerRequestDurationView, t, tt.wantgrpcServerRequestDuration)