tracing: adjust envoy otel trace batching settings to match go sdk (#5446)

This commit is contained in:
Joe Kralicky 2025-01-24 14:51:07 -05:00 committed by GitHub
parent 0bd6d8cc83
commit 6ea51149f9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 59 additions and 17 deletions

View file

@ -150,8 +150,8 @@ func (b *Builder) BuildBootstrapDynamicResources(
// BuildBootstrapLayeredRuntime builds the layered runtime for the envoy bootstrap. // BuildBootstrapLayeredRuntime builds the layered runtime for the envoy bootstrap.
func (b *Builder) BuildBootstrapLayeredRuntime(ctx context.Context) (*envoy_config_bootstrap_v3.LayeredRuntime, error) { func (b *Builder) BuildBootstrapLayeredRuntime(ctx context.Context) (*envoy_config_bootstrap_v3.LayeredRuntime, error) {
flushIntervalMs := 5000 flushIntervalMs := trace.BatchSpanProcessorScheduleDelay()
minFlushSpans := 3 minFlushSpans := trace.BatchSpanProcessorMaxExportBatchSize()
if trace.DebugFlagsFromContext(ctx).Check(trace.EnvoyFlushEverySpan) { if trace.DebugFlagsFromContext(ctx).Check(trace.EnvoyFlushEverySpan) {
minFlushSpans = 1 minFlushSpans = 1
flushIntervalMs = math.MaxInt32 flushIntervalMs = math.MaxInt32
@ -166,15 +166,12 @@ func (b *Builder) BuildBootstrapLayeredRuntime(ctx context.Context) (*envoy_conf
"tracing": map[string]any{ "tracing": map[string]any{
"opentelemetry": map[string]any{ "opentelemetry": map[string]any{
"flush_interval_ms": flushIntervalMs, "flush_interval_ms": flushIntervalMs,
// For most requests, envoy generates 3 spans: // Note: for most requests, envoy generates 3 spans:
// - ingress (downstream->envoy) // - ingress (downstream->envoy)
// - ext_authz check request (envoy->pomerium) // - ext_authz check request (envoy->pomerium)
// - egress (envoy->upstream) // - egress (envoy->upstream)
// The default value is 5, which usually leads to delayed exports. // Some requests only generate 2 spans, e.g. if there is no upstream
// This can be set lower, e.g. 1 to have envoy export every span // request made or auth fails.
// individually (useful for testing), but 3 is a reasonable default.
// If set to 1, also set flush_interval_ms to a very large number to
// effectively disable it.
"min_flush_spans": minFlushSpans, "min_flush_spans": minFlushSpans,
}, },
}, },

View file

@ -51,7 +51,7 @@ func TestBuilder_BuildBootstrapLayeredRuntime(t *testing.T) {
"tracing": { "tracing": {
"opentelemetry": { "opentelemetry": {
"flush_interval_ms": 5000, "flush_interval_ms": 5000,
"min_flush_spans": 3 "min_flush_spans": 512
} }
} }
} }

View file

@ -9,6 +9,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/testenv" "github.com/pomerium/pomerium/internal/testenv"
"github.com/pomerium/pomerium/internal/testenv/envutil" "github.com/pomerium/pomerium/internal/testenv/envutil"
"github.com/pomerium/pomerium/internal/testenv/scenarios" "github.com/pomerium/pomerium/internal/testenv/scenarios"
@ -20,16 +21,28 @@ import (
var ( var (
numRoutes int numRoutes int
dumpErrLogs bool dumpErrLogs bool
enableTracing bool
publicRoutes bool
) )
func init() { func init() {
flag.IntVar(&numRoutes, "routes", 100, "number of routes") flag.IntVar(&numRoutes, "routes", 100, "number of routes")
flag.BoolVar(&dumpErrLogs, "dump-err-logs", false, "if the test fails, write all captured logs to a file (testdata/<test-name>)") flag.BoolVar(&dumpErrLogs, "dump-err-logs", false, "if the test fails, write all captured logs to a file (testdata/<test-name>)")
flag.BoolVar(&enableTracing, "enable-tracing", false, "enable tracing")
flag.BoolVar(&publicRoutes, "public-routes", false, "use public unauthenticated routes")
} }
func TestRequestLatency(t *testing.T) { func TestRequestLatency(t *testing.T) {
resume := envutil.PauseProfiling(t) resume := envutil.PauseProfiling(t)
env := testenv.New(t, testenv.Silent()) var env testenv.Environment
if enableTracing {
receiver := scenarios.NewOTLPTraceReceiver()
env = testenv.New(t, testenv.Silent(), testenv.WithTraceClient(receiver.NewGRPCClient()))
env.Add(receiver)
} else {
env = testenv.New(t, testenv.Silent())
}
users := []*scenarios.User{} users := []*scenarios.User{}
for i := range numRoutes { for i := range numRoutes {
users = append(users, &scenarios.User{ users = append(users, &scenarios.User{
@ -47,9 +60,12 @@ func TestRequestLatency(t *testing.T) {
routes := make([]testenv.Route, numRoutes) routes := make([]testenv.Route, numRoutes)
for i := range numRoutes { for i := range numRoutes {
routes[i] = up.Route(). routes[i] = up.Route().
From(env.SubdomainURL(fmt.Sprintf("from-%d", i))). From(env.SubdomainURL(fmt.Sprintf("from-%d", i)))
// Policy(func(p *config.Policy) { p.AllowPublicUnauthenticatedAccess = true }) if publicRoutes {
PPL(fmt.Sprintf(`{"allow":{"and":["email":{"is":"user%d@example.com"}]}}`, i)) routes[i] = routes[i].Policy(func(p *config.Policy) { p.AllowPublicUnauthenticatedAccess = true })
} else {
routes[i] = routes[i].PPL(fmt.Sprintf(`{"allow":{"and":["email":{"is":"user%d@example.com"}]}}`, i))
}
} }
env.AddUpstream(up) env.AddUpstream(up)

View file

@ -2,9 +2,12 @@ package trace
import ( import (
"context" "context"
"os"
"strconv"
"go.opentelemetry.io/contrib/propagators/autoprop" "go.opentelemetry.io/contrib/propagators/autoprop"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/embedded" "go.opentelemetry.io/otel/trace/embedded"
) )
@ -44,3 +47,25 @@ var _ trace.Tracer = panicTracer{}
func (p panicTracer) Start(context.Context, string, ...trace.SpanStartOption) (context.Context, trace.Span) { func (p panicTracer) Start(context.Context, string, ...trace.SpanStartOption) (context.Context, trace.Span) {
panic("global tracer used") panic("global tracer used")
} }
// functions below mimic those with the same name in otel/sdk/internal/env/env.go
func BatchSpanProcessorScheduleDelay() int {
const defaultValue = sdktrace.DefaultScheduleDelay
if v, ok := os.LookupEnv("OTEL_BSP_SCHEDULE_DELAY"); ok {
if n, err := strconv.Atoi(v); err == nil {
return n
}
}
return defaultValue
}
func BatchSpanProcessorMaxExportBatchSize() int {
const defaultValue = sdktrace.DefaultMaxExportBatchSize
if v, ok := os.LookupEnv("OTEL_BSP_MAX_EXPORT_BATCH_SIZE"); ok {
if n, err := strconv.Atoi(v); err == nil {
return n
}
}
return defaultValue
}

View file

@ -50,7 +50,7 @@ func NewServer(ctx context.Context) *ExporterServer {
} }
func (srv *ExporterServer) Start(ctx context.Context) { func (srv *ExporterServer) Start(ctx context.Context) {
lis := bufconn.Listen(4096) lis := bufconn.Listen(2 * 1024 * 1024)
go func() { go func() {
if err := srv.remoteClient.Start(ctx); err != nil { if err := srv.remoteClient.Start(ctx); err != nil {
panic(err) panic(err)
@ -95,5 +95,6 @@ func (srv *ExporterServer) Shutdown(ctx context.Context) error {
if err := srv.remoteClient.Stop(ctx); err != nil { if err := srv.remoteClient.Stop(ctx); err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
srv.cc.Close()
return errors.Join(errs...) return errors.Join(errs...)
} }

View file

@ -75,7 +75,10 @@ func (rec *OTLPTraceReceiver) Attach(ctx context.Context) {
} }
// Modify implements testenv.Modifier. // Modify implements testenv.Modifier.
func (rec *OTLPTraceReceiver) Modify(*config.Config) {} func (rec *OTLPTraceReceiver) Modify(cfg *config.Config) {
cfg.Options.TracingProvider = "otlp"
cfg.Options.TracingOTLPEndpoint = rec.GRPCEndpointURL().Value()
}
func (rec *OTLPTraceReceiver) handleV1Traces(w http.ResponseWriter, r *http.Request) { func (rec *OTLPTraceReceiver) handleV1Traces(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Content-Type") != "application/x-protobuf" { if r.Header.Get("Content-Type") != "application/x-protobuf" {