diff --git a/internal/retry/retry.go b/internal/retry/retry.go index f71bb3ca4..8b8b2d6e9 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -29,10 +29,6 @@ func Retry( }) watches, backoff := newConfig(opts...) - ticker := time.NewTicker(backoff.NextBackOff()) - defer ticker.Stop() - - s := makeSelect(ctx, watches, name, ticker.C, fn) restart: for { @@ -51,10 +47,11 @@ restart: backoff: for { interval := backoff.NextBackOff() - ticker.Reset(interval) log.Ctx(ctx).Warn().Msgf("backing off for %s...", interval.String()) - + timer := time.NewTimer(interval) + s := makeSelect(ctx, watches, name, timer.C, fn) next, err := s.Exec(ctx) + timer.Stop() logNext(ctx, next, err) switch next { case nextRestart: diff --git a/internal/zero/connect-mux/service.go b/internal/zero/connect-mux/service.go index 25db0bd4b..3dbc78644 100644 --- a/internal/zero/connect-mux/service.go +++ b/internal/zero/connect-mux/service.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "sync/atomic" - "time" "github.com/cenkalti/backoff/v4" "github.com/hashicorp/go-multierror" @@ -52,28 +51,16 @@ func (svc *Mux) Run(ctx context.Context, opts ...fanout.Option) error { } func (svc *Mux) run(ctx context.Context) error { - bo := backoff.NewExponentialBackOff() - bo.MaxElapsedTime = 0 - - ticker := time.NewTicker(time.Microsecond) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-ticker.C: - } - - err := svc.subscribeAndDispatch(ctx, bo.Reset) - if err != nil { - ticker.Reset(bo.NextBackOff()) - } + b := backoff.NewExponentialBackOff() + b.MaxElapsedTime = 0 + return backoff.Retry(func() error { + err := svc.subscribeAndDispatch(ctx, b.Reset) if apierror.IsTerminalError(err) { - return err + return backoff.Permanent(err) } - } + return err + }, backoff.WithContext(b, ctx)) } func (svc *Mux) subscribeAndDispatch(ctx context.Context, onConnected func()) (err error) {