diff --git a/config/envoyconfig/listeners_main.go b/config/envoyconfig/listeners_main.go index 3fd323f14..bf19bed76 100644 --- a/config/envoyconfig/listeners_main.go +++ b/config/envoyconfig/listeners_main.go @@ -146,6 +146,14 @@ func (b *Builder) buildMainHTTPConnectionManagerFilter( }, Remove: false, }, + { + Header: "x-pomerium-external-parent-span", + OnHeaderPresent: &envoy_extensions_filters_http_header_to_metadata.Config_KeyValuePair{ + MetadataNamespace: "pomerium.internal", + Key: "external-parent-span", + }, + Remove: true, + }, }, ResponseRules: []*envoy_extensions_filters_http_header_to_metadata.Config_Rule{ { @@ -274,6 +282,28 @@ func (b *Builder) buildMainHTTPConnectionManagerFilter( }, }, }, + { + Tag: "pomerium.external-parent-span", + Type: &envoy_tracing_v3.CustomTag_Metadata_{ + Metadata: &envoy_tracing_v3.CustomTag_Metadata{ + Kind: &metadatav3.MetadataKind{ + Kind: &metadatav3.MetadataKind_Request_{ + Request: &metadatav3.MetadataKind_Request{}, + }, + }, + MetadataKey: &metadatav3.MetadataKey{ + Key: "pomerium.internal", + Path: []*metadatav3.MetadataKey_PathSegment{ + { + Segment: &metadatav3.MetadataKey_PathSegment_Key{ + Key: "external-parent-span", + }, + }, + }, + }, + }, + }, + }, }, }, // See https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/headers#x-forwarded-for diff --git a/config/envoyconfig/luascripts/trace-context.lua b/config/envoyconfig/luascripts/trace-context.lua index 892613bd9..2285bb1c9 100644 --- a/config/envoyconfig/luascripts/trace-context.lua +++ b/config/envoyconfig/luascripts/trace-context.lua @@ -27,6 +27,10 @@ function envoy_on_request(request_handle) if substitute_query_param("pomerium_traceparent", "x-pomerium-traceparent") then substitute_query_param("pomerium_tracestate", "x-pomerium-tracestate") end + local traceparent = headers:get("traceparent") + if traceparent ~= nil and #traceparent == 55 and headers:get("x-pomerium-traceparent") == nil then + headers:replace("x-pomerium-external-parent-span", traceparent:sub(37, 52)) + end end function envoy_on_response(response_handle) diff --git a/config/envoyconfig/testdata/main_http_connection_manager_filter.json b/config/envoyconfig/testdata/main_http_connection_manager_filter.json index 28ecd38e3..6d497c1d3 100644 --- a/config/envoyconfig/testdata/main_http_connection_manager_filter.json +++ b/config/envoyconfig/testdata/main_http_connection_manager_filter.json @@ -29,7 +29,7 @@ "typedConfig": { "@type": "type.googleapis.com/envoy.extensions.filters.http.lua.v3.Lua", "defaultSourceCode": { - "inlineString": "function envoy_on_request(request_handle)\n local headers = request_handle:headers()\n local path = headers:get(\":path\")\n\n if path:find(\"#\") ~= nil then\n return\n end\n\n local function substitute_query_param(query_param_name, header_name)\n local i, j = path:find(query_param_name .. \"=\")\n if i ~= nil and (path:sub(i - 1, i - 1) == \"&\" or path:sub(i - 1, i - 1) == \"?\") then\n local k = path:find(\"&\", j + 1)\n if k ~= nil then\n k = k - 1\n else\n k = #path\n end\n local value = path:sub(j + 1, k)\n if value ~= nil then\n headers:replace(header_name, value)\n return true\n end\n end\n return false\n end\n\n if substitute_query_param(\"pomerium_traceparent\", \"x-pomerium-traceparent\") then\n substitute_query_param(\"pomerium_tracestate\", \"x-pomerium-tracestate\")\n end\nend\n\nfunction envoy_on_response(response_handle)\nend\n" + "inlineString": "function envoy_on_request(request_handle)\n local headers = request_handle:headers()\n local path = headers:get(\":path\")\n\n if path:find(\"#\") ~= nil then\n return\n end\n\n local function substitute_query_param(query_param_name, header_name)\n local i, j = path:find(query_param_name .. \"=\")\n if i ~= nil and (path:sub(i - 1, i - 1) == \"\u0026\" or path:sub(i - 1, i - 1) == \"?\") then\n local k = path:find(\"\u0026\", j + 1)\n if k ~= nil then\n k = k - 1\n else\n k = #path\n end\n local value = path:sub(j + 1, k)\n if value ~= nil then\n headers:replace(header_name, value)\n return true\n end\n end\n return false\n end\n\n if substitute_query_param(\"pomerium_traceparent\", \"x-pomerium-traceparent\") then\n substitute_query_param(\"pomerium_tracestate\", \"x-pomerium-tracestate\")\n end\n local traceparent = headers:get(\"traceparent\")\n if traceparent ~= nil and #traceparent == 55 and headers:get(\"x-pomerium-traceparent\") == nil then\n headers:replace(\"x-pomerium-external-parent-span\", traceparent:sub(37, 52))\n end\nend\n\nfunction envoy_on_response(response_handle)\nend\n" } } }, @@ -51,6 +51,14 @@ "metadataNamespace": "pomerium.internal", "key": "tracestate" } + }, + { + "header": "x-pomerium-external-parent-span", + "onHeaderPresent": { + "key": "external-parent-span", + "metadataNamespace": "pomerium.internal" + }, + "remove": true } ], "responseRules": [ @@ -272,6 +280,22 @@ ] } } + }, + { + "metadata": { + "kind": { + "request": {} + }, + "metadataKey": { + "key": "pomerium.internal", + "path": [ + { + "key": "external-parent-span" + } + ] + } + }, + "tag": "pomerium.external-parent-span" } ], "provider": { diff --git a/internal/telemetry/trace/queue.go b/internal/telemetry/trace/queue.go index 28388dc66..ebe7123dd 100644 --- a/internal/telemetry/trace/queue.go +++ b/internal/telemetry/trace/queue.go @@ -3,6 +3,7 @@ package trace import ( "context" "encoding/base64" + "encoding/hex" "errors" "fmt" "net/url" @@ -40,7 +41,7 @@ func NewSpanExportQueue(ctx context.Context, client otlptrace.Client) *SpanExpor debug := systemContextFromContext(ctx).DebugFlags var observer SpanObserver if debug.Check(TrackSpanReferences) { - observer = &spanObserver{referencedIDs: make(map[oteltrace.SpanID]bool)} + observer = &spanObserver{referencedIDs: make(map[oteltrace.SpanID]oteltrace.SpanID)} } else { observer = noopSpanObserver{} } @@ -129,7 +130,7 @@ func (q *SpanExportQueue) Enqueue(ctx context.Context, req *coltracepb.ExportTra continue } if parentSpanID.IsValid() { // if parent is not a root span - q.observer.ObserveReference(parentSpanID) + q.observer.ObserveReference(parentSpanID, spanID) continue } traceID, ok := toTraceID(span.TraceId) @@ -186,13 +187,37 @@ func (q *SpanExportQueue) Enqueue(ctx context.Context, req *coltracepb.ExportTra if !ok { continue } + parentSpanId, ok := toSpanID(span.ParentSpanId) + if !ok { + continue + } q.observer.Observe(spanID) if mapping, ok := q.knownTraceIDMappings[traceID]; ok { id := mapping.Value() copy(span.TraceId, id[:]) knownSpans = append(knownSpans, span) } else { - q.insertPendingSpanLocked(resourceInfo, scope.Scope, scope.SchemaUrl, traceID, span) + var isInternalRoot bool + if q.debugFlags.Check(TrackSpanReferences) { + if parentSpanId.IsValid() { + for _, attr := range span.Attributes { + if attr.Key == "pomerium.external-parent-span" { + isInternalRoot = true + if bytes, err := hex.DecodeString(attr.Value.GetStringValue()); err == nil { + if spanId, _ := toSpanID(bytes); spanId.IsValid() { + q.observer.Observe(spanId) + } + } + break + } + } + } + } + if isInternalRoot { + toUpload = append(toUpload, q.resolveTraceIDMappingLocked(traceID, traceID)...) + } else { + q.insertPendingSpanLocked(resourceInfo, scope.Scope, scope.SchemaUrl, traceID, span) + } } } if len(knownSpans) > 0 { @@ -247,14 +272,23 @@ func (q *SpanExportQueue) Close(ctx context.Context) error { case <-q.closed: q.mu.Lock() defer q.mu.Unlock() + didWarn := false + if q.debugFlags.Check(TrackSpanReferences) { var unknownParentIDs []string - for id, known := range q.observer.(*spanObserver).referencedIDs { - if !known { - unknownParentIDs = append(unknownParentIDs, id.String()) + for id, via := range q.observer.(*spanObserver).referencedIDs { + if via.IsValid() { + if q.debugFlags.Check(TrackAllSpans) { + if viaSpan, ok := q.debugAllObservedSpans[via]; ok { + unknownParentIDs = append(unknownParentIDs, fmt.Sprintf("%s via %s (%s)", id, via, viaSpan.Name)) + } else { + unknownParentIDs = append(unknownParentIDs, fmt.Sprintf("%s via %s", id, via)) + } + } } } if len(unknownParentIDs) > 0 { + didWarn = true msg := startMsg("WARNING: parent spans referenced but never seen:\n") for _, str := range unknownParentIDs { msg.WriteString(str) @@ -263,7 +297,6 @@ func (q *SpanExportQueue) Close(ctx context.Context) error { endMsg(msg) } } - didWarn := false incomplete := len(q.pendingResourcesByTraceID) > 0 if incomplete && q.debugFlags.Check(WarnOnIncompleteTraces) { didWarn = true @@ -454,7 +487,7 @@ func (t *spanTracker) Shutdown(_ context.Context) error { for _, span := range incompleteSpans { fmt.Fprintf(msg, "%s\n", span) } - msg.WriteString("Note: set TrackAllObservedSpans flag for more info\n") + msg.WriteString("Note: set TrackAllSpans flag for more info\n") endMsg(msg) } } @@ -599,22 +632,22 @@ func (r *ResourceInfo) computeID() string { } type SpanObserver interface { - ObserveReference(id oteltrace.SpanID) + ObserveReference(id oteltrace.SpanID, via oteltrace.SpanID) Observe(id oteltrace.SpanID) Wait() } type spanObserver struct { mu sync.Mutex - referencedIDs map[oteltrace.SpanID]bool + referencedIDs map[oteltrace.SpanID]oteltrace.SpanID unobservedIDs sync.WaitGroup } -func (obs *spanObserver) ObserveReference(id oteltrace.SpanID) { +func (obs *spanObserver) ObserveReference(id oteltrace.SpanID, via oteltrace.SpanID) { obs.mu.Lock() defer obs.mu.Unlock() if _, referenced := obs.referencedIDs[id]; !referenced { - obs.referencedIDs[id] = false // referenced, but not observed + obs.referencedIDs[id] = via // referenced, but not observed obs.unobservedIDs.Add(1) } } @@ -622,8 +655,8 @@ func (obs *spanObserver) ObserveReference(id oteltrace.SpanID) { func (obs *spanObserver) Observe(id oteltrace.SpanID) { obs.mu.Lock() defer obs.mu.Unlock() - if observed, referenced := obs.referencedIDs[id]; !observed { // NB: subtle condition - obs.referencedIDs[id] = true + if observed, referenced := obs.referencedIDs[id]; !referenced || observed.IsValid() { // NB: subtle condition + obs.referencedIDs[id] = zeroSpanID if referenced { obs.unobservedIDs.Done() } @@ -631,14 +664,32 @@ func (obs *spanObserver) Observe(id oteltrace.SpanID) { } func (obs *spanObserver) Wait() { + done := make(chan struct{}) + defer close(done) + go func() { + select { + case <-done: + return + case <-time.After(10 * time.Second): + obs.mu.Lock() + msg := startMsg("Waiting on unobserved spans:\n") + for id, via := range obs.referencedIDs { + if via.IsValid() { + fmt.Fprintf(msg, "%s via %s\n", id, via) + } + } + endMsg(msg) + obs.mu.Unlock() + } + }() obs.unobservedIDs.Wait() } type noopSpanObserver struct{} -func (noopSpanObserver) ObserveReference(oteltrace.SpanID) {} -func (noopSpanObserver) Observe(oteltrace.SpanID) {} -func (noopSpanObserver) Wait() {} +func (noopSpanObserver) ObserveReference(oteltrace.SpanID, oteltrace.SpanID) {} +func (noopSpanObserver) Observe(oteltrace.SpanID) {} +func (noopSpanObserver) Wait() {} func formatSpanName(span *tracev1.Span) { hasPath := strings.Contains(span.GetName(), "${path}") diff --git a/internal/telemetry/trace/server.go b/internal/telemetry/trace/server.go index b1a844a10..d35f6f452 100644 --- a/internal/telemetry/trace/server.go +++ b/internal/telemetry/trace/server.go @@ -59,7 +59,10 @@ func (srv *ExporterServer) Start() { } func (srv *ExporterServer) NewClient() otlptrace.Client { - return otlptracegrpc.NewClient(otlptracegrpc.WithGRPCConn(srv.cc)) + return otlptracegrpc.NewClient( + otlptracegrpc.WithGRPCConn(srv.cc), + otlptracegrpc.WithTimeout(1*time.Minute), + ) } func (srv *ExporterServer) SpanProcessors() []sdktrace.SpanProcessor { @@ -78,7 +81,7 @@ func (srv *ExporterServer) Shutdown(ctx context.Context) error { return context.Cause(ctx) } var errs []error - if err := srv.spanExportQueue.WaitForSpans(5 * time.Second); err != nil { + if err := srv.spanExportQueue.WaitForSpans(30 * time.Second); err != nil { errs = append(errs, err) } if err := srv.spanExportQueue.Close(ctx); err != nil { diff --git a/internal/testenv/environment.go b/internal/testenv/environment.go index b0a8586c4..38cb721ff 100644 --- a/internal/testenv/environment.go +++ b/internal/testenv/environment.go @@ -351,8 +351,7 @@ func New(t testing.TB, opts ...EnvironmentOption) Environment { ctx := trace.Options{ DebugFlags: options.traceDebugFlags, - }.NewContext(context.Background()) - ctx = logger.WithContext(ctx) + }.NewContext(logger.WithContext(context.Background())) tracerProvider := trace.NewTracerProvider(ctx, "Test Environment") tracer := tracerProvider.Tracer(trace.PomeriumCoreTracer) ctx, span := tracer.Start(ctx, t.Name(), oteltrace.WithNewRoot()) @@ -593,7 +592,7 @@ func (e *environment) Start() { opts := []pomerium.Option{ pomerium.WithOverrideFileManager(fileMgr), - pomerium.WithEnvoyServerOptions(envoy.WithExitGracePeriod(10 * time.Second)), + pomerium.WithEnvoyServerOptions(envoy.WithExitGracePeriod(30 * time.Second)), pomerium.WithDataBrokerServerOptions( databroker_service.WithManagerOptions(manager.WithLeaseTTL(1*time.Second)), databroker_service.WithLegacyManagerOptions(legacymanager.WithLeaseTTL(1*time.Second)), diff --git a/internal/testenv/selftests/tracing_test.go b/internal/testenv/selftests/tracing_test.go index 36c1f92a6..39a54228d 100644 --- a/internal/testenv/selftests/tracing_test.go +++ b/internal/testenv/selftests/tracing_test.go @@ -18,7 +18,11 @@ import ( "github.com/pomerium/pomerium/internal/testenv/upstreams" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" oteltrace "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" @@ -27,10 +31,14 @@ import ( ) func requireOTLPTracesEndpoint(t testing.TB) { + t.Setenv("OTEL_TRACES_EXPORTER", "otlp") tracesEndpoint := os.Getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") if tracesEndpoint == "" { - tracesEndpoint = "http://localhost:4317" - t.Setenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", tracesEndpoint) + tracesEndpoint = os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") + if tracesEndpoint == "" { + tracesEndpoint = "http://localhost:4317" + t.Setenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", tracesEndpoint) + } } client, err := grpc.NewClient(strings.TrimPrefix(tracesEndpoint, "http://"), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) @@ -165,3 +173,58 @@ func TestSampling(t *testing.T) { sampled.Store(0) notSampled.Store(100) } + +func TestExternalSpans(t *testing.T) { + requireOTLPTracesEndpoint(t) + // set up external tracer + external, err := otlptrace.New(context.Background(), otlptracegrpc.NewClient()) + require.NoError(t, err) + r, err := resource.Merge( + resource.Empty(), + resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName("External"), + ), + ) + require.NoError(t, err) + + tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(external), sdktrace.WithResource(r)) + + env := testenv.New(t, testenv.AddTraceDebugFlags(testenv.StandardTraceDebugFlags)) + defer env.Stop() + up := upstreams.HTTP(nil, upstreams.WithNoClientTracing()) + up.Handle("/foo", func(w http.ResponseWriter, _ *http.Request) { + w.Write([]byte("OK")) + }) + env.Add(scenarios.NewIDP([]*scenarios.User{ + { + Email: "foo@example.com", + FirstName: "Firstname", + LastName: "Lastname", + }, + })) + + route := up.Route(). + From(env.SubdomainURL("foo")). + PPL(`{"allow":{"and":["email":{"is":"foo@example.com"}]}}`) + + env.AddUpstream(up) + env.Start() + + snippets.WaitStartupComplete(env) + + ctx, span := tp.Tracer("external").Start(context.Background(), "External Root", oteltrace.WithNewRoot()) + t.Logf("external span id: %s", span.SpanContext().SpanID().String()) + resp, err := up.Get(route, upstreams.AuthenticateAs("foo@example.com"), upstreams.Path("/foo"), upstreams.Context(ctx)) + span.End() + require.NoError(t, err) + body, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + assert.NoError(t, resp.Body.Close()) + assert.Equal(t, resp.StatusCode, 200) + assert.Equal(t, "OK", string(body)) + + assert.NoError(t, tp.ForceFlush(context.Background())) + assert.NoError(t, tp.Shutdown(context.Background())) + external.Shutdown(ctx) +} diff --git a/internal/testenv/upstreams/http.go b/internal/testenv/upstreams/http.go index 78652b2d9..1923e722f 100644 --- a/internal/testenv/upstreams/http.go +++ b/internal/testenv/upstreams/http.go @@ -118,8 +118,8 @@ func ClientCert[T interface { } type HTTPUpstreamOptions struct { - displayName string - noClientTracing bool + displayName string + clientTracerProviderOverride oteltrace.TracerProvider } type HTTPUpstreamOption func(*HTTPUpstreamOptions) @@ -138,7 +138,13 @@ func WithDisplayName(displayName string) HTTPUpstreamOption { func WithNoClientTracing() HTTPUpstreamOption { return func(o *HTTPUpstreamOptions) { - o.noClientTracing = true + o.clientTracerProviderOverride = noop.NewTracerProvider() + } +} + +func WithClientTracerProvider(tp oteltrace.TracerProvider) HTTPUpstreamOption { + return func(o *HTTPUpstreamOptions) { + o.clientTracerProviderOverride = tp } } @@ -231,8 +237,8 @@ func (h *httpUpstream) Run(ctx context.Context) error { tlsConfig = h.tlsConfig.Value() } h.serverTracerProvider.Resolve(trace.NewTracerProvider(ctx, h.displayName)) - if h.noClientTracing { - h.clientTracerProvider.Resolve(noop.NewTracerProvider()) + if h.clientTracerProviderOverride != nil { + h.clientTracerProvider.Resolve(h.clientTracerProviderOverride) } else { h.clientTracerProvider.Resolve(trace.NewTracerProvider(ctx, "HTTP Client")) }