mirror of
https://github.com/pomerium/pomerium.git
synced 2025-06-06 04:42:56 +02:00
wait for initial sync to complete before starting control plane (#1636)
This commit is contained in:
parent
e5d55f300e
commit
3f7777f7e0
4 changed files with 49 additions and 6 deletions
|
@ -27,6 +27,8 @@ type Authorize struct {
|
|||
|
||||
dataBrokerDataLock sync.RWMutex
|
||||
dataBrokerData evaluator.DataBrokerData
|
||||
|
||||
dataBrokerInitialSync map[string]chan struct{}
|
||||
}
|
||||
|
||||
// New validates and creates a new Authorize service from a set of config options.
|
||||
|
@ -36,6 +38,10 @@ func New(cfg *config.Config) (*Authorize, error) {
|
|||
store: evaluator.NewStore(),
|
||||
templates: template.Must(frontend.NewTemplates()),
|
||||
dataBrokerData: make(evaluator.DataBrokerData),
|
||||
dataBrokerInitialSync: map[string]chan struct{}{
|
||||
"type.googleapis.com/directory.Group": make(chan struct{}, 1),
|
||||
"type.googleapis.com/directory.User": make(chan struct{}, 1),
|
||||
},
|
||||
}
|
||||
|
||||
state, err := newAuthorizeStateFromConfig(cfg, a.store)
|
||||
|
|
|
@ -420,6 +420,11 @@ func (dbd DataBrokerData) Clear(typeURL string) {
|
|||
delete(dbd, typeURL)
|
||||
}
|
||||
|
||||
// Count returns the number of entries for the given type URL.
|
||||
func (dbd DataBrokerData) Count(typeURL string) int {
|
||||
return len(dbd[typeURL])
|
||||
}
|
||||
|
||||
// Get gets a record from the DataBrokerData.
|
||||
func (dbd DataBrokerData) Get(typeURL, id string) interface{} {
|
||||
m, ok := dbd[typeURL]
|
||||
|
|
|
@ -5,13 +5,12 @@ import (
|
|||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/telemetry/trace"
|
||||
|
||||
backoff "github.com/cenkalti/backoff/v4"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
"github.com/pomerium/pomerium/internal/telemetry/trace"
|
||||
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||
)
|
||||
|
||||
|
@ -31,6 +30,20 @@ func (a *Authorize) Run(ctx context.Context) error {
|
|||
return eg.Wait()
|
||||
}
|
||||
|
||||
// WaitForInitialSync waits for the initial sync to complete.
|
||||
func (a *Authorize) WaitForInitialSync(ctx context.Context) error {
|
||||
for typeURL, ch := range a.dataBrokerInitialSync {
|
||||
log.Info().Str("type_url", typeURL).Msg("waiting for initial sync")
|
||||
select {
|
||||
case <-ch:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
log.Info().Str("type_url", typeURL).Msg("initial sync complete")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Authorize) runTypesSyncer(ctx context.Context, updateTypes chan<- []string) error {
|
||||
log.Info().Msg("starting type sync")
|
||||
return tryForever(ctx, func(backoff interface{ Reset() }) error {
|
||||
|
@ -88,8 +101,9 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string) error
|
|||
var serverVersion, recordVersion string
|
||||
|
||||
log.Info().Str("type_url", typeURL).Msg("starting data initial load")
|
||||
ctx, span := trace.StartSpan(ctx, "authorize.dataBrokerClient.GetAll")
|
||||
ctx, span := trace.StartSpan(ctx, "authorize.dataBrokerClient.InitialSync")
|
||||
backoff := backoff.NewExponentialBackOff()
|
||||
backoff.MaxElapsedTime = 0
|
||||
for {
|
||||
res, err := databroker.InitialSync(ctx, a.state.Load().dataBrokerClient, &databroker.SyncRequest{
|
||||
Type: typeURL,
|
||||
|
@ -113,8 +127,18 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string) error
|
|||
|
||||
break
|
||||
}
|
||||
a.dataBrokerDataLock.Lock()
|
||||
log.Info().Str("type_url", typeURL).Int("count", a.dataBrokerData.Count(typeURL)).Msg("initial data load complete")
|
||||
a.dataBrokerDataLock.Unlock()
|
||||
span.End()
|
||||
|
||||
if ch, ok := a.dataBrokerInitialSync[typeURL]; ok {
|
||||
select {
|
||||
case ch <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
log.Info().Str("type_url", typeURL).Msg("starting data syncer")
|
||||
return tryForever(ctx, func(backoff interface{ Reset() }) error {
|
||||
ctx, span := trace.StartSpan(ctx, "authorize.dataBrokerClient.Sync")
|
||||
|
@ -176,6 +200,7 @@ func (a *Authorize) updateRecord(record *databroker.Record) {
|
|||
|
||||
func tryForever(ctx context.Context, callback func(onSuccess interface{ Reset() }) error) error {
|
||||
backoff := backoff.NewExponentialBackOff()
|
||||
backoff.MaxElapsedTime = 0
|
||||
for {
|
||||
err := callback(backoff)
|
||||
if err != nil {
|
||||
|
|
|
@ -118,14 +118,21 @@ func Run(ctx context.Context, configFile string) error {
|
|||
|
||||
// run everything
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
eg.Go(func() error {
|
||||
return controlPlane.Run(ctx)
|
||||
})
|
||||
if authorizeServer != nil {
|
||||
eg.Go(func() error {
|
||||
return authorizeServer.Run(ctx)
|
||||
})
|
||||
// in non-all-in-one mode we will wait for the initial sync to complete before starting
|
||||
// the control plane
|
||||
if cacheServer == nil {
|
||||
if err := authorizeServer.WaitForInitialSync(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
eg.Go(func() error {
|
||||
return controlPlane.Run(ctx)
|
||||
})
|
||||
if cacheServer != nil {
|
||||
eg.Go(func() error {
|
||||
return cacheServer.Run(ctx)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue