diff --git a/databroker/cache.go b/databroker/cache.go index a26a8ebcd..8105ed932 100644 --- a/databroker/cache.go +++ b/databroker/cache.go @@ -33,6 +33,7 @@ import ( // DataBroker represents the databroker service. The databroker service is a simple interface // for storing keyed blobs (bytes) of unstructured data. type DataBroker struct { + Options dataBrokerServer *dataBrokerServer manager *manager.Manager legacyManager *legacymanager.Manager @@ -44,8 +45,36 @@ type DataBroker struct { sharedKey *atomicutil.Value[[]byte] } +type Options struct { + managerOptions []manager.Option + legacyManagerOptions []legacymanager.Option +} + +type Option func(*Options) + +func (o *Options) apply(opts ...Option) { + for _, op := range opts { + op(o) + } +} + +func WithManagerOptions(managerOptions ...manager.Option) Option { + return func(o *Options) { + o.managerOptions = append(o.managerOptions, managerOptions...) + } +} + +func WithLegacyManagerOptions(legacyManagerOptions ...legacymanager.Option) Option { + return func(o *Options) { + o.legacyManagerOptions = append(o.legacyManagerOptions, legacyManagerOptions...) + } +} + // New creates a new databroker service. -func New(ctx context.Context, cfg *config.Config, eventsMgr *events.Manager) (*DataBroker, error) { +func New(ctx context.Context, cfg *config.Config, eventsMgr *events.Manager, opts ...Option) (*DataBroker, error) { + options := Options{} + options.apply(opts...) + localListener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { return nil, err @@ -97,6 +126,7 @@ func New(ctx context.Context, cfg *config.Config, eventsMgr *events.Manager) (*D } c := &DataBroker{ + Options: options, dataBrokerServer: dataBrokerServer, localListener: localListener, localGRPCServer: localGRPCServer, @@ -160,16 +190,16 @@ func (c *DataBroker) update(ctx context.Context, cfg *config.Config) error { dataBrokerClient := databroker.NewDataBrokerServiceClient(c.localGRPCConnection) - options := []manager.Option{ + options := append([]manager.Option{ manager.WithDataBrokerClient(dataBrokerClient), manager.WithEventManager(c.eventsMgr), manager.WithEnabled(!cfg.Options.IsRuntimeFlagSet(config.RuntimeFlagLegacyIdentityManager)), - } - legacyOptions := []legacymanager.Option{ + }, c.managerOptions...) + legacyOptions := append([]legacymanager.Option{ legacymanager.WithDataBrokerClient(dataBrokerClient), legacymanager.WithEventManager(c.eventsMgr), legacymanager.WithEnabled(cfg.Options.IsRuntimeFlagSet(config.RuntimeFlagLegacyIdentityManager)), - } + }, c.legacyManagerOptions...) if cfg.Options.SupportsUserRefresh() { authenticator, err := identity.NewAuthenticator(oauthOptions) diff --git a/internal/retry/config.go b/internal/retry/config.go index f55e6282e..7a783fbf3 100644 --- a/internal/retry/config.go +++ b/internal/retry/config.go @@ -9,8 +9,9 @@ import ( ) type config struct { - maxInterval time.Duration - watches []watch + maxInterval time.Duration + initialInterval time.Duration + watches []watch backoff.BackOff } @@ -42,10 +43,18 @@ func WithMaxInterval(d time.Duration) Option { } } +// WithInitialInterval sets the initial backoff interval. +func WithInitialInterval(d time.Duration) Option { + return func(cfg *config) { + cfg.initialInterval = d + } +} + func newConfig(opts ...Option) ([]watch, backoff.BackOff) { cfg := new(config) for _, opt := range []Option{ WithMaxInterval(time.Minute * 5), + WithInitialInterval(backoff.DefaultInitialInterval), } { opt(cfg) } @@ -59,6 +68,7 @@ func newConfig(opts ...Option) ([]watch, backoff.BackOff) { } bo := backoff.NewExponentialBackOff() + bo.InitialInterval = cfg.initialInterval bo.MaxInterval = cfg.maxInterval bo.MaxElapsedTime = 0 bo.Multiplier = 2 diff --git a/pkg/cmd/pomerium/pomerium.go b/pkg/cmd/pomerium/pomerium.go index d70b3959d..de3851fe6 100644 --- a/pkg/cmd/pomerium/pomerium.go +++ b/pkg/cmd/pomerium/pomerium.go @@ -32,7 +32,8 @@ import ( ) type RunOptions struct { - fileMgr *filemgr.Manager + fileMgr *filemgr.Manager + databrokerServerOptions []databroker_service.Option } type RunOption func(*RunOptions) @@ -49,6 +50,12 @@ func WithOverrideFileManager(fileMgr *filemgr.Manager) RunOption { } } +func WithDataBrokerServerOptions(opts ...databroker_service.Option) RunOption { + return func(o *RunOptions) { + o.databrokerServerOptions = append(o.databrokerServerOptions, opts...) + } +} + // Run runs the main pomerium application. func Run(ctx context.Context, src config.Source, opts ...RunOption) error { options := RunOptions{} @@ -142,7 +149,7 @@ func Run(ctx context.Context, src config.Source, opts ...RunOption) error { } var dataBrokerServer *databroker_service.DataBroker if config.IsDataBroker(src.GetConfig().Options.Services) { - dataBrokerServer, err = setupDataBroker(ctx, src, controlPlane, eventsMgr) + dataBrokerServer, err = setupDataBroker(ctx, src, controlPlane, eventsMgr, options.databrokerServerOptions...) if err != nil { return fmt.Errorf("setting up databroker: %w", err) } @@ -226,8 +233,9 @@ func setupDataBroker(ctx context.Context, src config.Source, controlPlane *controlplane.Server, eventsMgr *events.Manager, + opts ...databroker_service.Option, ) (*databroker_service.DataBroker, error) { - svc, err := databroker_service.New(ctx, src.GetConfig(), eventsMgr) + svc, err := databroker_service.New(ctx, src.GetConfig(), eventsMgr, opts...) if err != nil { return nil, fmt.Errorf("error creating databroker service: %w", err) } diff --git a/pkg/identity/legacymanager/config.go b/pkg/identity/legacymanager/config.go index 49682fbd2..c518571b1 100644 --- a/pkg/identity/legacymanager/config.go +++ b/pkg/identity/legacymanager/config.go @@ -10,6 +10,7 @@ import ( var ( defaultSessionRefreshGracePeriod = 1 * time.Minute defaultSessionRefreshCoolOffDuration = 10 * time.Second + defaultLeaseTTL = 30 * time.Second ) type config struct { @@ -17,6 +18,7 @@ type config struct { dataBrokerClient databroker.DataBrokerServiceClient sessionRefreshGracePeriod time.Duration sessionRefreshCoolOffDuration time.Duration + leaseTTL time.Duration now func() time.Time eventMgr *events.Manager enabled bool @@ -28,6 +30,7 @@ func newConfig(options ...Option) *config { WithSessionRefreshCoolOffDuration(defaultSessionRefreshCoolOffDuration)(cfg) WithNow(time.Now)(cfg) WithEnabled(true)(cfg) + WithLeaseTTL(defaultLeaseTTL)(cfg) for _, option := range options { option(cfg) } @@ -85,3 +88,9 @@ func WithEnabled(enabled bool) Option { cfg.enabled = enabled } } + +func WithLeaseTTL(ttl time.Duration) Option { + return func(o *config) { + o.leaseTTL = ttl + } +} diff --git a/pkg/identity/legacymanager/manager.go b/pkg/identity/legacymanager/manager.go index 0bf68b525..341fe206a 100644 --- a/pkg/identity/legacymanager/manager.go +++ b/pkg/identity/legacymanager/manager.go @@ -88,7 +88,7 @@ func (mgr *Manager) UpdateConfig(options ...Option) { // RunEnabled runs the manager. This method blocks until an error occurs or the given context is canceled. func (mgr *Manager) RunEnabled(ctx context.Context) error { - leaser := databroker.NewLeaser("identity_manager", time.Second*30, mgr) + leaser := databroker.NewLeaser("identity_manager", mgr.cfg.Load().leaseTTL, mgr) return leaser.Run(ctx) } diff --git a/pkg/identity/manager/config.go b/pkg/identity/manager/config.go index 5fbca8b67..7c1c0f116 100644 --- a/pkg/identity/manager/config.go +++ b/pkg/identity/manager/config.go @@ -11,6 +11,7 @@ var ( defaultSessionRefreshGracePeriod = 1 * time.Minute defaultSessionRefreshCoolOffDuration = 10 * time.Second defaultUpdateUserInfoInterval = 10 * time.Minute + defaultLeaseTTL = 30 * time.Second ) type config struct { @@ -19,6 +20,7 @@ type config struct { sessionRefreshGracePeriod time.Duration sessionRefreshCoolOffDuration time.Duration updateUserInfoInterval time.Duration + leaseTTL time.Duration now func() time.Time eventMgr *events.Manager enabled bool @@ -31,6 +33,7 @@ func newConfig(options ...Option) *config { WithNow(time.Now)(cfg) WithUpdateUserInfoInterval(defaultUpdateUserInfoInterval)(cfg) WithEnabled(true)(cfg) + WithLeaseTTL(defaultLeaseTTL)(cfg) for _, option := range options { option(cfg) } @@ -95,3 +98,10 @@ func WithUpdateUserInfoInterval(dur time.Duration) Option { cfg.updateUserInfoInterval = dur } } + +// WithLeaseTTL sets the TTL used by the leaser. +func WithLeaseTTL(ttl time.Duration) Option { + return func(o *config) { + o.leaseTTL = ttl + } +} diff --git a/pkg/identity/manager/manager.go b/pkg/identity/manager/manager.go index 2353377cd..91c0b330a 100644 --- a/pkg/identity/manager/manager.go +++ b/pkg/identity/manager/manager.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "sync" - "time" "github.com/rs/zerolog" "golang.org/x/oauth2" @@ -79,7 +78,7 @@ func (mgr *Manager) GetDataBrokerServiceClient() databroker.DataBrokerServiceCli // RunEnabled runs the manager. This method blocks until an error occurs or the given context is canceled. func (mgr *Manager) RunEnabled(ctx context.Context) error { - leaser := databroker.NewLeaser("identity_manager", time.Second*30, mgr) + leaser := databroker.NewLeaser("identity_manager", mgr.cfg.Load().leaseTTL, mgr) return leaser.Run(ctx) }