pomerium/internal/zero/reconciler/service.go
2023-08-07 22:41:52 -04:00

148 lines
3.4 KiB
Go

package reconciler
/*
* This file contains the main control loop of the reconciler service.
*/
import (
"context"
"net/url"
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"github.com/pomerium/pomerium/internal/atomicutil"
"github.com/pomerium/pomerium/internal/log"
connect_mux "github.com/pomerium/zero-sdk/connect-mux"
)
// service holds the state shared by the reconciler goroutines started in Run.
type service struct {
// config is the configuration built by newConfig from the options passed to Run.
config *reconcilerConfig
// databrokerRateLimit is created in Run from config.databrokerRPS with a burst of 1.
// NOTE(review): presumably throttles databroker calls; its consumers are not in this part of the file.
databrokerRateLimit *rate.Limiter
// downloadURLCache maps a string key to a URL and its expiry.
// NOTE(review): presumably keyed by bundle ID; confirm against the code that fills it.
downloadURLCache map[string]urlEntry
// bundles tracks which bundles are waiting to be synced (see MarkForSync and GetNextBundleToSync).
bundles Bundles
// fullUpdateRequest is a capacity-1 signal channel: triggerFullUpdate sends on it
// without blocking and updateLoop reacts by running fullUpdate.
fullUpdateRequest chan struct{}
// bundleUpdatedRequest is a capacity-1 signal channel: triggerBundleUpdate sends on it
// without blocking to wake updateLoop after a bundle has been marked for sync.
bundleUpdatedRequest chan struct{}
// updateInterval is the period updateLoop applies to its ticker; it is written by
// triggerFullUpdate and triggerBundleUpdate.
updateInterval atomicutil.Value[time.Duration]
}
// urlEntry is the value type of service.downloadURLCache: a URL paired with a timestamp.
// NOTE(review): presumably a cached download URL that must not be used after ExpiresAt;
// the code that reads and writes these entries is not in this part of the file.
type urlEntry struct {
// URL is the cached URL.
URL url.URL
// ExpiresAt is the expiry time associated with URL.
ExpiresAt time.Time
}
// Run starts the reconciler service and waits for it to finish.
//
// It builds the service from opts and runs two goroutines under an errgroup:
// watchUpdates, which turns connect-mux events into update requests, and
// updateLoop, which performs the synchronization work. The errgroup context is
// canceled as soon as either goroutine returns a non-nil error, and Run returns
// the first such error once both goroutines have stopped.
func Run(ctx context.Context, opts ...Option) error {
	config := newConfig(opts...)

	c := &service{
		config:               config,
		databrokerRateLimit:  rate.NewLimiter(rate.Limit(config.databrokerRPS), 1),
		downloadURLCache:     make(map[string]urlEntry),
		fullUpdateRequest:    make(chan struct{}, 1),
		bundleUpdatedRequest: make(chan struct{}, 1),
	}

	// Seed the polling interval before updateLoop starts. updateLoop's ticker
	// first fires after one millisecond and is then reset from updateInterval;
	// without a stored value that reset would run with an unset interval if no
	// connect/disconnect callback had fired yet. No connection has been
	// reported at this point, so the disconnected interval is the right one.
	c.updateInterval.Store(config.checkForUpdateIntervalWhenDisconnected)

	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error { return c.watchUpdates(ctx) })
	eg.Go(func() error { return c.updateLoop(ctx) })

	return eg.Wait()
}
// updateLoop is the synchronization loop of the service. It runs until ctx is
// done and then returns ctx.Err().
//
// Each iteration first drains the sync queue: while GetNextBundleToSync yields
// a bundle, that bundle is synced (failures are logged, not returned) and the
// loop re-checks the queue. Between bundles it polls, without blocking, for
// cancellation and for a pending full-update request.
//
// Once the queue is empty the loop blocks until one of:
//   - ctx is canceled,
//   - a full update is requested (fullUpdateRequest),
//   - the periodic ticker fires, which also runs a full update,
//   - a bundle update is signalled (bundleUpdatedRequest), which only wakes the
//     loop so the queue is examined again.
func (c *service) updateLoop(ctx context.Context) error {
	// Start with a very short period so the first full update runs right after
	// startup; the period is replaced with updateInterval after that.
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()

	// resetTicker applies the current polling interval to the ticker.
	// time.Ticker.Reset panics on a non-positive duration, so an unset or zero
	// interval leaves the current period in place instead of crashing the loop.
	resetTicker := func() {
		if interval := c.updateInterval.Load(); interval > 0 {
			ticker.Reset(interval)
		}
	}

	for {
		if next, ok := c.bundles.GetNextBundleToSync(); ok {
			if err := c.SyncBundle(ctx, next); err != nil {
				log.Error(ctx).Err(err).Str("bundle", next).Msg("reconciler: failed to sync bundle")
			}

			// Non-blocking check between bundles.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-c.fullUpdateRequest:
				c.fullUpdate(ctx)
				resetTicker()
			default:
			}

			// Always go back to the queue. In particular, after a full update
			// the queue must be re-examined right away rather than parking in
			// the blocking select below until an unrelated wake-up arrives.
			continue
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-c.fullUpdateRequest:
			c.fullUpdate(ctx)
		case <-ticker.C:
			c.fullUpdate(ctx)
		case <-c.bundleUpdatedRequest:
			// Nothing to do here: the bundle was already marked for sync, the
			// next iteration picks it up from the queue.
		}

		resetTicker()
	}
}
// fullUpdate refreshes the bundle list and then purges records that are not in
// that list. Errors are logged rather than returned; if the refresh fails the
// purge step is skipped.
func (c *service) fullUpdate(ctx context.Context) {
	if refreshErr := c.RefreshBundleList(ctx); refreshErr != nil {
		log.Error(ctx).Err(refreshErr).Msg("reconciler: failed to refresh bundle list")
		return
	}

	if purgeErr := c.PurgeRecordsNotInList(ctx); purgeErr != nil {
		log.Error(ctx).Err(purgeErr).Msg("reconciler: failed to purge records not in list")
	}
}
// watchUpdates subscribes to connect-mux events and translates them into
// update requests for updateLoop. It returns the result of Watch.
//
//   - connected:      select the "connected" polling interval and request a full update
//   - disconnected:   select the "disconnected" polling interval and request a full update
//   - bundle updated: mark that bundle for sync and wake the update loop
func (c *service) watchUpdates(ctx context.Context) error {
	onConnected := func(context.Context) {
		c.triggerFullUpdate(true)
	}
	onDisconnected := func(context.Context) {
		c.triggerFullUpdate(false)
	}
	onBundleUpdated := func(_ context.Context, bundleID string) {
		c.triggerBundleUpdate(bundleID)
	}

	return c.config.connectMux.Watch(ctx,
		connect_mux.WithOnConnected(onConnected),
		connect_mux.WithOnDisconnected(onDisconnected),
		connect_mux.WithOnBundleUpdated(onBundleUpdated),
	)
}
// triggerBundleUpdate records that the bundle with the given id needs syncing
// and nudges updateLoop. It also selects the "connected" polling interval.
func (c *service) triggerBundleUpdate(id string) {
	connectedInterval := c.config.checkForUpdateIntervalWhenConnected
	c.updateInterval.Store(connectedInterval)

	c.bundles.MarkForSync(id)

	// Non-blocking send: the channel has capacity 1, so if a wake-up is already
	// pending this one is dropped and the pending signal covers it.
	select {
	case c.bundleUpdatedRequest <- struct{}{}:
	default:
	}
}
// triggerFullUpdate selects the polling interval that matches the connection
// state and asks updateLoop to perform a full update.
func (c *service) triggerFullUpdate(connected bool) {
	interval := c.config.checkForUpdateIntervalWhenConnected
	if !connected {
		interval = c.config.checkForUpdateIntervalWhenDisconnected
	}
	c.updateInterval.Store(interval)

	// Non-blocking send: the channel has capacity 1, so a request that is
	// already pending absorbs this one.
	select {
	case c.fullUpdateRequest <- struct{}{}:
	default:
	}
}