wip: new tracing system

This commit is contained in:
Joe Kralicky 2024-12-04 03:38:08 +00:00
parent eb57fa7a8b
commit e221c8af84
No known key found for this signature in database
GPG key ID: 75C4875F34A9FB79
83 changed files with 1414 additions and 1285 deletions

View file

@ -34,6 +34,7 @@ import (
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/config/envoyconfig/filemgr"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/internal/testenv/envutil"
"github.com/pomerium/pomerium/internal/testenv/values"
"github.com/pomerium/pomerium/pkg/cmd/pomerium"
@ -45,6 +46,8 @@ import (
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/grpclog"
)
@ -56,6 +59,7 @@ type Environment interface {
// top-level logger scoped to this environment. It will be canceled when
// Stop() is called, or during test cleanup.
Context() context.Context
Tracer() oteltrace.Tracer
Assert() *assert.Assertions
Require() *require.Assertions
@ -133,9 +137,18 @@ type Environment interface {
// the Pomerium server and Envoy.
NewLogRecorder(opts ...LogRecorderOption) *LogRecorder
// GetState returns the current state of the test environment.
GetState() EnvironmentState
// OnStateChanged registers a callback to be invoked when the environment's
// state changes to the given state. The callback is invoked in a separate
// goroutine.
// state changes to the given state. Each callback is invoked in its own
// goroutine; after triggering the state change, the test environment waits
// for all callbacks to return before continuing.
// State changes are triggered in the following places:
// - NotRunning->Starting: in Start(), as the first operation
// - Starting->Running: in Start(), just before returning
// - Running->Stopping: in Stop(), just before the env context is canceled
// - Stopping->Stopped: in Stop(), after all tasks have completed
OnStateChanged(state EnvironmentState, callback func())
}
@ -192,10 +205,12 @@ type environment struct {
workspaceFolder string
silent bool
ctx context.Context
cancel context.CancelCauseFunc
cleanupOnce sync.Once
logWriter *log.MultiWriter
ctx context.Context
cancel context.CancelCauseFunc
cleanupOnce sync.Once
logWriter *log.MultiWriter
tracerProvider oteltrace.TracerProvider
tracer oteltrace.Tracer
mods []WithCaller[Modifier]
tasks []WithCaller[Task]
@ -305,7 +320,14 @@ func New(t testing.TB, opts ...EnvironmentOption) Environment {
})
logger := zerolog.New(writer).With().Timestamp().Logger().Level(zerolog.DebugLevel)
ctx, cancel := context.WithCancelCause(logger.WithContext(context.Background()))
ctx, cancel := context.WithCancelCause(logger.WithContext(trace.NewContext(context.Background())))
t.Cleanup(func() {
trace.Shutdown(ctx)
})
tracerProvider := trace.NewTracerProvider(ctx, "Test Environment")
tracer := tracerProvider.Tracer(trace.PomeriumCoreTracer)
ctx, span := tracer.Start(ctx, t.Name())
require.NoError(t, err)
taskErrGroup, ctx := errgroup.WithContext(ctx)
e := &environment{
@ -325,13 +347,19 @@ func New(t testing.TB, opts ...EnvironmentOption) Environment {
Debug: values.Deferred[int](),
ALPN: values.Deferred[int](),
},
workspaceFolder: workspaceFolder,
silent: silent,
ctx: ctx,
cancel: cancel,
logWriter: writer,
taskErrGroup: taskErrGroup,
workspaceFolder: workspaceFolder,
silent: silent,
ctx: ctx,
cancel: cancel,
tracerProvider: tracerProvider,
tracer: tracerProvider.Tracer(trace.PomeriumCoreTracer),
logWriter: writer,
taskErrGroup: taskErrGroup,
stateChangeListeners: make(map[EnvironmentState][]func()),
}
e.OnStateChanged(Stopped, func() {
span.End()
})
_, err = rand.Read(e.sharedSecret[:])
require.NoError(t, err)
_, err = rand.Read(e.cookieSecret[:])
@ -394,6 +422,10 @@ func (e *environment) Context() context.Context {
return ContextWithEnv(e.ctx, e)
}
// Tracer returns the tracer held by this environment. New populates it from
// the environment's tracer provider using the trace.PomeriumCoreTracer name.
func (e *environment) Tracer() oteltrace.Tracer {
return e.tracer
}
// Assert returns the *assert.Assertions instance held by this environment.
func (e *environment) Assert() *assert.Assertions {
return e.assert
}
@ -455,6 +487,8 @@ var ErrCauseTestCleanup = errors.New("test cleanup")
// ErrCauseManualStop is a sentinel error carrying the message "Stop() called".
// NOTE(review): presumably passed as the cancellation cause of the environment
// context when Stop() is invoked explicitly — confirm against Stop().
var ErrCauseManualStop = errors.New("Stop() called")
func (e *environment) Start() {
_, span := e.tracer.Start(e.ctx, "Start")
defer span.End()
e.debugf("Start()")
e.advanceState(Starting)
e.t.Cleanup(e.cleanup)
@ -515,6 +549,7 @@ func (e *environment) Start() {
log.AccessLogFieldUserAgent,
log.AccessLogFieldClientCertificate,
}
cfg.Options.TracingSampleRate = 1.0
e.src = &configSource{cfg: cfg}
e.AddTask(TaskFunc(func(ctx context.Context) error {
@ -524,7 +559,7 @@ func (e *environment) Start() {
require.NoError(e.t, cfg.Options.Validate(), "invoking modifier resulted in an invalid configuration:\nadded by: "+mod.Caller)
}
opts := []pomerium.RunOption{
opts := []pomerium.Option{
pomerium.WithOverrideFileManager(fileMgr),
}
envoyBinaryPath := filepath.Join(e.workspaceFolder, fmt.Sprintf("pkg/envoy/files/envoy-%s-%s", runtime.GOOS, runtime.GOARCH))
@ -565,7 +600,12 @@ func (e *environment) Start() {
e.debugf("envoy profiling not available")
}
return pomerium.Run(ctx, e.src, opts...)
pom := pomerium.New(opts...)
e.OnStateChanged(Stopping, func() {
pom.Shutdown()
})
pom.Start(ctx, e.tracerProvider, e.src)
return pom.Wait()
}))
for i, task := range e.tasks {
@ -740,7 +780,7 @@ func (e *environment) Add(m Modifier) {
e.t.Helper()
caller := getCaller()
e.debugf("Add: %T from %s", m, caller)
switch e.getState() {
switch e.GetState() {
case NotRunning:
for _, mod := range e.mods {
if mod.Value == m {
@ -761,7 +801,7 @@ func (e *environment) Add(m Modifier) {
case Stopped, Stopping:
panic("test bug: cannot call Add() after Stop()")
default:
panic(fmt.Sprintf("unexpected environment state: %s", e.getState()))
panic(fmt.Sprintf("unexpected environment state: %s", e.GetState()))
}
}
@ -805,13 +845,25 @@ func (e *environment) advanceState(newState EnvironmentState) {
}
e.debugf("state %s -> %s", e.state.String(), newState.String())
e.state = newState
e.debugf("notifying %d listeners of state change", len(e.stateChangeListeners[newState]))
for _, listener := range e.stateChangeListeners[newState] {
go listener()
if len(e.stateChangeListeners[newState]) > 0 {
e.debugf("notifying %d listeners of state change", len(e.stateChangeListeners[newState]))
var wg sync.WaitGroup
for _, listener := range e.stateChangeListeners[newState] {
wg.Add(1)
go func() {
_, span := e.tracer.Start(e.ctx, "State Change Callback")
span.SetAttributes(attribute.String("state", newState.String()))
defer span.End()
defer wg.Done()
listener()
}()
}
wg.Wait()
e.debugf("done notifying state change listeners")
}
}
func (e *environment) getState() EnvironmentState {
func (e *environment) GetState() EnvironmentState {
e.stateMu.Lock()
defer e.stateMu.Unlock()
return e.state
@ -828,7 +880,7 @@ func (e *environment) OnStateChanged(state EnvironmentState, callback func()) {
// add change listeners for all states, if there are multiple bits set
for state > 0 {
stateBit := EnvironmentState(bits.TrailingZeros32(uint32(state)))
stateBit := EnvironmentState(1 << bits.TrailingZeros32(uint32(state)))
state &= (state - 1)
e.stateChangeListeners[stateBit] = append(e.stateChangeListeners[stateBit], callback)
}