telemetry: support b3 headers on gRPC server calls (#2376)

This commit is contained in:
Caleb Doxsey 2021-07-20 05:36:58 -06:00 committed by GitHub
parent a02010409a
commit fbf44261c1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 6 deletions

View file

@ -30,11 +30,11 @@ import (
"github.com/shirou/gopsutil/v3/process" "github.com/shirou/gopsutil/v3/process"
"google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/encoding/protojson"
"github.com/pomerium/pomerium/internal/envoy/files"
"github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/config/envoyconfig" "github.com/pomerium/pomerium/config/envoyconfig"
"github.com/pomerium/pomerium/internal/envoy/files"
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry"
) )
const ( const (
@ -227,8 +227,8 @@ func (srv *Server) writeConfig(ctx context.Context, cfg *config.Config) error {
func (srv *Server) buildBootstrapConfig(cfg *config.Config) ([]byte, error) { func (srv *Server) buildBootstrapConfig(cfg *config.Config) ([]byte, error) {
nodeCfg := &envoy_config_core_v3.Node{ nodeCfg := &envoy_config_core_v3.Node{
Id: "proxy", Id: telemetry.ServiceName(cfg.Options.Services),
Cluster: "proxy", Cluster: telemetry.ServiceName(cfg.Options.Services),
} }
adminCfg, err := srv.builder.BuildBootstrapAdmin(cfg) adminCfg, err := srv.builder.BuildBootstrapAdmin(cfg)

View file

@ -2,14 +2,25 @@ package telemetry
import ( import (
"context" "context"
"strings"
"go.opencensus.io/plugin/ocgrpc" "go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/trace"
"go.opencensus.io/trace/propagation"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/metadata"
grpcstats "google.golang.org/grpc/stats" grpcstats "google.golang.org/grpc/stats"
"github.com/pomerium/pomerium/internal/telemetry/metrics" "github.com/pomerium/pomerium/internal/telemetry/metrics"
) )
const (
grpcTraceBinHeader = "grpc-trace-bin"
b3TraceIDHeader = "x-b3-traceid"
b3SpanIDHeader = "x-b3-spanid"
)
type tagRPCHandler interface { type tagRPCHandler interface {
TagRPC(context.Context, *grpcstats.RPCTagInfo) context.Context TagRPC(context.Context, *grpcstats.RPCTagInfo) context.Context
} }
@ -23,9 +34,40 @@ type GRPCServerStatsHandler struct {
// TagRPC implements grpc.stats.Handler and adds metrics and tracing metadata to the context of a given RPC // TagRPC implements grpc.stats.Handler and adds metrics and tracing metadata to the context of a given RPC
func (h *GRPCServerStatsHandler) TagRPC(ctx context.Context, tagInfo *grpcstats.RPCTagInfo) context.Context { func (h *GRPCServerStatsHandler) TagRPC(ctx context.Context, tagInfo *grpcstats.RPCTagInfo) context.Context {
handledCtx := h.Handler.TagRPC(ctx, tagInfo) // the opencensus trace handler only supports grpc-trace-bin, so we use that code and support b3 too
metricCtx := h.metricsHandler.TagRPC(handledCtx, tagInfo)
md, _ := metadata.FromIncomingContext(ctx)
name := strings.TrimPrefix(tagInfo.FullMethodName, "/")
name = strings.Replace(name, "/", ".", -1)
var parent trace.SpanContext
hasParent := false
if traceBin := md[grpcTraceBinHeader]; len(traceBin) > 0 {
parent, hasParent = propagation.FromBinary([]byte(traceBin[0]))
}
if hdr := md[b3TraceIDHeader]; len(hdr) > 0 {
if tid, ok := b3.ParseTraceID(hdr[0]); ok {
parent.TraceID = tid
hasParent = true
}
}
if hdr := md[b3SpanIDHeader]; len(hdr) > 0 {
if sid, ok := b3.ParseSpanID(hdr[0]); ok {
parent.SpanID = sid
hasParent = true
}
}
if hasParent {
ctx, _ = trace.StartSpanWithRemoteParent(ctx, name, parent,
trace.WithSpanKind(trace.SpanKindServer))
} else {
ctx, _ = trace.StartSpan(ctx, name,
trace.WithSpanKind(trace.SpanKindServer))
}
metricCtx := h.metricsHandler.TagRPC(ctx, tagInfo)
return metricCtx return metricCtx
} }

View file

@ -6,6 +6,9 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"go.opencensus.io/plugin/ocgrpc" "go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/trace"
"google.golang.org/grpc/metadata"
grpcstats "google.golang.org/grpc/stats" grpcstats "google.golang.org/grpc/stats"
) )
@ -28,9 +31,17 @@ func Test_GRPCServerStatsHandler(t *testing.T) {
} }
ctx := context.WithValue(context.Background(), mockCtxTag("original"), "true") ctx := context.WithValue(context.Background(), mockCtxTag("original"), "true")
ctx = metadata.NewIncomingContext(ctx, metadata.MD{
b3TraceIDHeader: {"9de3f6756f315fef"},
b3SpanIDHeader: {"b4f83d3096b6bf9c"},
})
ctx = h.TagRPC(ctx, &grpcstats.RPCTagInfo{}) ctx = h.TagRPC(ctx, &grpcstats.RPCTagInfo{})
assert.True(t, metricsHandler.called) assert.True(t, metricsHandler.called)
assert.Equal(t, ctx.Value(mockCtxTag("added")), "true") assert.Equal(t, ctx.Value(mockCtxTag("added")), "true")
assert.Equal(t, ctx.Value(mockCtxTag("original")), "true") assert.Equal(t, ctx.Value(mockCtxTag("original")), "true")
span := trace.FromContext(ctx)
expectedTraceID, _ := b3.ParseTraceID("9de3f6756f315fef")
assert.Equal(t, expectedTraceID, span.SpanContext().TraceID)
} }