diff --git a/databroker/cache.go b/databroker/cache.go index 48395a8c3..47e1bd8c6 100644 --- a/databroker/cache.go +++ b/databroker/cache.go @@ -35,6 +35,7 @@ import ( // DataBroker represents the databroker service. The databroker service is a simple interface // for storing keyed blobs (bytes) of unstructured data. type DataBroker struct { + Options dataBrokerServer *dataBrokerServer manager *manager.Manager legacyManager *legacymanager.Manager @@ -48,8 +49,36 @@ type DataBroker struct { tracer oteltrace.Tracer } +type Options struct { + managerOptions []manager.Option + legacyManagerOptions []legacymanager.Option +} + +type Option func(*Options) + +func (o *Options) apply(opts ...Option) { + for _, op := range opts { + op(o) + } +} + +func WithManagerOptions(managerOptions ...manager.Option) Option { + return func(o *Options) { + o.managerOptions = append(o.managerOptions, managerOptions...) + } +} + +func WithLegacyManagerOptions(legacyManagerOptions ...legacymanager.Option) Option { + return func(o *Options) { + o.legacyManagerOptions = append(o.legacyManagerOptions, legacyManagerOptions...) + } +} + // New creates a new databroker service. -func New(ctx context.Context, cfg *config.Config, eventsMgr *events.Manager) (*DataBroker, error) { +func New(ctx context.Context, cfg *config.Config, eventsMgr *events.Manager, opts ...Option) (*DataBroker, error) { + options := Options{} + options.apply(opts...) + localListener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { return nil, err @@ -103,6 +132,7 @@ func New(ctx context.Context, cfg *config.Config, eventsMgr *events.Manager) (*D } c := &DataBroker{ + Options: options, dataBrokerServer: dataBrokerServer, localListener: localListener, localGRPCServer: localGRPCServer, @@ -168,16 +198,16 @@ func (c *DataBroker) update(ctx context.Context, cfg *config.Config) error { dataBrokerClient := databroker.NewDataBrokerServiceClient(c.localGRPCConnection) - options := []manager.Option{ + options := append([]manager.Option{ manager.WithDataBrokerClient(dataBrokerClient), manager.WithEventManager(c.eventsMgr), manager.WithEnabled(!cfg.Options.IsRuntimeFlagSet(config.RuntimeFlagLegacyIdentityManager)), - } - legacyOptions := []legacymanager.Option{ + }, c.managerOptions...) + legacyOptions := append([]legacymanager.Option{ legacymanager.WithDataBrokerClient(dataBrokerClient), legacymanager.WithEventManager(c.eventsMgr), legacymanager.WithEnabled(cfg.Options.IsRuntimeFlagSet(config.RuntimeFlagLegacyIdentityManager)), - } + }, c.legacyManagerOptions...) if cfg.Options.SupportsUserRefresh() { authenticator, err := identity.NewAuthenticator(ctx, c.tracerProvider, oauthOptions) diff --git a/internal/retry/config.go b/internal/retry/config.go index f55e6282e..7a783fbf3 100644 --- a/internal/retry/config.go +++ b/internal/retry/config.go @@ -9,8 +9,9 @@ import ( ) type config struct { - maxInterval time.Duration - watches []watch + maxInterval time.Duration + initialInterval time.Duration + watches []watch backoff.BackOff } @@ -42,10 +43,18 @@ func WithMaxInterval(d time.Duration) Option { } } +// WithInitialInterval sets the initial backoff interval. +func WithInitialInterval(d time.Duration) Option { + return func(cfg *config) { + cfg.initialInterval = d + } +} + func newConfig(opts ...Option) ([]watch, backoff.BackOff) { cfg := new(config) for _, opt := range []Option{ WithMaxInterval(time.Minute * 5), + WithInitialInterval(backoff.DefaultInitialInterval), } { opt(cfg) } @@ -59,6 +68,7 @@ func newConfig(opts ...Option) ([]watch, backoff.BackOff) { } bo := backoff.NewExponentialBackOff() + bo.InitialInterval = cfg.initialInterval bo.MaxInterval = cfg.maxInterval bo.MaxElapsedTime = 0 bo.Multiplier = 2 diff --git a/pkg/cmd/pomerium/pomerium.go b/pkg/cmd/pomerium/pomerium.go index 457c5f7d7..70096b3c9 100644 --- a/pkg/cmd/pomerium/pomerium.go +++ b/pkg/cmd/pomerium/pomerium.go @@ -32,8 +32,9 @@ import ( ) type Options struct { - fileMgr *filemgr.Manager - envoyServerOptions []envoy.ServerOption + fileMgr *filemgr.Manager + envoyServerOptions []envoy.ServerOption + databrokerServerOptions []databroker_service.Option } type Option func(*Options) @@ -56,6 +57,12 @@ func WithEnvoyServerOptions(opts ...envoy.ServerOption) Option { } } +func WithDataBrokerServerOptions(opts ...databroker_service.Option) Option { + return func(o *Options) { + o.databrokerServerOptions = append(o.databrokerServerOptions, opts...) + } +} + // Run runs the main pomerium application. func Run(ctx context.Context, src config.Source, opts ...Option) error { p := New(opts...) @@ -174,7 +181,7 @@ func (p *Pomerium) Start(ctx context.Context, tracerProvider oteltrace.TracerPro } var dataBrokerServer *databroker_service.DataBroker if config.IsDataBroker(src.GetConfig().Options.Services) { - dataBrokerServer, err = setupDataBroker(ctx, src, controlPlane, eventsMgr) + dataBrokerServer, err = setupDataBroker(ctx, src, controlPlane, eventsMgr, p.databrokerServerOptions...) if err != nil { return fmt.Errorf("setting up databroker: %w", err) } @@ -260,8 +267,9 @@ func setupDataBroker(ctx context.Context, src config.Source, controlPlane *controlplane.Server, eventsMgr *events.Manager, + opts ...databroker_service.Option, ) (*databroker_service.DataBroker, error) { - svc, err := databroker_service.New(ctx, src.GetConfig(), eventsMgr) + svc, err := databroker_service.New(ctx, src.GetConfig(), eventsMgr, opts...) if err != nil { return nil, fmt.Errorf("error creating databroker service: %w", err) } diff --git a/pkg/identity/legacymanager/config.go b/pkg/identity/legacymanager/config.go index 49682fbd2..c518571b1 100644 --- a/pkg/identity/legacymanager/config.go +++ b/pkg/identity/legacymanager/config.go @@ -10,6 +10,7 @@ import ( var ( defaultSessionRefreshGracePeriod = 1 * time.Minute defaultSessionRefreshCoolOffDuration = 10 * time.Second + defaultLeaseTTL = 30 * time.Second ) type config struct { @@ -17,6 +18,7 @@ type config struct { dataBrokerClient databroker.DataBrokerServiceClient sessionRefreshGracePeriod time.Duration sessionRefreshCoolOffDuration time.Duration + leaseTTL time.Duration now func() time.Time eventMgr *events.Manager enabled bool @@ -28,6 +30,7 @@ func newConfig(options ...Option) *config { WithSessionRefreshCoolOffDuration(defaultSessionRefreshCoolOffDuration)(cfg) WithNow(time.Now)(cfg) WithEnabled(true)(cfg) + WithLeaseTTL(defaultLeaseTTL)(cfg) for _, option := range options { option(cfg) } @@ -85,3 +88,9 @@ func WithEnabled(enabled bool) Option { cfg.enabled = enabled } } + +func WithLeaseTTL(ttl time.Duration) Option { + return func(o *config) { + o.leaseTTL = ttl + } +} diff --git a/pkg/identity/legacymanager/manager.go b/pkg/identity/legacymanager/manager.go index 0bf68b525..341fe206a 100644 --- a/pkg/identity/legacymanager/manager.go +++ b/pkg/identity/legacymanager/manager.go @@ -88,7 +88,7 @@ func (mgr *Manager) UpdateConfig(options ...Option) { // RunEnabled runs the manager. This method blocks until an error occurs or the given context is canceled. func (mgr *Manager) RunEnabled(ctx context.Context) error { - leaser := databroker.NewLeaser("identity_manager", time.Second*30, mgr) + leaser := databroker.NewLeaser("identity_manager", mgr.cfg.Load().leaseTTL, mgr) return leaser.Run(ctx) } diff --git a/pkg/identity/manager/config.go b/pkg/identity/manager/config.go index 5fbca8b67..7c1c0f116 100644 --- a/pkg/identity/manager/config.go +++ b/pkg/identity/manager/config.go @@ -11,6 +11,7 @@ var ( defaultSessionRefreshGracePeriod = 1 * time.Minute defaultSessionRefreshCoolOffDuration = 10 * time.Second defaultUpdateUserInfoInterval = 10 * time.Minute + defaultLeaseTTL = 30 * time.Second ) type config struct { @@ -19,6 +20,7 @@ type config struct { sessionRefreshGracePeriod time.Duration sessionRefreshCoolOffDuration time.Duration updateUserInfoInterval time.Duration + leaseTTL time.Duration now func() time.Time eventMgr *events.Manager enabled bool @@ -31,6 +33,7 @@ func newConfig(options ...Option) *config { WithNow(time.Now)(cfg) WithUpdateUserInfoInterval(defaultUpdateUserInfoInterval)(cfg) WithEnabled(true)(cfg) + WithLeaseTTL(defaultLeaseTTL)(cfg) for _, option := range options { option(cfg) } @@ -95,3 +98,10 @@ func WithUpdateUserInfoInterval(dur time.Duration) Option { cfg.updateUserInfoInterval = dur } } + +// WithLeaseTTL sets the TTL used by the leaser. +func WithLeaseTTL(ttl time.Duration) Option { + return func(o *config) { + o.leaseTTL = ttl + } +} diff --git a/pkg/identity/manager/manager.go b/pkg/identity/manager/manager.go index 2353377cd..91c0b330a 100644 --- a/pkg/identity/manager/manager.go +++ b/pkg/identity/manager/manager.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "sync" - "time" "github.com/rs/zerolog" "golang.org/x/oauth2" @@ -79,7 +78,7 @@ func (mgr *Manager) GetDataBrokerServiceClient() databroker.DataBrokerServiceCli // RunEnabled runs the manager. This method blocks until an error occurs or the given context is canceled. func (mgr *Manager) RunEnabled(ctx context.Context) error { - leaser := databroker.NewLeaser("identity_manager", time.Second*30, mgr) + leaser := databroker.NewLeaser("identity_manager", mgr.cfg.Load().leaseTTL, mgr) return leaser.Run(ctx) }