From 3f7777f7e020e962d761b5dec41fb83e7e864f01 Mon Sep 17 00:00:00 2001 From: Caleb Doxsey Date: Mon, 30 Nov 2020 15:45:12 -0700 Subject: [PATCH] wait for initial sync to complete before starting control plane (#1636) --- authorize/authorize.go | 6 ++++++ authorize/evaluator/evaluator.go | 5 +++++ authorize/run.go | 31 ++++++++++++++++++++++++++++--- internal/cmd/pomerium/pomerium.go | 13 ++++++++++--- 4 files changed, 49 insertions(+), 6 deletions(-) diff --git a/authorize/authorize.go b/authorize/authorize.go index 2487c827e..d3a2ff805 100644 --- a/authorize/authorize.go +++ b/authorize/authorize.go @@ -27,6 +27,8 @@ type Authorize struct { dataBrokerDataLock sync.RWMutex dataBrokerData evaluator.DataBrokerData + + dataBrokerInitialSync map[string]chan struct{} } // New validates and creates a new Authorize service from a set of config options. @@ -36,6 +38,10 @@ func New(cfg *config.Config) (*Authorize, error) { store: evaluator.NewStore(), templates: template.Must(frontend.NewTemplates()), dataBrokerData: make(evaluator.DataBrokerData), + dataBrokerInitialSync: map[string]chan struct{}{ + "type.googleapis.com/directory.Group": make(chan struct{}, 1), + "type.googleapis.com/directory.User": make(chan struct{}, 1), + }, } state, err := newAuthorizeStateFromConfig(cfg, a.store) diff --git a/authorize/evaluator/evaluator.go b/authorize/evaluator/evaluator.go index 5a63a5039..827b89bfd 100644 --- a/authorize/evaluator/evaluator.go +++ b/authorize/evaluator/evaluator.go @@ -420,6 +420,11 @@ func (dbd DataBrokerData) Clear(typeURL string) { delete(dbd, typeURL) } +// Count returns the number of entries for the given type URL. +func (dbd DataBrokerData) Count(typeURL string) int { + return len(dbd[typeURL]) +} + // Get gets a record from the DataBrokerData. func (dbd DataBrokerData) Get(typeURL, id string) interface{} { m, ok := dbd[typeURL] diff --git a/authorize/run.go b/authorize/run.go index 32fc0c07a..d5f452558 100644 --- a/authorize/run.go +++ b/authorize/run.go @@ -5,13 +5,12 @@ import ( "io" "time" - "github.com/pomerium/pomerium/internal/telemetry/trace" - backoff "github.com/cenkalti/backoff/v4" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/emptypb" "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/telemetry/trace" "github.com/pomerium/pomerium/pkg/grpc/databroker" ) @@ -31,6 +30,20 @@ func (a *Authorize) Run(ctx context.Context) error { return eg.Wait() } +// WaitForInitialSync waits for the initial sync to complete. +func (a *Authorize) WaitForInitialSync(ctx context.Context) error { + for typeURL, ch := range a.dataBrokerInitialSync { + log.Info().Str("type_url", typeURL).Msg("waiting for initial sync") + select { + case <-ch: + case <-ctx.Done(): + return ctx.Err() + } + log.Info().Str("type_url", typeURL).Msg("initial sync complete") + } + return nil +} + func (a *Authorize) runTypesSyncer(ctx context.Context, updateTypes chan<- []string) error { log.Info().Msg("starting type sync") return tryForever(ctx, func(backoff interface{ Reset() }) error { @@ -88,8 +101,9 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string) error var serverVersion, recordVersion string log.Info().Str("type_url", typeURL).Msg("starting data initial load") - ctx, span := trace.StartSpan(ctx, "authorize.dataBrokerClient.GetAll") + ctx, span := trace.StartSpan(ctx, "authorize.dataBrokerClient.InitialSync") backoff := backoff.NewExponentialBackOff() + backoff.MaxElapsedTime = 0 for { res, err := databroker.InitialSync(ctx, a.state.Load().dataBrokerClient, &databroker.SyncRequest{ Type: typeURL, @@ -113,8 +127,18 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string) error break } + a.dataBrokerDataLock.Lock() + log.Info().Str("type_url", typeURL).Int("count", a.dataBrokerData.Count(typeURL)).Msg("initial data load complete") + a.dataBrokerDataLock.Unlock() span.End() + if ch, ok := a.dataBrokerInitialSync[typeURL]; ok { + select { + case ch <- struct{}{}: + default: + } + } + log.Info().Str("type_url", typeURL).Msg("starting data syncer") return tryForever(ctx, func(backoff interface{ Reset() }) error { ctx, span := trace.StartSpan(ctx, "authorize.dataBrokerClient.Sync") @@ -176,6 +200,7 @@ func (a *Authorize) updateRecord(record *databroker.Record) { func tryForever(ctx context.Context, callback func(onSuccess interface{ Reset() }) error) error { backoff := backoff.NewExponentialBackOff() + backoff.MaxElapsedTime = 0 for { err := callback(backoff) if err != nil { diff --git a/internal/cmd/pomerium/pomerium.go b/internal/cmd/pomerium/pomerium.go index a7a679158..a62375040 100644 --- a/internal/cmd/pomerium/pomerium.go +++ b/internal/cmd/pomerium/pomerium.go @@ -118,14 +118,21 @@ func Run(ctx context.Context, configFile string) error { // run everything eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { - return controlPlane.Run(ctx) - }) if authorizeServer != nil { eg.Go(func() error { return authorizeServer.Run(ctx) }) + // in non-all-in-one mode we will wait for the initial sync to complete before starting + // the control plane + if cacheServer == nil { + if err := authorizeServer.WaitForInitialSync(ctx); err != nil { + return err + } + } } + eg.Go(func() error { + return controlPlane.Run(ctx) + }) if cacheServer != nil { eg.Go(func() error { return cacheServer.Run(ctx)