mirror of
https://github.com/pomerium/pomerium.git
synced 2025-08-03 00:40:25 +02:00
use retry package
This commit is contained in:
parent
de6d559a08
commit
54369e83e9
9 changed files with 274 additions and 161 deletions
|
@ -7,14 +7,12 @@ package reconciler
|
|||
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/atomicutil"
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
connect_mux "github.com/pomerium/zero-sdk/connect-mux"
|
||||
)
|
||||
|
||||
|
@ -22,18 +20,12 @@ type service struct {
|
|||
config *reconcilerConfig
|
||||
|
||||
databrokerRateLimit *rate.Limiter
|
||||
downloadURLCache map[string]urlEntry
|
||||
|
||||
bundles Bundles
|
||||
|
||||
fullUpdateRequest chan struct{}
|
||||
bundleUpdatedRequest chan struct{}
|
||||
updateInterval atomicutil.Value[time.Duration]
|
||||
}
|
||||
|
||||
type urlEntry struct {
|
||||
URL url.URL
|
||||
ExpiresAt time.Time
|
||||
fullSyncRequest chan struct{}
|
||||
bundleSyncRequest chan struct{}
|
||||
periodicUpdateInterval atomicutil.Value[time.Duration]
|
||||
}
|
||||
|
||||
// Run creates a new bundle updater client
|
||||
|
@ -42,70 +34,19 @@ func Run(ctx context.Context, opts ...Option) error {
|
|||
config := newConfig(opts...)
|
||||
|
||||
c := &service{
|
||||
config: config,
|
||||
databrokerRateLimit: rate.NewLimiter(rate.Limit(config.databrokerRPS), 1),
|
||||
downloadURLCache: make(map[string]urlEntry),
|
||||
fullUpdateRequest: make(chan struct{}, 1),
|
||||
bundleUpdatedRequest: make(chan struct{}, 1),
|
||||
config: config,
|
||||
databrokerRateLimit: rate.NewLimiter(rate.Limit(config.databrokerRPS), 1),
|
||||
fullSyncRequest: make(chan struct{}, 1),
|
||||
}
|
||||
c.periodicUpdateInterval.Store(config.checkForUpdateIntervalWhenDisconnected)
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
eg.Go(func() error { return c.watchUpdates(ctx) })
|
||||
eg.Go(func() error { return c.updateLoop(ctx) })
|
||||
eg.Go(func() error { return c.SyncLoop(ctx) })
|
||||
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (c *service) updateLoop(ctx context.Context) error {
|
||||
ticker := time.NewTicker(time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
next, ok := c.bundles.GetNextBundleToSync()
|
||||
if ok {
|
||||
err := c.SyncBundle(ctx, next)
|
||||
if err != nil {
|
||||
log.Error(ctx).Err(err).Str("bundle", next).Msg("reconciler: failed to sync bundle")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-c.fullUpdateRequest:
|
||||
c.fullUpdate(ctx)
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-c.fullUpdateRequest:
|
||||
c.fullUpdate(ctx)
|
||||
case <-ticker.C:
|
||||
c.fullUpdate(ctx)
|
||||
case <-c.bundleUpdatedRequest:
|
||||
}
|
||||
|
||||
ticker.Reset(c.updateInterval.Load())
|
||||
}
|
||||
}
|
||||
|
||||
func (c *service) fullUpdate(ctx context.Context) {
|
||||
err := c.RefreshBundleList(ctx)
|
||||
if err != nil {
|
||||
log.Error(ctx).Err(err).Msg("reconciler: failed to refresh bundle list")
|
||||
return
|
||||
}
|
||||
|
||||
err = c.PurgeRecordsNotInList(ctx)
|
||||
if err != nil {
|
||||
log.Error(ctx).Err(err).Msg("reconciler: failed to purge records not in list")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// run is a main control loop.
|
||||
// it is very simple and sequential download and reconcile.
|
||||
// it may be later optimized by splitting between download and reconciliation process,
|
||||
|
@ -125,11 +66,11 @@ func (c *service) watchUpdates(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (c *service) triggerBundleUpdate(id string) {
|
||||
c.updateInterval.Store(c.config.checkForUpdateIntervalWhenConnected)
|
||||
c.periodicUpdateInterval.Store(c.config.checkForUpdateIntervalWhenConnected)
|
||||
c.bundles.MarkForSync(id)
|
||||
|
||||
select {
|
||||
case c.bundleUpdatedRequest <- struct{}{}:
|
||||
case c.fullSyncRequest <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
@ -139,10 +80,10 @@ func (c *service) triggerFullUpdate(connected bool) {
|
|||
if connected {
|
||||
timeout = c.config.checkForUpdateIntervalWhenConnected
|
||||
}
|
||||
c.updateInterval.Store(timeout)
|
||||
c.periodicUpdateInterval.Store(timeout)
|
||||
|
||||
select {
|
||||
case c.fullUpdateRequest <- struct{}{}:
|
||||
case c.fullSyncRequest <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue