Fix trace client update (#5480)

This commit is contained in:
Joe Kralicky 2025-02-12 19:47:17 -05:00 committed by GitHub
parent 229ef72e58
commit 3043e98fab
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 19 additions and 6 deletions

View file

@ -30,6 +30,10 @@ var (
type SyncClient interface { type SyncClient interface {
otlptrace.Client otlptrace.Client
// Update safely replaces the current trace client with the one provided.
// The new client must be unstarted. The old client (if any) will be stopped.
//
// This function is NOT reentrant; callers must use appropriate locking.
Update(ctx context.Context, newClient otlptrace.Client) error Update(ctx context.Context, newClient otlptrace.Client) error
} }

View file

@ -84,6 +84,7 @@ type Pomerium struct {
startMu sync.Mutex startMu sync.Mutex
cancel context.CancelCauseFunc cancel context.CancelCauseFunc
envoyServer *envoy.Server envoyServer *envoy.Server
traceClientMu sync.Mutex
} }
func New(opts ...Option) *Pomerium { func New(opts ...Option) *Pomerium {
@ -98,7 +99,7 @@ func New(opts ...Option) *Pomerium {
func (p *Pomerium) Start(ctx context.Context, tracerProvider oteltrace.TracerProvider, src config.Source) error { func (p *Pomerium) Start(ctx context.Context, tracerProvider oteltrace.TracerProvider, src config.Source) error {
p.startMu.Lock() p.startMu.Lock()
defer p.startMu.Unlock() defer p.startMu.Unlock()
updateTraceClient(ctx, src.GetConfig()) p.updateTraceClient(ctx, src.GetConfig())
ctx, p.cancel = context.WithCancelCause(ctx) ctx, p.cancel = context.WithCancelCause(ctx)
_, _ = maxprocs.Set(maxprocs.Logger(func(s string, i ...any) { log.Ctx(ctx).Debug().Msgf(s, i...) })) _, _ = maxprocs.Set(maxprocs.Logger(func(s string, i ...any) { log.Ctx(ctx).Debug().Msgf(s, i...) }))
@ -138,7 +139,7 @@ func (p *Pomerium) Start(ctx context.Context, tracerProvider oteltrace.TracerPro
} }
cfg := src.GetConfig() cfg := src.GetConfig()
src.OnConfigChange(ctx, updateTraceClient) src.OnConfigChange(ctx, p.updateTraceClient)
// setup the control plane // setup the control plane
controlPlane, err := controlplane.NewServer(ctx, cfg, metricsMgr, eventsMgr, fileMgr) controlPlane, err := controlplane.NewServer(ctx, cfg, metricsMgr, eventsMgr, fileMgr)
@ -319,7 +320,7 @@ func setupProxy(ctx context.Context, src config.Source, controlPlane *controlpla
return nil return nil
} }
func updateTraceClient(ctx context.Context, cfg *config.Config) { func (p *Pomerium) updateTraceClient(ctx context.Context, cfg *config.Config) {
sc, ok := trace.RemoteClientFromContext(ctx).(trace.SyncClient) sc, ok := trace.RemoteClientFromContext(ctx).(trace.SyncClient)
if !ok { if !ok {
return return
@ -329,6 +330,14 @@ func updateTraceClient(ctx context.Context, cfg *config.Config) {
log.Ctx(ctx).Warn().Err(err).Msg("error configuring trace client") log.Ctx(ctx).Warn().Err(err).Msg("error configuring trace client")
} else { } else {
go func() { go func() {
if !p.traceClientMu.TryLock() {
log.Ctx(ctx).
Debug().
Msg("waiting for a previous trace client update to complete")
p.traceClientMu.Lock()
}
defer p.traceClientMu.Unlock()
if err := sc.Update(ctx, newClient); err != nil { if err := sc.Update(ctx, newClient); err != nil {
log.Ctx(ctx). log.Ctx(ctx).
Warn(). Warn().