diff --git a/internal/retry/backoff.go b/internal/retry/backoff.go new file mode 100644 index 000000000..008da0b39 --- /dev/null +++ b/internal/retry/backoff.go @@ -0,0 +1,24 @@ +package retry + +import ( + "context" + + "github.com/cenkalti/backoff/v4" +) + +// WithBackoff retries the given function with an exponential backoff, +// stopping when the context is done or the function returns a terminal error. +func WithBackoff(ctx context.Context, fn func(context.Context) error) error { + b := backoff.NewExponentialBackOff() + b.MaxElapsedTime = 0 + return backoff.Retry( + func() error { + err := fn(ctx) + if IsTerminalError(err) { + return backoff.Permanent(err) + } + return err + }, + backoff.WithContext(b, ctx), + ) +} diff --git a/internal/zero/controller/controller.go b/internal/zero/controller/controller.go index 114be81d1..96bc6cd06 100644 --- a/internal/zero/controller/controller.go +++ b/internal/zero/controller/controller.go @@ -17,7 +17,6 @@ import ( "github.com/pomerium/pomerium/internal/zero/bootstrap" "github.com/pomerium/pomerium/internal/zero/bootstrap/writers" "github.com/pomerium/pomerium/internal/zero/healthcheck" - "github.com/pomerium/pomerium/internal/zero/leaser" "github.com/pomerium/pomerium/internal/zero/reconciler" "github.com/pomerium/pomerium/internal/zero/reporter" "github.com/pomerium/pomerium/pkg/cmd/pomerium" @@ -123,11 +122,25 @@ func (c *controller) runConnect(ctx context.Context) error { } func (c *controller) runZeroControlLoop(ctx context.Context) error { - return leaser.Run(ctx, c.bootstrapConfig, - c.runReconcilerLeased, - c.runAnalyticsLeased, - c.runMetricsReporterLeased, - c.runHealthChecksLeased, + ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { + return c.Str("control-group", "zero-cluster") + }) + + err := c.bootstrapConfig.WaitReady(ctx) + if err != nil { + return fmt.Errorf("waiting for config source to be ready: %w", err) + } + + r := c.NewDatabrokerRestartRunner(ctx) + defer r.Close() + + return r.Run(ctx, + WithLease( + c.runReconcilerLeased, + c.runAnalyticsLeased, + c.runMetricsReporterLeased, + c.runHealthChecksLeased, + ), ) } diff --git a/internal/zero/controller/leaser.go b/internal/zero/controller/leaser.go new file mode 100644 index 000000000..cc295ab1d --- /dev/null +++ b/internal/zero/controller/leaser.go @@ -0,0 +1,48 @@ +package controller + +import ( + "context" + "time" + + "golang.org/x/sync/errgroup" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/retry" + "github.com/pomerium/pomerium/pkg/grpc/databroker" +) + +type leaser struct { + client databroker.DataBrokerServiceClient + funcs []DbcFunc +} + +// GetDataBrokerServiceClient implements the databroker.LeaseHandler interface. +func (c *leaser) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient { + return c.client +} + +// RunLeased implements the databroker.LeaseHandler interface. +func (c *leaser) RunLeased(ctx context.Context) error { + log.Debug(ctx).Msg("leaser: running leased functions") + + eg, ctx := errgroup.WithContext(ctx) + for _, fn := range c.funcs { + eg.Go(func() error { + return retry.WithBackoff(ctx, func(ctx context.Context) error { return fn(ctx, c.client) }) + }) + } + err := eg.Wait() + log.Debug(ctx).Err(err).Msg("leaser: done running leased functions") + return err +} + +func WithLease(funcs ...DbcFunc) DbcFunc { + return func(ctx context.Context, client databroker.DataBrokerServiceClient) error { + srv := &leaser{ + client: client, + funcs: funcs, + } + leaser := databroker.NewLeaser("zero-ctrl", time.Second*30, srv) + return leaser.Run(ctx) + } +} diff --git a/internal/zero/leaser/runner.go b/internal/zero/controller/runner.go similarity index 72% rename from internal/zero/leaser/runner.go rename to internal/zero/controller/runner.go index aa44f7b53..f9886bd4b 100644 --- a/internal/zero/leaser/runner.go +++ b/internal/zero/controller/runner.go @@ -1,4 +1,4 @@ -package leaser +package controller import ( "context" @@ -9,10 +9,11 @@ import ( "net/url" "sync" - "github.com/cenkalti/backoff/v4" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "github.com/pomerium/pomerium/config" + "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/retry" "github.com/pomerium/pomerium/internal/zero/bootstrap" "github.com/pomerium/pomerium/pkg/grpc/databroker" @@ -22,35 +23,6 @@ import ( // ErrBootstrapConfigurationChanged is returned when the bootstrap configuration has changed and the function needs to be restarted. var ErrBootstrapConfigurationChanged = errors.New("bootstrap configuration changed") -// Run runs the given function with a databroker client. -// the function would be restarted if the databroker connection has to be re-established. -func Run( - ctx context.Context, - source *bootstrap.Source, - funcs ...func(ctx context.Context, client databroker.DataBrokerServiceClient) error, -) error { - err := source.WaitReady(ctx) - if err != nil { - return fmt.Errorf("waiting for config source to be ready: %w", err) - } - - p := newRunner(ctx, source) - defer p.Close() - - b := backoff.NewExponentialBackOff() - b.MaxElapsedTime = 0 - return backoff.Retry( - func() error { - err := p.runOnce(ctx, funcs...) - if retry.IsTerminalError(err) { - return backoff.Permanent(err) - } - return err - }, - backoff.WithContext(b, ctx), - ) -} - type runner struct { source *bootstrap.Source @@ -61,15 +33,21 @@ type runner struct { initError error } -func newRunner(ctx context.Context, source *bootstrap.Source) *runner { +func (c *controller) NewDatabrokerRestartRunner(ctx context.Context) *runner { p := &runner{ - source: source, + source: c.bootstrapConfig, } - p.initLocked(ctx, source.GetConfig()) - source.OnConfigChange(context.Background(), p.onConfigChange) + p.initLocked(ctx, c.bootstrapConfig.GetConfig()) + c.bootstrapConfig.OnConfigChange(context.Background(), p.onConfigChange) return p } +type DbcFunc func(context.Context, databroker.DataBrokerServiceClient) error + +func (p *runner) Run(ctx context.Context, funcs ...DbcFunc) error { + return retry.WithBackoff(ctx, func(ctx context.Context) error { return p.runUntilDatabrokerChanges(ctx, funcs...) }) +} + // Close releases the resources used by the databroker provider. func (p *runner) Close() { p.lock.Lock() @@ -123,10 +101,13 @@ func (p *runner) closeLocked() { p.initError = errors.New("databroker connection closed") } -func (p *runner) runOnce( +func (p *runner) runUntilDatabrokerChanges( ctx context.Context, - funcs ...func(ctx context.Context, client databroker.DataBrokerServiceClient) error, + funcs ...DbcFunc, ) error { + log.Debug(ctx).Msg("starting") + defer log.Debug(ctx).Msg("stop") + client, cancelCh, err := p.getDatabrokerClient() if err != nil { return fmt.Errorf("get databroker client: %w", err) @@ -139,11 +120,18 @@ func (p *runner) runOnce( select { case <-ctx.Done(): case <-cancelCh: + log.Debug(ctx).Msg("bootstrap configuration changed, restarting...") cancel(ErrBootstrapConfigurationChanged) } }() - return runWithLease(ctx, client, funcs...) + eg, ctx := errgroup.WithContext(ctx) + for _, fn := range funcs { + eg.Go(func() error { + return retry.WithBackoff(ctx, func(ctx context.Context) error { return fn(ctx, client) }) + }) + } + return eg.Wait() } func newDataBrokerConnection(ctx context.Context, cfg *config.Config) (*grpc.ClientConn, error) { diff --git a/internal/zero/leaser/leaser.go b/internal/zero/leaser/leaser.go deleted file mode 100644 index c2b146a80..000000000 --- a/internal/zero/leaser/leaser.go +++ /dev/null @@ -1,56 +0,0 @@ -// Package leaser groups all Zero services that should run within a lease. -package leaser - -import ( - "context" - "time" - - "golang.org/x/sync/errgroup" - - "github.com/pomerium/pomerium/internal/retry" - "github.com/pomerium/pomerium/pkg/grpc/databroker" -) - -type leaser struct { - cancel context.CancelCauseFunc - client databroker.DataBrokerServiceClient - funcs []func(ctx context.Context, client databroker.DataBrokerServiceClient) error -} - -// GetDataBrokerServiceClient implements the databroker.LeaseHandler interface. -func (c *leaser) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient { - return c.client -} - -// RunLeased implements the databroker.LeaseHandler interface. -func (c *leaser) RunLeased(ctx context.Context) error { - eg, ctx := errgroup.WithContext(ctx) - for _, fn := range append(c.funcs, databrokerChangeMonitor) { - fn := fn - eg.Go(func() error { - err := fn(ctx, c.client) - if retry.IsTerminalError(err) { - c.cancel(err) - } - return err - }) - } - return eg.Wait() -} - -func runWithLease( - ctx context.Context, - client databroker.DataBrokerServiceClient, - funcs ...func(context.Context, databroker.DataBrokerServiceClient) error, -) error { - ctx, cancel := context.WithCancelCause(ctx) - defer cancel(context.Canceled) - - srv := &leaser{ - cancel: cancel, - client: client, - funcs: funcs, - } - leaser := databroker.NewLeaser("zero-ctrl", time.Second*30, srv) - return leaser.Run(ctx) -} diff --git a/internal/zero/leaser/monitor.go b/internal/zero/leaser/monitor.go deleted file mode 100644 index 5fe771cfc..000000000 --- a/internal/zero/leaser/monitor.go +++ /dev/null @@ -1,37 +0,0 @@ -package leaser - -import ( - "context" - "fmt" - - "github.com/pomerium/pomerium/pkg/grpc/databroker" -) - -const typeStr = "pomerium.io/zero/leaser" - -// databrokerChangeMonitor runs infinite sync loop to see if there is any change in databroker -// it doesn't really syncs anything, just checks if the underlying databroker has changed -func databrokerChangeMonitor(ctx context.Context, client databroker.DataBrokerServiceClient) error { - _, recordVersion, serverVersion, err := databroker.InitialSync(ctx, client, &databroker.SyncLatestRequest{ - Type: typeStr, - }) - if err != nil { - return fmt.Errorf("error during initial sync: %w", err) - } - - stream, err := client.Sync(ctx, &databroker.SyncRequest{ - Type: typeStr, - ServerVersion: serverVersion, - RecordVersion: recordVersion, - }) - if err != nil { - return fmt.Errorf("error calling sync: %w", err) - } - - for { - _, err := stream.Recv() - if err != nil { - return fmt.Errorf("error receiving record: %w", err) - } - } -}