Merge branch 'main' into kenjenkins/rework-groups-filtering

This commit is contained in:
Kenneth Jenkins 2025-01-24 14:13:39 -08:00
commit 6bd3c33a3e
9 changed files with 97 additions and 17 deletions

View file

@ -150,8 +150,8 @@ func (b *Builder) BuildBootstrapDynamicResources(
// BuildBootstrapLayeredRuntime builds the layered runtime for the envoy bootstrap.
func (b *Builder) BuildBootstrapLayeredRuntime(ctx context.Context) (*envoy_config_bootstrap_v3.LayeredRuntime, error) {
flushIntervalMs := 5000
minFlushSpans := 3
flushIntervalMs := trace.BatchSpanProcessorScheduleDelay()
minFlushSpans := trace.BatchSpanProcessorMaxExportBatchSize()
if trace.DebugFlagsFromContext(ctx).Check(trace.EnvoyFlushEverySpan) {
minFlushSpans = 1
flushIntervalMs = math.MaxInt32
@ -166,15 +166,12 @@ func (b *Builder) BuildBootstrapLayeredRuntime(ctx context.Context) (*envoy_conf
"tracing": map[string]any{
"opentelemetry": map[string]any{
"flush_interval_ms": flushIntervalMs,
// For most requests, envoy generates 3 spans:
// Note: for most requests, envoy generates 3 spans:
// - ingress (downstream->envoy)
// - ext_authz check request (envoy->pomerium)
// - egress (envoy->upstream)
// The default value is 5, which usually leads to delayed exports.
// This can be set lower, e.g. 1 to have envoy export every span
// individually (useful for testing), but 3 is a reasonable default.
// If set to 1, also set flush_interval_ms to a very large number to
// effectively disable it.
// Some requests only generate 2 spans, e.g. if there is no upstream
// request made or auth fails.
"min_flush_spans": minFlushSpans,
},
},

View file

@ -51,7 +51,7 @@ func TestBuilder_BuildBootstrapLayeredRuntime(t *testing.T) {
"tracing": {
"opentelemetry": {
"flush_interval_ms": 5000,
"min_flush_spans": 3
"min_flush_spans": 512
}
}
}

View file

@ -9,6 +9,7 @@ import (
"testing"
"time"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/testenv"
"github.com/pomerium/pomerium/internal/testenv/envutil"
"github.com/pomerium/pomerium/internal/testenv/scenarios"
@ -18,18 +19,30 @@ import (
)
var (
numRoutes int
dumpErrLogs bool
numRoutes int
dumpErrLogs bool
enableTracing bool
publicRoutes bool
)
// init registers the command-line flags that configure this test file.
func init() {
	// Sizing: how many routes the test configures.
	flag.IntVar(&numRoutes, "routes", 100, "number of routes")

	// Behaviour toggles, all off by default.
	flag.BoolVar(&publicRoutes, "public-routes", false, "use public unauthenticated routes")
	flag.BoolVar(&enableTracing, "enable-tracing", false, "enable tracing")
	flag.BoolVar(&dumpErrLogs, "dump-err-logs", false, "if the test fails, write all captured logs to a file (testdata/<test-name>)")
}
func TestRequestLatency(t *testing.T) {
resume := envutil.PauseProfiling(t)
env := testenv.New(t, testenv.Silent())
var env testenv.Environment
if enableTracing {
receiver := scenarios.NewOTLPTraceReceiver()
env = testenv.New(t, testenv.Silent(), testenv.WithTraceClient(receiver.NewGRPCClient()))
env.Add(receiver)
} else {
env = testenv.New(t, testenv.Silent())
}
users := []*scenarios.User{}
for i := range numRoutes {
users = append(users, &scenarios.User{
@ -47,9 +60,12 @@ func TestRequestLatency(t *testing.T) {
routes := make([]testenv.Route, numRoutes)
for i := range numRoutes {
routes[i] = up.Route().
From(env.SubdomainURL(fmt.Sprintf("from-%d", i))).
// Policy(func(p *config.Policy) { p.AllowPublicUnauthenticatedAccess = true })
PPL(fmt.Sprintf(`{"allow":{"and":["email":{"is":"user%d@example.com"}]}}`, i))
From(env.SubdomainURL(fmt.Sprintf("from-%d", i)))
if publicRoutes {
routes[i] = routes[i].Policy(func(p *config.Policy) { p.AllowPublicUnauthenticatedAccess = true })
} else {
routes[i] = routes[i].PPL(fmt.Sprintf(`{"allow":{"and":["email":{"is":"user%d@example.com"}]}}`, i))
}
}
env.AddUpstream(up)

View file

@ -13,6 +13,8 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
oteltrace "go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
)
@ -226,6 +228,36 @@ func (n NoopClient) UploadTraces(context.Context, []*v1.ResourceSpans) error {
return nil
}
// ValidNoopSpan is the same as noop.Span, except that it reports a "valid"
// span context (one with a non-zero trace ID and span ID).
//
// Adding it to a context as follows:
//
//	ctx = oteltrace.ContextWithSpan(ctx, trace.ValidNoopSpan{})
//
// will prevent some usages of the global tracer provider by libraries such
// as otelhttp, which only uses the global provider if the context's span
// is "invalid".
type ValidNoopSpan struct {
noop.Span
}
// Fixed, non-zero identifiers used to build the span context returned by
// ValidNoopSpan.SpanContext.
var (
	// noopTraceID is a trace ID with every byte set to 0xFF.
	noopTraceID = oteltrace.TraceID{
		0xFF, 0xFF, 0xFF, 0xFF,
		0xFF, 0xFF, 0xFF, 0xFF,
		0xFF, 0xFF, 0xFF, 0xFF,
		0xFF, 0xFF, 0xFF, 0xFF,
	}

	// noopSpanID is a span ID with every byte set to 0xFF.
	noopSpanID = oteltrace.SpanID{
		0xFF, 0xFF, 0xFF, 0xFF,
		0xFF, 0xFF, 0xFF, 0xFF,
	}
)
// SpanContext implements trace.Span. It takes the embedded noop span's
// context and overrides its trace ID and span ID with noopTraceID and
// noopSpanID.
func (n ValidNoopSpan) SpanContext() oteltrace.SpanContext {
	base := n.Span.SpanContext()
	withTrace := base.WithTraceID(noopTraceID)
	return withTrace.WithSpanID(noopSpanID)
}
// Compile-time assertion that ValidNoopSpan implements oteltrace.Span.
var _ oteltrace.Span = ValidNoopSpan{}
func IsDisabledViaEnvironment() bool {
if os.Getenv("OTEL_SDK_DISABLED") == "true" {
return true

View file

@ -2,9 +2,12 @@ package trace
import (
"context"
"os"
"strconv"
"go.opentelemetry.io/contrib/propagators/autoprop"
"go.opentelemetry.io/otel"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/embedded"
)
@ -44,3 +47,25 @@ var _ trace.Tracer = panicTracer{}
// Start implements trace.Tracer. It never starts a span: every call panics
// with the message "global tracer used".
func (p panicTracer) Start(context.Context, string, ...trace.SpanStartOption) (context.Context, trace.Span) {
panic("global tracer used")
}
// functions below mimic those with the same name in otel/sdk/internal/env/env.go
// BatchSpanProcessorScheduleDelay returns the integer value of the
// OTEL_BSP_SCHEDULE_DELAY environment variable. If the variable is unset or
// cannot be parsed as an integer, sdktrace.DefaultScheduleDelay is returned.
func BatchSpanProcessorScheduleDelay() int {
	const fallback = sdktrace.DefaultScheduleDelay

	raw, present := os.LookupEnv("OTEL_BSP_SCHEDULE_DELAY")
	if !present {
		return fallback
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		// Unparsable values are ignored in favour of the default.
		return fallback
	}
	return parsed
}
// BatchSpanProcessorMaxExportBatchSize returns the integer value of the
// OTEL_BSP_MAX_EXPORT_BATCH_SIZE environment variable. If the variable is
// unset or cannot be parsed as an integer, sdktrace.DefaultMaxExportBatchSize
// is returned.
func BatchSpanProcessorMaxExportBatchSize() int {
	const fallback = sdktrace.DefaultMaxExportBatchSize

	raw, present := os.LookupEnv("OTEL_BSP_MAX_EXPORT_BATCH_SIZE")
	if !present {
		return fallback
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		// Unparsable values are ignored in favour of the default.
		return fallback
	}
	return parsed
}

View file

@ -50,7 +50,7 @@ func NewServer(ctx context.Context) *ExporterServer {
}
func (srv *ExporterServer) Start(ctx context.Context) {
lis := bufconn.Listen(4096)
lis := bufconn.Listen(2 * 1024 * 1024)
go func() {
if err := srv.remoteClient.Start(ctx); err != nil {
panic(err)
@ -95,5 +95,6 @@ func (srv *ExporterServer) Shutdown(ctx context.Context) error {
if err := srv.remoteClient.Stop(ctx); err != nil {
errs = append(errs, err)
}
srv.cc.Close()
return errors.Join(errs...)
}

View file

@ -75,7 +75,10 @@ func (rec *OTLPTraceReceiver) Attach(ctx context.Context) {
}
// Modify implements testenv.Modifier.
func (rec *OTLPTraceReceiver) Modify(*config.Config) {}
// Modify implements testenv.Modifier. It selects the "otlp" tracing provider
// and sets the OTLP endpoint option to this receiver's gRPC endpoint URL.
func (rec *OTLPTraceReceiver) Modify(cfg *config.Config) {
	cfg.Options.TracingProvider = "otlp"

	grpcEndpoint := rec.GRPCEndpointURL().Value()
	cfg.Options.TracingOTLPEndpoint = grpcEndpoint
}
func (rec *OTLPTraceReceiver) handleV1Traces(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Content-Type") != "application/x-protobuf" {

View file

@ -6,8 +6,10 @@ import (
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
oteltrace "go.opentelemetry.io/otel/trace"
)
// WithTestMinIO starts a test MinIO server
@ -15,6 +17,7 @@ func WithTestMinIO(t *testing.T, bucket string, handler func(endpoint string)) {
t.Helper()
ctx := GetContext(t, maxWait)
ctx = oteltrace.ContextWithSpan(ctx, trace.ValidNoopSpan{})
container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{

View file

@ -8,8 +8,10 @@ import (
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
oteltrace "go.opentelemetry.io/otel/trace"
)
// WithTestPostgres starts a postgres database.
@ -17,6 +19,7 @@ func WithTestPostgres(t *testing.T, handler func(dsn string)) {
t.Helper()
ctx := GetContext(t, maxWait)
ctx = oteltrace.ContextWithSpan(ctx, trace.ValidNoopSpan{})
container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{