mirror of
https://github.com/pomerium/pomerium.git
synced 2025-07-25 20:49:30 +02:00
core/identity: add enabler (#5084)
* core/identity: add disabler * enable by default * add name * rename to enabler, use mutex instead of goroutine * rename method, add comments
This commit is contained in:
parent
a518435c17
commit
99a5dbd65b
4 changed files with 190 additions and 4 deletions
108
internal/enabler/enabler.go
Normal file
108
internal/enabler/enabler.go
Normal file
|
@ -0,0 +1,108 @@
|
|||
// package enabler contains a component that can be enabled and disabled dynamically
|
||||
package enabler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
)
|
||||
|
||||
var errCauseEnabler = errors.New("enabler")
|
||||
|
||||
// A Handler is a component with a RunEnabled function.
|
||||
type Handler interface {
|
||||
RunEnabled(ctx context.Context) error
|
||||
}
|
||||
|
||||
// HandlerFunc is a function run by the enabler.
|
||||
type HandlerFunc func(ctx context.Context) error
|
||||
|
||||
func (f HandlerFunc) RunEnabled(ctx context.Context) error {
|
||||
return f(ctx)
|
||||
}
|
||||
|
||||
// An Enabler enables or disables a component dynamically.
|
||||
// When the Enabler is enabled, the Handler's RunEnabled will be called.
|
||||
// If the Enabler is subsequently disabled the context passed to RunEnabled will be canceled.
|
||||
// If the Enabler is subseqently enabled again, RunEnabled will be called again.
|
||||
// Handlers should obey the context lifetime and be tolerant of RunEnabled
|
||||
// being called multiple times. (not concurrently)
|
||||
type Enabler interface {
|
||||
Run(ctx context.Context) error
|
||||
Enable()
|
||||
Disable()
|
||||
}
|
||||
|
||||
type enabler struct {
|
||||
name string
|
||||
handler Handler
|
||||
|
||||
mu sync.Mutex
|
||||
cancel context.CancelCauseFunc
|
||||
enabled bool
|
||||
}
|
||||
|
||||
// New creates a new Enabler.
|
||||
func New(name string, handler Handler, enabled bool) Enabler {
|
||||
d := &enabler{
|
||||
name: name,
|
||||
handler: handler,
|
||||
enabled: enabled,
|
||||
cancel: func(_ error) {},
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// Run calls RunEnabled if enabled, otherwise it waits until enabled.
|
||||
func (d *enabler) Run(ctx context.Context) error {
|
||||
for {
|
||||
err := d.runOrWaitForEnabled(ctx)
|
||||
// if we received any error but our own, exit with that error
|
||||
if !errors.Is(err, errCauseEnabler) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *enabler) runOrWaitForEnabled(ctx context.Context) error {
|
||||
d.mu.Lock()
|
||||
enabled := d.enabled
|
||||
ctx, d.cancel = context.WithCancelCause(ctx)
|
||||
d.mu.Unlock()
|
||||
|
||||
// we're enabled so call RunEnabled. If Disabled is called it will cancel ctx.
|
||||
if enabled {
|
||||
log.Ctx(ctx).Info().Msgf("enabled %s", d.name)
|
||||
err := d.handler.RunEnabled(ctx)
|
||||
// if RunEnabled stopped because we canceled the context
|
||||
if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ctx), errCauseEnabler) {
|
||||
log.Ctx(ctx).Info().Msgf("disabled %s", d.name)
|
||||
return errCauseEnabler
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// wait until Enabled is called
|
||||
<-ctx.Done()
|
||||
return context.Cause(ctx)
|
||||
}
|
||||
|
||||
func (d *enabler) Enable() {
|
||||
d.mu.Lock()
|
||||
if !d.enabled {
|
||||
d.enabled = true
|
||||
d.cancel(errCauseEnabler)
|
||||
}
|
||||
d.mu.Unlock()
|
||||
}
|
||||
|
||||
func (d *enabler) Disable() {
|
||||
d.mu.Lock()
|
||||
if d.enabled {
|
||||
d.enabled = false
|
||||
d.cancel(errCauseEnabler)
|
||||
}
|
||||
d.mu.Unlock()
|
||||
}
|
61
internal/enabler/enabler_test.go
Normal file
61
internal/enabler/enabler_test.go
Normal file
|
@ -0,0 +1,61 @@
|
|||
package enabler_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/enabler"
|
||||
)
|
||||
|
||||
func TestEnabler(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("enabled immediately", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
e := enabler.New("test", enabler.HandlerFunc(func(ctx context.Context) error {
|
||||
return errors.New("ERROR")
|
||||
}), true)
|
||||
err := e.Run(context.Background())
|
||||
assert.Error(t, err)
|
||||
})
|
||||
t.Run("enabled delayed", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
e := enabler.New("test", enabler.HandlerFunc(func(ctx context.Context) error {
|
||||
return errors.New("ERROR")
|
||||
}), false)
|
||||
time.AfterFunc(time.Millisecond*10, e.Enable)
|
||||
err := e.Run(context.Background())
|
||||
assert.Error(t, err)
|
||||
})
|
||||
t.Run("disabled", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
var started, stopped atomic.Int64
|
||||
e := enabler.New("test", enabler.HandlerFunc(func(ctx context.Context) error {
|
||||
started.Add(1)
|
||||
<-ctx.Done()
|
||||
stopped.Add(1)
|
||||
return ctx.Err()
|
||||
}), true)
|
||||
time.AfterFunc(time.Millisecond*10, e.Disable)
|
||||
go e.Run(ctx)
|
||||
|
||||
assert.Eventually(t, func() bool { return stopped.Load() == 1 }, time.Second, time.Millisecond*100,
|
||||
"should stop RunEnabled")
|
||||
|
||||
e.Enable()
|
||||
|
||||
assert.Eventually(t, func() bool { return started.Load() == 2 }, time.Second, time.Millisecond*100,
|
||||
"should re-start RunEnabled")
|
||||
})
|
||||
}
|
|
@ -19,6 +19,7 @@ type config struct {
|
|||
sessionRefreshCoolOffDuration time.Duration
|
||||
now func() time.Time
|
||||
eventMgr *events.Manager
|
||||
enabled bool
|
||||
}
|
||||
|
||||
func newConfig(options ...Option) *config {
|
||||
|
@ -26,6 +27,7 @@ func newConfig(options ...Option) *config {
|
|||
WithSessionRefreshGracePeriod(defaultSessionRefreshGracePeriod)(cfg)
|
||||
WithSessionRefreshCoolOffDuration(defaultSessionRefreshCoolOffDuration)(cfg)
|
||||
WithNow(time.Now)(cfg)
|
||||
WithEnabled(true)(cfg)
|
||||
for _, option := range options {
|
||||
option(cfg)
|
||||
}
|
||||
|
@ -72,7 +74,14 @@ func WithNow(now func() time.Time) Option {
|
|||
|
||||
// WithEventManager passes an event manager to record events
|
||||
func WithEventManager(mgr *events.Manager) Option {
|
||||
return func(c *config) {
|
||||
c.eventMgr = mgr
|
||||
return func(cfg *config) {
|
||||
cfg.eventMgr = mgr
|
||||
}
|
||||
}
|
||||
|
||||
// WithEnabled sets the enabled option in the config.
|
||||
func WithEnabled(enabled bool) Option {
|
||||
return func(cfg *config) {
|
||||
cfg.enabled = enabled
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/atomicutil"
|
||||
"github.com/pomerium/pomerium/internal/enabler"
|
||||
"github.com/pomerium/pomerium/internal/events"
|
||||
"github.com/pomerium/pomerium/internal/identity/identity"
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
|
@ -43,6 +44,7 @@ type (
|
|||
|
||||
// A Manager refreshes identity information using session and user data.
|
||||
type Manager struct {
|
||||
enabler.Enabler
|
||||
cfg *atomicutil.Value[*config]
|
||||
|
||||
sessionScheduler *scheduler.Scheduler
|
||||
|
@ -62,6 +64,7 @@ func New(
|
|||
sessionScheduler: scheduler.New(),
|
||||
userScheduler: scheduler.New(),
|
||||
}
|
||||
mgr.Enabler = enabler.New("identity_manager", mgr, true)
|
||||
mgr.reset()
|
||||
mgr.UpdateConfig(options...)
|
||||
return mgr
|
||||
|
@ -76,10 +79,15 @@ func withLog(ctx context.Context) context.Context {
|
|||
// UpdateConfig updates the manager with the new options.
|
||||
func (mgr *Manager) UpdateConfig(options ...Option) {
|
||||
mgr.cfg.Store(newConfig(options...))
|
||||
if mgr.cfg.Load().enabled {
|
||||
mgr.Enable()
|
||||
} else {
|
||||
mgr.Disable()
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs the manager. This method blocks until an error occurs or the given context is canceled.
|
||||
func (mgr *Manager) Run(ctx context.Context) error {
|
||||
// RunEnabled runs the manager. This method blocks until an error occurs or the given context is canceled.
|
||||
func (mgr *Manager) RunEnabled(ctx context.Context) error {
|
||||
leaser := databroker.NewLeaser("identity_manager", time.Second*30, mgr)
|
||||
return leaser.Run(ctx)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue