mirror of
https://github.com/pomerium/pomerium.git
synced 2025-08-06 10:21:05 +02:00
trace config support, test updates (wip), sampling fixes
This commit is contained in:
parent
b9f783f5d7
commit
1138563b58
19 changed files with 1050 additions and 523 deletions
|
@ -2,9 +2,11 @@ package selftests_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
|
@ -14,66 +16,41 @@ import (
|
|||
"github.com/pomerium/pomerium/internal/testenv/scenarios"
|
||||
"github.com/pomerium/pomerium/internal/testenv/snippets"
|
||||
"github.com/pomerium/pomerium/internal/testenv/upstreams"
|
||||
"github.com/pomerium/pomerium/internal/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
|
||||
oteltrace "go.opentelemetry.io/otel/trace"
|
||||
resourcev1 "go.opentelemetry.io/proto/otlp/resource/v1"
|
||||
tracev1 "go.opentelemetry.io/proto/otlp/trace/v1"
|
||||
)
|
||||
|
||||
func otlpTraceReceiverOrFromEnv(t *testing.T) (modifier testenv.Modifier, newRemoteClient func() otlptrace.Client, getResults func() []*tracev1.ResourceSpans) {
|
||||
func otlpTraceReceiverOrFromEnv(t *testing.T) (modifier testenv.Modifier, newRemoteClient func() otlptrace.Client, getResults func() *testutil.TraceResults) {
|
||||
t.Setenv("OTEL_TRACES_EXPORTER", "otlp")
|
||||
tracesEndpoint := os.Getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT")
|
||||
if tracesEndpoint == "" {
|
||||
tracesEndpoint = os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT")
|
||||
if tracesEndpoint == "" {
|
||||
srv := scenarios.NewOTLPTraceReceiver()
|
||||
return srv, srv.NewClient, srv.ResourceSpans
|
||||
return srv, srv.NewClient, func() *testutil.TraceResults {
|
||||
return testutil.NewTraceResults(srv.FlushResourceSpans())
|
||||
}
|
||||
}
|
||||
}
|
||||
return testenv.NoopModifier(), trace.NewRemoteClientFromEnv, nil
|
||||
}
|
||||
|
||||
var commonResourceNames = []string{
|
||||
var allServices = []string{
|
||||
"Test Environment",
|
||||
"Authorize",
|
||||
"Authenticate",
|
||||
"Control Plane",
|
||||
"Data Broker",
|
||||
}
|
||||
|
||||
func assertResourceNamesPresent(t *testing.T, resources []*resourcev1.Resource, names []string) {
|
||||
assert.NotEmpty(t, resources)
|
||||
for _, service := range names {
|
||||
assertResourceExists(t, resources, attribute.NewSet(
|
||||
attribute.String("service.name", service),
|
||||
attribute.String("telemetry.sdk.language", "go"),
|
||||
attribute.String("telemetry.sdk.name", "opentelemetry"),
|
||||
))
|
||||
}
|
||||
assertResourceExists(t, resources, attribute.NewSet(
|
||||
attribute.String("service.name", "Envoy"),
|
||||
attribute.String("pomerium.envoy", "true"),
|
||||
))
|
||||
}
|
||||
|
||||
func assertResourceExists(t *testing.T, resources []*resourcev1.Resource, attrs attribute.Set) {
|
||||
for _, res := range resources {
|
||||
set := trace.NewAttributeSet(res.Attributes...)
|
||||
set, _ = set.Filter(func(kv attribute.KeyValue) bool {
|
||||
return attrs.HasValue(kv.Key)
|
||||
})
|
||||
if set.Equals(&attrs) {
|
||||
return
|
||||
}
|
||||
}
|
||||
attrsData, _ := attrs.MarshalJSON()
|
||||
t.Errorf("resource not found: %s", string(attrsData))
|
||||
"Upstream",
|
||||
"IDP",
|
||||
"HTTP Client",
|
||||
"Envoy",
|
||||
}
|
||||
|
||||
func TestOTLPTracing(t *testing.T) {
|
||||
|
@ -115,22 +92,41 @@ func TestOTLPTracing(t *testing.T) {
|
|||
|
||||
if getResults != nil {
|
||||
results := getResults()
|
||||
resources := []*resourcev1.Resource{}
|
||||
for _, res := range results {
|
||||
resources = append(resources, res.Resource)
|
||||
}
|
||||
assertResourceNamesPresent(t, resources, append(commonResourceNames, "Upstream", "IDP", "HTTP Client"))
|
||||
var (
|
||||
testEnvironmentLocalTest = fmt.Sprintf("Test Environment: %s", t.Name())
|
||||
testEnvironmentAuthenticate = "Test Environment: Authenticate"
|
||||
authenticateOAuth2Client = "Authenticate: OAuth2 Client: GET /.well-known/jwks.json"
|
||||
idpServerGetUserinfo = "IDP: Server: GET /oidc/userinfo"
|
||||
idpServerPostToken = "IDP: Server: POST /oidc/token"
|
||||
controlPlaneEnvoyAccessLogs = "Control Plane: envoy.service.accesslog.v3.AccessLogService/StreamAccessLogs"
|
||||
controlPlaneEnvoyDiscovery = "Control Plane: envoy.service.discovery.v3.AggregatedDiscoveryService/DeltaAggregatedResources"
|
||||
controlPlaneExport = "Control Plane: opentelemetry.proto.collector.trace.v1.TraceService/Export"
|
||||
)
|
||||
|
||||
results.MatchTraces(t,
|
||||
testutil.MatchOptions{
|
||||
Exact: true,
|
||||
CheckDetachedSpans: true,
|
||||
},
|
||||
testutil.Match{Name: testEnvironmentLocalTest, TraceCount: 1, Services: []string{"Test Environment", "Control Plane", "Data Broker"}},
|
||||
testutil.Match{Name: testEnvironmentAuthenticate, TraceCount: 1, Services: allServices},
|
||||
testutil.Match{Name: authenticateOAuth2Client, TraceCount: testutil.Greater(0)},
|
||||
testutil.Match{Name: idpServerGetUserinfo, TraceCount: testutil.SameAs(authenticateOAuth2Client)},
|
||||
testutil.Match{Name: idpServerPostToken, TraceCount: testutil.SameAs(authenticateOAuth2Client)},
|
||||
testutil.Match{Name: controlPlaneEnvoyDiscovery, TraceCount: 1},
|
||||
testutil.Match{Name: controlPlaneExport, TraceCount: testutil.Greater(0)},
|
||||
testutil.Match{Name: controlPlaneEnvoyAccessLogs, TraceCount: testutil.Any{}},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSampling(t *testing.T) {
|
||||
// t.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")
|
||||
modifier, newRemoteClient, getResults := otlpTraceReceiverOrFromEnv(t)
|
||||
env := testenv.New(t, testenv.WithTraceDebugFlags(testenv.StandardTraceDebugFlags), testenv.WithTraceClient(newRemoteClient()))
|
||||
env.Add(modifier)
|
||||
|
||||
env.Add(testenv.ModifierFunc(func(_ context.Context, cfg *config.Config) {
|
||||
cfg.Options.TracingSampleRate = 1.0
|
||||
cfg.Options.TracingSampleRate = 0.5
|
||||
}))
|
||||
env.Add(scenarios.NewIDP([]*scenarios.User{
|
||||
{
|
||||
|
@ -141,40 +137,48 @@ func TestSampling(t *testing.T) {
|
|||
}))
|
||||
|
||||
upstreamNoClientTracing := upstreams.HTTP(nil, upstreams.WithNoClientTracing(), upstreams.WithDisplayName("Upstream"))
|
||||
// up2 := upstreams.HTTP(nil, upstreams.WithDisplayName("Upstream 2"))
|
||||
sampled := atomic.Int32{}
|
||||
notSampled := atomic.Int32{}
|
||||
sampled := map[string]*atomic.Int32{}
|
||||
notSampled := map[string]*atomic.Int32{}
|
||||
readSampled := func(t testing.TB) int32 {
|
||||
return sampled["/"+t.Name()].Load()
|
||||
}
|
||||
readNotSampled := func(t testing.TB) int32 {
|
||||
return notSampled["/"+t.Name()].Load()
|
||||
}
|
||||
var mu sync.Mutex
|
||||
setupCounters := func(t testing.TB) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
sampled["/"+t.Name()] = &atomic.Int32{}
|
||||
notSampled["/"+t.Name()] = &atomic.Int32{}
|
||||
}
|
||||
|
||||
handler := func(w http.ResponseWriter, req *http.Request) {
|
||||
span := oteltrace.SpanFromContext(req.Context())
|
||||
spanId := span.SpanContext().SpanID().String()
|
||||
traceId := span.SpanContext().TraceID().String()
|
||||
_, _ = spanId, traceId
|
||||
flags := span.SpanContext().TraceFlags()
|
||||
path := req.URL.Path
|
||||
if flags.IsSampled() {
|
||||
sampled.Add(1)
|
||||
sampled[path].Add(1)
|
||||
} else {
|
||||
notSampled.Add(1)
|
||||
notSampled[path].Add(1)
|
||||
}
|
||||
w.Write([]byte("OK"))
|
||||
}
|
||||
upstreamNoClientTracing.Handle("/", handler)
|
||||
// up2.Handle("/", handler)
|
||||
upstreamNoClientTracing.Handle(fmt.Sprintf("/%s/{name}", t.Name()), handler)
|
||||
|
||||
route1 := upstreamNoClientTracing.Route().
|
||||
From(env.SubdomainURL("sampling-50pct")).
|
||||
PPL(`{"allow":{"and":["email":{"is":"foo@example.com"}]}}`)
|
||||
|
||||
// route2 := up2.Route().
|
||||
// From(env.SubdomainURL("default")).
|
||||
// PPL(`{"allow":{"and":["email":{"is":"foo@example.com"}]}}`)
|
||||
// _ = route2
|
||||
env.AddUpstream(upstreamNoClientTracing)
|
||||
// env.AddUpstream(up2)
|
||||
env.Start()
|
||||
snippets.WaitStartupComplete(env)
|
||||
|
||||
doRequest := func(ctx context.Context, up upstreams.HTTPUpstream, route testenv.Route) {
|
||||
resp, err := up.Get(route, upstreams.AuthenticateAs("foo@example.com"), upstreams.Path("/"), upstreams.Context(ctx))
|
||||
doRequest := func(t testing.TB, ctx context.Context, up upstreams.HTTPUpstream, route testenv.Route) {
|
||||
resp, err := up.Get(route, upstreams.AuthenticateAs("foo@example.com"), upstreams.Path(t.Name()), upstreams.Context(ctx))
|
||||
require.NoError(t, err)
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
assert.NoError(t, err)
|
||||
|
@ -183,51 +187,54 @@ func TestSampling(t *testing.T) {
|
|||
assert.Equal(t, "OK", string(body))
|
||||
}
|
||||
|
||||
for {
|
||||
doRequest(context.Background(), upstreamNoClientTracing, route1)
|
||||
if sampled.Load() == 1 {
|
||||
break
|
||||
t.Run("no-external-traceparent", func(t *testing.T) {
|
||||
setupCounters(t)
|
||||
for {
|
||||
doRequest(t, context.Background(), upstreamNoClientTracing, route1)
|
||||
if readSampled(t) == 10 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// assert.InDelta(t, int32(5), sampled.Load(), 2)
|
||||
// assert.InDelta(t, int32(5), notSampled.Load(), 2)
|
||||
assert.Equal(t, int32(10), readSampled(t)) // 10 sampled
|
||||
assert.InDelta(t, int32(10), readNotSampled(t), 5) // between 5-15 unsampled
|
||||
|
||||
sampled.Store(0)
|
||||
notSampled.Store(0)
|
||||
if getResults != nil {
|
||||
results := getResults()
|
||||
results.MatchTraces(t, testutil.MatchOptions{Exact: false, CheckDetachedSpans: true}, testutil.Match{}) // testutil.Match{Name: }
|
||||
|
||||
// for range 100 {
|
||||
// doRequest(context.Background(), up2, route2)
|
||||
// }
|
||||
}
|
||||
})
|
||||
|
||||
// // if the request already has a traceparent header, they will always be sampled
|
||||
// // regardless of the random sample rate we configured
|
||||
// assert.Equal(t, int32(100), sampled.Load())
|
||||
// assert.Equal(t, int32(0), notSampled.Load())
|
||||
t.Run("external-traceparent-always-sample", func(t *testing.T) {
|
||||
setupCounters(t)
|
||||
tracer := trace.NewTracerProvider(env.Context(), "Always Sample", sdktrace.WithSampler(sdktrace.AlwaysSample())).Tracer(trace.PomeriumCoreTracer)
|
||||
for range 100 {
|
||||
ctx, span := tracer.Start(context.Background(), "should sample")
|
||||
doRequest(t, ctx, upstreamNoClientTracing, route1)
|
||||
span.End()
|
||||
}
|
||||
|
||||
// sampled.Store(0)
|
||||
// notSampled.Store(0)
|
||||
// if the request already has a traceparent header, they will always be sampled
|
||||
// regardless of the random sample rate we configured
|
||||
assert.Equal(t, int32(100), readSampled(t))
|
||||
assert.Equal(t, int32(0), readNotSampled(t))
|
||||
})
|
||||
|
||||
// tracer := trace.NewTracerProvider(env.Context(), "Never Sample", sdktrace.WithSampler(sdktrace.NeverSample())).Tracer(trace.PomeriumCoreTracer)
|
||||
// for range 100 {
|
||||
// ctx, span := tracer.Start(context.Background(), "should not sample")
|
||||
// doRequest(ctx, up2, route2)
|
||||
// span.End()
|
||||
// }
|
||||
t.Run("external-traceparent-never-sample", func(t *testing.T) {
|
||||
setupCounters(t)
|
||||
tracer := trace.NewTracerProvider(env.Context(), "Never Sample", sdktrace.WithSampler(sdktrace.NeverSample())).Tracer(trace.PomeriumCoreTracer)
|
||||
for range 100 {
|
||||
ctx, span := tracer.Start(context.Background(), "should not sample")
|
||||
doRequest(t, ctx, upstreamNoClientTracing, route1)
|
||||
span.End()
|
||||
}
|
||||
|
||||
// sampled.Store(0)
|
||||
// notSampled.Store(100)
|
||||
assert.Equal(t, int32(0), readSampled(t))
|
||||
assert.Equal(t, int32(100), readNotSampled(t))
|
||||
})
|
||||
|
||||
env.Stop()
|
||||
|
||||
if getResults != nil {
|
||||
results := getResults()
|
||||
resources := []*resourcev1.Resource{}
|
||||
for _, res := range results {
|
||||
resources = append(resources, res.Resource)
|
||||
}
|
||||
assertResourceNamesPresent(t, resources, append(commonResourceNames, "Upstream", "IDP"))
|
||||
}
|
||||
}
|
||||
|
||||
func TestExternalSpans(t *testing.T) {
|
||||
|
@ -287,11 +294,15 @@ func TestExternalSpans(t *testing.T) {
|
|||
env.Stop()
|
||||
|
||||
if getResults != nil {
|
||||
results := getResults()
|
||||
resources := []*resourcev1.Resource{}
|
||||
for _, res := range results {
|
||||
resources = append(resources, res.Resource)
|
||||
}
|
||||
assertResourceNamesPresent(t, resources, commonResourceNames)
|
||||
// results := getResults()
|
||||
// resources := []*resourcev1.Resource{}
|
||||
// for _, res := range results {
|
||||
// resources = append(resources, res.Resource)
|
||||
// }
|
||||
// assertResourceNamesPresent(t, resources, commonResourceNames)
|
||||
|
||||
// results := getResults()
|
||||
// traces := results.GetTraces()
|
||||
// assert.Len(t, traces, 1)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue