From 6ea51149f98265563e4fb67ce2740ca66999d102 Mon Sep 17 00:00:00 2001 From: Joe Kralicky Date: Fri, 24 Jan 2025 14:51:07 -0500 Subject: [PATCH 1/2] tracing: adjust envoy otel trace batching settings to match go sdk (#5446) --- config/envoyconfig/bootstrap.go | 13 ++++----- config/envoyconfig/bootstrap_test.go | 2 +- internal/benchmarks/latency_bench_test.go | 28 +++++++++++++++----- internal/telemetry/trace/global.go | 25 +++++++++++++++++ internal/telemetry/trace/server.go | 3 ++- internal/testenv/scenarios/trace_receiver.go | 5 +++- 6 files changed, 59 insertions(+), 17 deletions(-) diff --git a/config/envoyconfig/bootstrap.go b/config/envoyconfig/bootstrap.go index 665f216fb..f97338c44 100644 --- a/config/envoyconfig/bootstrap.go +++ b/config/envoyconfig/bootstrap.go @@ -150,8 +150,8 @@ func (b *Builder) BuildBootstrapDynamicResources( // BuildBootstrapLayeredRuntime builds the layered runtime for the envoy bootstrap. func (b *Builder) BuildBootstrapLayeredRuntime(ctx context.Context) (*envoy_config_bootstrap_v3.LayeredRuntime, error) { - flushIntervalMs := 5000 - minFlushSpans := 3 + flushIntervalMs := trace.BatchSpanProcessorScheduleDelay() + minFlushSpans := trace.BatchSpanProcessorMaxExportBatchSize() if trace.DebugFlagsFromContext(ctx).Check(trace.EnvoyFlushEverySpan) { minFlushSpans = 1 flushIntervalMs = math.MaxInt32 @@ -166,15 +166,12 @@ func (b *Builder) BuildBootstrapLayeredRuntime(ctx context.Context) (*envoy_conf "tracing": map[string]any{ "opentelemetry": map[string]any{ "flush_interval_ms": flushIntervalMs, - // For most requests, envoy generates 3 spans: + // Note: for most requests, envoy generates 3 spans: // - ingress (downstream->envoy) // - ext_authz check request (envoy->pomerium) // - egress (envoy->upstream) - // The default value is 5, which usually leads to delayed exports. - // This can be set lower, e.g. 1 to have envoy export every span - // individually (useful for testing), but 3 is a reasonable default. - // If set to 1, also set flush_interval_ms to a very large number to - // effectively disable it. + // Some requests only generate 2 spans, e.g. if there is no upstream + // request made or auth fails. "min_flush_spans": minFlushSpans, }, }, diff --git a/config/envoyconfig/bootstrap_test.go b/config/envoyconfig/bootstrap_test.go index ec27b7fe7..09d4ab7eb 100644 --- a/config/envoyconfig/bootstrap_test.go +++ b/config/envoyconfig/bootstrap_test.go @@ -51,7 +51,7 @@ func TestBuilder_BuildBootstrapLayeredRuntime(t *testing.T) { "tracing": { "opentelemetry": { "flush_interval_ms": 5000, - "min_flush_spans": 3 + "min_flush_spans": 512 } } } diff --git a/internal/benchmarks/latency_bench_test.go b/internal/benchmarks/latency_bench_test.go index d25185942..4fc0e3d9a 100644 --- a/internal/benchmarks/latency_bench_test.go +++ b/internal/benchmarks/latency_bench_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/testenv" "github.com/pomerium/pomerium/internal/testenv/envutil" "github.com/pomerium/pomerium/internal/testenv/scenarios" @@ -18,18 +19,30 @@ import ( ) var ( - numRoutes int - dumpErrLogs bool + numRoutes int + dumpErrLogs bool + enableTracing bool + publicRoutes bool ) func init() { flag.IntVar(&numRoutes, "routes", 100, "number of routes") flag.BoolVar(&dumpErrLogs, "dump-err-logs", false, "if the test fails, write all captured logs to a file (testdata/)") + flag.BoolVar(&enableTracing, "enable-tracing", false, "enable tracing") + flag.BoolVar(&publicRoutes, "public-routes", false, "use public unauthenticated routes") } func TestRequestLatency(t *testing.T) { resume := envutil.PauseProfiling(t) - env := testenv.New(t, testenv.Silent()) + var env testenv.Environment + if enableTracing { + receiver := scenarios.NewOTLPTraceReceiver() + env = testenv.New(t, testenv.Silent(), testenv.WithTraceClient(receiver.NewGRPCClient())) + env.Add(receiver) + } else { + env = testenv.New(t, testenv.Silent()) + } + users := []*scenarios.User{} for i := range numRoutes { users = append(users, &scenarios.User{ @@ -47,9 +60,12 @@ func TestRequestLatency(t *testing.T) { routes := make([]testenv.Route, numRoutes) for i := range numRoutes { routes[i] = up.Route(). - From(env.SubdomainURL(fmt.Sprintf("from-%d", i))). - // Policy(func(p *config.Policy) { p.AllowPublicUnauthenticatedAccess = true }) - PPL(fmt.Sprintf(`{"allow":{"and":["email":{"is":"user%d@example.com"}]}}`, i)) + From(env.SubdomainURL(fmt.Sprintf("from-%d", i))) + if publicRoutes { + routes[i] = routes[i].Policy(func(p *config.Policy) { p.AllowPublicUnauthenticatedAccess = true }) + } else { + routes[i] = routes[i].PPL(fmt.Sprintf(`{"allow":{"and":["email":{"is":"user%d@example.com"}]}}`, i)) + } } env.AddUpstream(up) diff --git a/internal/telemetry/trace/global.go b/internal/telemetry/trace/global.go index 16b7a5a4f..4f6338dc3 100644 --- a/internal/telemetry/trace/global.go +++ b/internal/telemetry/trace/global.go @@ -2,9 +2,12 @@ package trace import ( "context" + "os" + "strconv" "go.opentelemetry.io/contrib/propagators/autoprop" "go.opentelemetry.io/otel" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/embedded" ) @@ -44,3 +47,25 @@ var _ trace.Tracer = panicTracer{} func (p panicTracer) Start(context.Context, string, ...trace.SpanStartOption) (context.Context, trace.Span) { panic("global tracer used") } + +// functions below mimic those with the same name in otel/sdk/internal/env/env.go + +func BatchSpanProcessorScheduleDelay() int { + const defaultValue = sdktrace.DefaultScheduleDelay + if v, ok := os.LookupEnv("OTEL_BSP_SCHEDULE_DELAY"); ok { + if n, err := strconv.Atoi(v); err == nil { + return n + } + } + return defaultValue +} + +func BatchSpanProcessorMaxExportBatchSize() int { + const defaultValue = sdktrace.DefaultMaxExportBatchSize + if v, ok := os.LookupEnv("OTEL_BSP_MAX_EXPORT_BATCH_SIZE"); ok { + if n, err := strconv.Atoi(v); err == nil { + return n + } + } + return defaultValue +} diff --git a/internal/telemetry/trace/server.go b/internal/telemetry/trace/server.go index 1bd14c7d0..779455b6d 100644 --- a/internal/telemetry/trace/server.go +++ b/internal/telemetry/trace/server.go @@ -50,7 +50,7 @@ func NewServer(ctx context.Context) *ExporterServer { } func (srv *ExporterServer) Start(ctx context.Context) { - lis := bufconn.Listen(4096) + lis := bufconn.Listen(2 * 1024 * 1024) go func() { if err := srv.remoteClient.Start(ctx); err != nil { panic(err) @@ -95,5 +95,6 @@ func (srv *ExporterServer) Shutdown(ctx context.Context) error { if err := srv.remoteClient.Stop(ctx); err != nil { errs = append(errs, err) } + srv.cc.Close() return errors.Join(errs...) } diff --git a/internal/testenv/scenarios/trace_receiver.go b/internal/testenv/scenarios/trace_receiver.go index cebdf8962..f318d59ae 100644 --- a/internal/testenv/scenarios/trace_receiver.go +++ b/internal/testenv/scenarios/trace_receiver.go @@ -75,7 +75,10 @@ func (rec *OTLPTraceReceiver) Attach(ctx context.Context) { } // Modify implements testenv.Modifier. -func (rec *OTLPTraceReceiver) Modify(*config.Config) {} +func (rec *OTLPTraceReceiver) Modify(cfg *config.Config) { + cfg.Options.TracingProvider = "otlp" + cfg.Options.TracingOTLPEndpoint = rec.GRPCEndpointURL().Value() +} func (rec *OTLPTraceReceiver) handleV1Traces(w http.ResponseWriter, r *http.Request) { if r.Header.Get("Content-Type") != "application/x-protobuf" { From c307ca806a215b15005da0ceefd42d44a5dc46df Mon Sep 17 00:00:00 2001 From: Joe Kralicky Date: Fri, 24 Jan 2025 14:55:21 -0500 Subject: [PATCH 2/2] fix testcontainers docker client using the global tracer provider (#5440) --- internal/telemetry/trace/client.go | 32 ++++++++++++++++++++++++++++++ internal/testutil/minio.go | 3 +++ internal/testutil/postgres.go | 3 +++ 3 files changed, 38 insertions(+) diff --git a/internal/telemetry/trace/client.go b/internal/telemetry/trace/client.go index 0dcf332f9..f72d66d11 100644 --- a/internal/telemetry/trace/client.go +++ b/internal/telemetry/trace/client.go @@ -13,6 +13,8 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + oteltrace "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" v1 "go.opentelemetry.io/proto/otlp/trace/v1" ) @@ -226,6 +228,36 @@ func (n NoopClient) UploadTraces(context.Context, []*v1.ResourceSpans) error { return nil } +// ValidNoopSpan is the same as noop.Span, except with a "valid" span context +// (has a non-zero trace and span ID). +// +// Adding this into a context as follows: +// +// ctx = oteltrace.ContextWithSpan(ctx, trace.ValidNoopSpan{}) +// +// will prevent some usages of the global tracer provider by libraries such +// as otelhttp, which only uses the global provider if the context's span +// is "invalid". +type ValidNoopSpan struct { + noop.Span +} + +var noopTraceID = oteltrace.TraceID{ + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, +} + +var noopSpanID = oteltrace.SpanID{ + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, +} + +// SpanContext implements trace.Span. +func (n ValidNoopSpan) SpanContext() oteltrace.SpanContext { + return n.Span.SpanContext().WithTraceID(noopTraceID).WithSpanID(noopSpanID) +} + +var _ oteltrace.Span = ValidNoopSpan{} + func IsDisabledViaEnvironment() bool { if os.Getenv("OTEL_SDK_DISABLED") == "true" { return true diff --git a/internal/testutil/minio.go b/internal/testutil/minio.go index 69ee9828c..2b0d7e57e 100644 --- a/internal/testutil/minio.go +++ b/internal/testutil/minio.go @@ -6,8 +6,10 @@ import ( "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/pomerium/pomerium/internal/telemetry/trace" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" + oteltrace "go.opentelemetry.io/otel/trace" ) // WithTestMinIO starts a test MinIO server @@ -15,6 +17,7 @@ func WithTestMinIO(t *testing.T, bucket string, handler func(endpoint string)) { t.Helper() ctx := GetContext(t, maxWait) + ctx = oteltrace.ContextWithSpan(ctx, trace.ValidNoopSpan{}) container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ diff --git a/internal/testutil/postgres.go b/internal/testutil/postgres.go index fedc417e3..15bb70826 100644 --- a/internal/testutil/postgres.go +++ b/internal/testutil/postgres.go @@ -8,8 +8,10 @@ import ( "github.com/google/uuid" "github.com/jackc/pgx/v5" + "github.com/pomerium/pomerium/internal/telemetry/trace" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" + oteltrace "go.opentelemetry.io/otel/trace" ) // WithTestPostgres starts a postgres database. @@ -17,6 +19,7 @@ func WithTestPostgres(t *testing.T, handler func(dsn string)) { t.Helper() ctx := GetContext(t, maxWait) + ctx = oteltrace.ContextWithSpan(ctx, trace.ValidNoopSpan{}) container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{