From 99a5dbd65b469d4a9015f6450134210e5cf4931c Mon Sep 17 00:00:00 2001 From: Caleb Doxsey Date: Fri, 26 Apr 2024 15:05:22 -0600 Subject: [PATCH] core/identity: add enabler (#5084) * core/identity: add disabler * enable by default * add name * rename to enabler, use mutex instead of goroutine * rename method, add comments --- internal/enabler/enabler.go | 108 +++++++++++++++++++++++++++ internal/enabler/enabler_test.go | 61 +++++++++++++++ internal/identity/manager/config.go | 13 +++- internal/identity/manager/manager.go | 12 ++- 4 files changed, 190 insertions(+), 4 deletions(-) create mode 100644 internal/enabler/enabler.go create mode 100644 internal/enabler/enabler_test.go diff --git a/internal/enabler/enabler.go b/internal/enabler/enabler.go new file mode 100644 index 000000000..32e5aeec0 --- /dev/null +++ b/internal/enabler/enabler.go @@ -0,0 +1,108 @@ +// package enabler contains a component that can be enabled and disabled dynamically +package enabler + +import ( + "context" + "errors" + "sync" + + "github.com/pomerium/pomerium/internal/log" +) + +var errCauseEnabler = errors.New("enabler") + +// A Handler is a component with a RunEnabled function. +type Handler interface { + RunEnabled(ctx context.Context) error +} + +// HandlerFunc is a function run by the enabler. +type HandlerFunc func(ctx context.Context) error + +func (f HandlerFunc) RunEnabled(ctx context.Context) error { + return f(ctx) +} + +// An Enabler enables or disables a component dynamically. +// When the Enabler is enabled, the Handler's RunEnabled will be called. +// If the Enabler is subsequently disabled the context passed to RunEnabled will be canceled. +// If the Enabler is subseqently enabled again, RunEnabled will be called again. +// Handlers should obey the context lifetime and be tolerant of RunEnabled +// being called multiple times. (not concurrently) +type Enabler interface { + Run(ctx context.Context) error + Enable() + Disable() +} + +type enabler struct { + name string + handler Handler + + mu sync.Mutex + cancel context.CancelCauseFunc + enabled bool +} + +// New creates a new Enabler. +func New(name string, handler Handler, enabled bool) Enabler { + d := &enabler{ + name: name, + handler: handler, + enabled: enabled, + cancel: func(_ error) {}, + } + return d +} + +// Run calls RunEnabled if enabled, otherwise it waits until enabled. +func (d *enabler) Run(ctx context.Context) error { + for { + err := d.runOrWaitForEnabled(ctx) + // if we received any error but our own, exit with that error + if !errors.Is(err, errCauseEnabler) { + return err + } + } +} + +func (d *enabler) runOrWaitForEnabled(ctx context.Context) error { + d.mu.Lock() + enabled := d.enabled + ctx, d.cancel = context.WithCancelCause(ctx) + d.mu.Unlock() + + // we're enabled so call RunEnabled. If Disabled is called it will cancel ctx. + if enabled { + log.Ctx(ctx).Info().Msgf("enabled %s", d.name) + err := d.handler.RunEnabled(ctx) + // if RunEnabled stopped because we canceled the context + if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ctx), errCauseEnabler) { + log.Ctx(ctx).Info().Msgf("disabled %s", d.name) + return errCauseEnabler + } + return err + } + + // wait until Enabled is called + <-ctx.Done() + return context.Cause(ctx) +} + +func (d *enabler) Enable() { + d.mu.Lock() + if !d.enabled { + d.enabled = true + d.cancel(errCauseEnabler) + } + d.mu.Unlock() +} + +func (d *enabler) Disable() { + d.mu.Lock() + if d.enabled { + d.enabled = false + d.cancel(errCauseEnabler) + } + d.mu.Unlock() +} diff --git a/internal/enabler/enabler_test.go b/internal/enabler/enabler_test.go new file mode 100644 index 000000000..34c74dc8c --- /dev/null +++ b/internal/enabler/enabler_test.go @@ -0,0 +1,61 @@ +package enabler_test + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/pomerium/pomerium/internal/enabler" +) + +func TestEnabler(t *testing.T) { + t.Parallel() + + t.Run("enabled immediately", func(t *testing.T) { + t.Parallel() + + e := enabler.New("test", enabler.HandlerFunc(func(ctx context.Context) error { + return errors.New("ERROR") + }), true) + err := e.Run(context.Background()) + assert.Error(t, err) + }) + t.Run("enabled delayed", func(t *testing.T) { + t.Parallel() + + e := enabler.New("test", enabler.HandlerFunc(func(ctx context.Context) error { + return errors.New("ERROR") + }), false) + time.AfterFunc(time.Millisecond*10, e.Enable) + err := e.Run(context.Background()) + assert.Error(t, err) + }) + t.Run("disabled", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + var started, stopped atomic.Int64 + e := enabler.New("test", enabler.HandlerFunc(func(ctx context.Context) error { + started.Add(1) + <-ctx.Done() + stopped.Add(1) + return ctx.Err() + }), true) + time.AfterFunc(time.Millisecond*10, e.Disable) + go e.Run(ctx) + + assert.Eventually(t, func() bool { return stopped.Load() == 1 }, time.Second, time.Millisecond*100, + "should stop RunEnabled") + + e.Enable() + + assert.Eventually(t, func() bool { return started.Load() == 2 }, time.Second, time.Millisecond*100, + "should re-start RunEnabled") + }) +} diff --git a/internal/identity/manager/config.go b/internal/identity/manager/config.go index fbc8f811b..d4609c41e 100644 --- a/internal/identity/manager/config.go +++ b/internal/identity/manager/config.go @@ -19,6 +19,7 @@ type config struct { sessionRefreshCoolOffDuration time.Duration now func() time.Time eventMgr *events.Manager + enabled bool } func newConfig(options ...Option) *config { @@ -26,6 +27,7 @@ func newConfig(options ...Option) *config { WithSessionRefreshGracePeriod(defaultSessionRefreshGracePeriod)(cfg) WithSessionRefreshCoolOffDuration(defaultSessionRefreshCoolOffDuration)(cfg) WithNow(time.Now)(cfg) + WithEnabled(true)(cfg) for _, option := range options { option(cfg) } @@ -72,7 +74,14 @@ func WithNow(now func() time.Time) Option { // WithEventManager passes an event manager to record events func WithEventManager(mgr *events.Manager) Option { - return func(c *config) { - c.eventMgr = mgr + return func(cfg *config) { + cfg.eventMgr = mgr + } +} + +// WithEnabled sets the enabled option in the config. +func WithEnabled(enabled bool) Option { + return func(cfg *config) { + cfg.enabled = enabled } } diff --git a/internal/identity/manager/manager.go b/internal/identity/manager/manager.go index 1ae18063f..c5831dfbe 100644 --- a/internal/identity/manager/manager.go +++ b/internal/identity/manager/manager.go @@ -16,6 +16,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/pomerium/pomerium/internal/atomicutil" + "github.com/pomerium/pomerium/internal/enabler" "github.com/pomerium/pomerium/internal/events" "github.com/pomerium/pomerium/internal/identity/identity" "github.com/pomerium/pomerium/internal/log" @@ -43,6 +44,7 @@ type ( // A Manager refreshes identity information using session and user data. type Manager struct { + enabler.Enabler cfg *atomicutil.Value[*config] sessionScheduler *scheduler.Scheduler @@ -62,6 +64,7 @@ func New( sessionScheduler: scheduler.New(), userScheduler: scheduler.New(), } + mgr.Enabler = enabler.New("identity_manager", mgr, true) mgr.reset() mgr.UpdateConfig(options...) return mgr @@ -76,10 +79,15 @@ func withLog(ctx context.Context) context.Context { // UpdateConfig updates the manager with the new options. func (mgr *Manager) UpdateConfig(options ...Option) { mgr.cfg.Store(newConfig(options...)) + if mgr.cfg.Load().enabled { + mgr.Enable() + } else { + mgr.Disable() + } } -// Run runs the manager. This method blocks until an error occurs or the given context is canceled. -func (mgr *Manager) Run(ctx context.Context) error { +// RunEnabled runs the manager. This method blocks until an error occurs or the given context is canceled. +func (mgr *Manager) RunEnabled(ctx context.Context) error { leaser := databroker.NewLeaser("identity_manager", time.Second*30, mgr) return leaser.Run(ctx) }