add options to adjust databroker lease ttl, and retry initial interval (#5391)

This commit is contained in:
Joe Kralicky 2024-12-13 14:01:43 -05:00 committed by GitHub
parent f876e2f31d
commit ecd2855dcc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 79 additions and 13 deletions

View file

@ -33,6 +33,7 @@ import (
// DataBroker represents the databroker service. The databroker service is a simple interface
// for storing keyed blobs (bytes) of unstructured data.
type DataBroker struct {
Options
dataBrokerServer *dataBrokerServer
manager *manager.Manager
legacyManager *legacymanager.Manager
@ -44,8 +45,36 @@ type DataBroker struct {
sharedKey *atomicutil.Value[[]byte]
}
type Options struct {
managerOptions []manager.Option
legacyManagerOptions []legacymanager.Option
}
type Option func(*Options)
func (o *Options) apply(opts ...Option) {
for _, op := range opts {
op(o)
}
}
func WithManagerOptions(managerOptions ...manager.Option) Option {
return func(o *Options) {
o.managerOptions = append(o.managerOptions, managerOptions...)
}
}
func WithLegacyManagerOptions(legacyManagerOptions ...legacymanager.Option) Option {
return func(o *Options) {
o.legacyManagerOptions = append(o.legacyManagerOptions, legacyManagerOptions...)
}
}
// New creates a new databroker service.
func New(ctx context.Context, cfg *config.Config, eventsMgr *events.Manager) (*DataBroker, error) {
func New(ctx context.Context, cfg *config.Config, eventsMgr *events.Manager, opts ...Option) (*DataBroker, error) {
options := Options{}
options.apply(opts...)
localListener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, err
@ -97,6 +126,7 @@ func New(ctx context.Context, cfg *config.Config, eventsMgr *events.Manager) (*D
}
c := &DataBroker{
Options: options,
dataBrokerServer: dataBrokerServer,
localListener: localListener,
localGRPCServer: localGRPCServer,
@ -160,16 +190,16 @@ func (c *DataBroker) update(ctx context.Context, cfg *config.Config) error {
dataBrokerClient := databroker.NewDataBrokerServiceClient(c.localGRPCConnection)
options := []manager.Option{
options := append([]manager.Option{
manager.WithDataBrokerClient(dataBrokerClient),
manager.WithEventManager(c.eventsMgr),
manager.WithEnabled(!cfg.Options.IsRuntimeFlagSet(config.RuntimeFlagLegacyIdentityManager)),
}
legacyOptions := []legacymanager.Option{
}, c.managerOptions...)
legacyOptions := append([]legacymanager.Option{
legacymanager.WithDataBrokerClient(dataBrokerClient),
legacymanager.WithEventManager(c.eventsMgr),
legacymanager.WithEnabled(cfg.Options.IsRuntimeFlagSet(config.RuntimeFlagLegacyIdentityManager)),
}
}, c.legacyManagerOptions...)
if cfg.Options.SupportsUserRefresh() {
authenticator, err := identity.NewAuthenticator(oauthOptions)

View file

@ -9,8 +9,9 @@ import (
)
type config struct {
maxInterval time.Duration
watches []watch
maxInterval time.Duration
initialInterval time.Duration
watches []watch
backoff.BackOff
}
@ -42,10 +43,18 @@ func WithMaxInterval(d time.Duration) Option {
}
}
// WithInitialInterval sets the initial backoff interval.
func WithInitialInterval(d time.Duration) Option {
return func(cfg *config) {
cfg.initialInterval = d
}
}
func newConfig(opts ...Option) ([]watch, backoff.BackOff) {
cfg := new(config)
for _, opt := range []Option{
WithMaxInterval(time.Minute * 5),
WithInitialInterval(backoff.DefaultInitialInterval),
} {
opt(cfg)
}
@ -59,6 +68,7 @@ func newConfig(opts ...Option) ([]watch, backoff.BackOff) {
}
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = cfg.initialInterval
bo.MaxInterval = cfg.maxInterval
bo.MaxElapsedTime = 0
bo.Multiplier = 2

View file

@ -32,7 +32,8 @@ import (
)
type RunOptions struct {
fileMgr *filemgr.Manager
fileMgr *filemgr.Manager
databrokerServerOptions []databroker_service.Option
}
type RunOption func(*RunOptions)
@ -49,6 +50,12 @@ func WithOverrideFileManager(fileMgr *filemgr.Manager) RunOption {
}
}
func WithDataBrokerServerOptions(opts ...databroker_service.Option) RunOption {
return func(o *RunOptions) {
o.databrokerServerOptions = append(o.databrokerServerOptions, opts...)
}
}
// Run runs the main pomerium application.
func Run(ctx context.Context, src config.Source, opts ...RunOption) error {
options := RunOptions{}
@ -142,7 +149,7 @@ func Run(ctx context.Context, src config.Source, opts ...RunOption) error {
}
var dataBrokerServer *databroker_service.DataBroker
if config.IsDataBroker(src.GetConfig().Options.Services) {
dataBrokerServer, err = setupDataBroker(ctx, src, controlPlane, eventsMgr)
dataBrokerServer, err = setupDataBroker(ctx, src, controlPlane, eventsMgr, options.databrokerServerOptions...)
if err != nil {
return fmt.Errorf("setting up databroker: %w", err)
}
@ -226,8 +233,9 @@ func setupDataBroker(ctx context.Context,
src config.Source,
controlPlane *controlplane.Server,
eventsMgr *events.Manager,
opts ...databroker_service.Option,
) (*databroker_service.DataBroker, error) {
svc, err := databroker_service.New(ctx, src.GetConfig(), eventsMgr)
svc, err := databroker_service.New(ctx, src.GetConfig(), eventsMgr, opts...)
if err != nil {
return nil, fmt.Errorf("error creating databroker service: %w", err)
}

View file

@ -10,6 +10,7 @@ import (
var (
defaultSessionRefreshGracePeriod = 1 * time.Minute
defaultSessionRefreshCoolOffDuration = 10 * time.Second
defaultLeaseTTL = 30 * time.Second
)
type config struct {
@ -17,6 +18,7 @@ type config struct {
dataBrokerClient databroker.DataBrokerServiceClient
sessionRefreshGracePeriod time.Duration
sessionRefreshCoolOffDuration time.Duration
leaseTTL time.Duration
now func() time.Time
eventMgr *events.Manager
enabled bool
@ -28,6 +30,7 @@ func newConfig(options ...Option) *config {
WithSessionRefreshCoolOffDuration(defaultSessionRefreshCoolOffDuration)(cfg)
WithNow(time.Now)(cfg)
WithEnabled(true)(cfg)
WithLeaseTTL(defaultLeaseTTL)(cfg)
for _, option := range options {
option(cfg)
}
@ -85,3 +88,9 @@ func WithEnabled(enabled bool) Option {
cfg.enabled = enabled
}
}
func WithLeaseTTL(ttl time.Duration) Option {
return func(o *config) {
o.leaseTTL = ttl
}
}

View file

@ -88,7 +88,7 @@ func (mgr *Manager) UpdateConfig(options ...Option) {
// RunEnabled runs the manager. This method blocks until an error occurs or the given context is canceled.
func (mgr *Manager) RunEnabled(ctx context.Context) error {
leaser := databroker.NewLeaser("identity_manager", time.Second*30, mgr)
leaser := databroker.NewLeaser("identity_manager", mgr.cfg.Load().leaseTTL, mgr)
return leaser.Run(ctx)
}

View file

@ -11,6 +11,7 @@ var (
defaultSessionRefreshGracePeriod = 1 * time.Minute
defaultSessionRefreshCoolOffDuration = 10 * time.Second
defaultUpdateUserInfoInterval = 10 * time.Minute
defaultLeaseTTL = 30 * time.Second
)
type config struct {
@ -19,6 +20,7 @@ type config struct {
sessionRefreshGracePeriod time.Duration
sessionRefreshCoolOffDuration time.Duration
updateUserInfoInterval time.Duration
leaseTTL time.Duration
now func() time.Time
eventMgr *events.Manager
enabled bool
@ -31,6 +33,7 @@ func newConfig(options ...Option) *config {
WithNow(time.Now)(cfg)
WithUpdateUserInfoInterval(defaultUpdateUserInfoInterval)(cfg)
WithEnabled(true)(cfg)
WithLeaseTTL(defaultLeaseTTL)(cfg)
for _, option := range options {
option(cfg)
}
@ -95,3 +98,10 @@ func WithUpdateUserInfoInterval(dur time.Duration) Option {
cfg.updateUserInfoInterval = dur
}
}
// WithLeaseTTL sets the TTL used by the leaser.
func WithLeaseTTL(ttl time.Duration) Option {
return func(o *config) {
o.leaseTTL = ttl
}
}

View file

@ -5,7 +5,6 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/rs/zerolog"
"golang.org/x/oauth2"
@ -79,7 +78,7 @@ func (mgr *Manager) GetDataBrokerServiceClient() databroker.DataBrokerServiceCli
// RunEnabled runs the manager. This method blocks until an error occurs or the given context is canceled.
func (mgr *Manager) RunEnabled(ctx context.Context) error {
leaser := databroker.NewLeaser("identity_manager", time.Second*30, mgr)
leaser := databroker.NewLeaser("identity_manager", mgr.cfg.Load().leaseTTL, mgr)
return leaser.Run(ctx)
}