From a7bd284b521c6cec98ec02b6fda75caaa891c2d2 Mon Sep 17 00:00:00 2001 From: Caleb Doxsey Date: Wed, 8 Jun 2022 16:48:15 -0600 Subject: [PATCH] identity: batch directory updates (#3411) * identity: batch directory updates * add batch details to log message --- internal/identity/manager/manager.go | 125 +++++++--------------- internal/identity/manager/manager_test.go | 10 ++ 2 files changed, 48 insertions(+), 87 deletions(-) diff --git a/internal/identity/manager/manager.go b/internal/identity/manager/manager.go index 40877b69d..f2f814361 100644 --- a/internal/identity/manager/manager.go +++ b/internal/identity/manager/manager.go @@ -4,7 +4,6 @@ package manager import ( "context" "errors" - "fmt" "time" "github.com/cenkalti/backoff/v4" @@ -12,7 +11,6 @@ import ( "github.com/rs/zerolog" "golang.org/x/oauth2" "golang.org/x/sync/errgroup" - "golang.org/x/sync/semaphore" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" @@ -28,10 +26,6 @@ import ( "github.com/pomerium/pomerium/pkg/protoutil" ) -const ( - dataBrokerParallelism = 10 -) - // Authenticator is an identity.Provider with only the methods needed by the manager. type Authenticator interface { Refresh(context.Context, *oauth2.Token, identity.State) (*oauth2.Token, error) @@ -59,8 +53,6 @@ type Manager struct { directoryBackoff *backoff.ExponentialBackOff directoryNextRefresh time.Time - - dataBrokerSemaphore *semaphore.Weighted } // New creates a new identity manager. @@ -72,8 +64,6 @@ func New( sessionScheduler: scheduler.New(), userScheduler: scheduler.New(), - - dataBrokerSemaphore: semaphore.NewWeighted(dataBrokerParallelism), } mgr.directoryBackoff = backoff.NewExponentialBackOff() mgr.directoryBackoff.MaxElapsedTime = 0 @@ -240,35 +230,22 @@ func (mgr *Manager) refreshDirectoryUserGroups(ctx context.Context) (nextRefresh } func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*directory.Group) { - eg, ctx := errgroup.WithContext(ctx) - lookup := map[string]*directory.Group{} for _, dg := range directoryGroups { lookup[dg.GetId()] = dg } + var records []*databroker.Record + for groupID, newDG := range lookup { curDG, ok := mgr.directoryGroups[groupID] if !ok || !proto.Equal(newDG, curDG) { id := newDG.GetId() any := protoutil.NewAny(newDG) - eg.Go(func() error { - if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil { - return err - } - defer mgr.dataBrokerSemaphore.Release(1) - - _, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, &databroker.PutRequest{ - Records: []*databroker.Record{{ - Type: any.GetTypeUrl(), - Id: id, - Data: any, - }}, - }) - if err != nil { - return fmt.Errorf("failed to update directory group: %s", id) - } - return nil + records = append(records, &databroker.Record{ + Type: any.GetTypeUrl(), + Id: id, + Data: any, }) } } @@ -278,62 +255,43 @@ func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*director if !ok { id := curDG.GetId() any := protoutil.NewAny(curDG) - eg.Go(func() error { - if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil { - return err - } - defer mgr.dataBrokerSemaphore.Release(1) - - _, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, &databroker.PutRequest{ - Records: []*databroker.Record{{ - Type: any.GetTypeUrl(), - Id: id, - DeletedAt: timestamppb.Now(), - }}, - }) - if err != nil { - return fmt.Errorf("failed to delete directory group: %s", id) - } - return nil + records = append(records, &databroker.Record{ + Type: any.GetTypeUrl(), + Id: id, + Data: any, + DeletedAt: timestamppb.New(mgr.cfg.Load().now()), }) } } - if err := eg.Wait(); err != nil { - log.Warn(ctx).Err(err).Msg("manager: failed to merge groups") + for i, batch := range databroker.OptimumPutRequestsFromRecords(records) { + _, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, batch) + if err != nil { + log.Warn(ctx).Err(err). + Int("batch", i). + Int("record-count", len(batch.GetRecords())). + Msg("manager: failed to update groups") + } } } func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.User) { - eg, ctx := errgroup.WithContext(ctx) - lookup := map[string]*directory.User{} for _, du := range directoryUsers { lookup[du.GetId()] = du } + var records []*databroker.Record + for userID, newDU := range lookup { curDU, ok := mgr.directoryUsers[userID] if !ok || !proto.Equal(newDU, curDU) { id := newDU.GetId() any := protoutil.NewAny(newDU) - eg.Go(func() error { - if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil { - return err - } - defer mgr.dataBrokerSemaphore.Release(1) - - client := mgr.cfg.Load().dataBrokerClient - if _, err := client.Put(ctx, &databroker.PutRequest{ - Records: []*databroker.Record{{ - Type: any.GetTypeUrl(), - Id: id, - Data: any, - }}, - }); err != nil { - return fmt.Errorf("failed to update directory user: %s", id) - } - return nil + records = append(records, &databroker.Record{ + Type: any.GetTypeUrl(), + Id: id, + Data: any, }) } } @@ -343,30 +301,23 @@ func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory. if !ok { id := curDU.GetId() any := protoutil.NewAny(curDU) - eg.Go(func() error { - if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil { - return err - } - defer mgr.dataBrokerSemaphore.Release(1) - - client := mgr.cfg.Load().dataBrokerClient - if _, err := client.Put(ctx, &databroker.PutRequest{ - Records: []*databroker.Record{{ - Type: any.GetTypeUrl(), - Id: id, - Data: any, - DeletedAt: timestamppb.Now(), - }}, - }); err != nil { - return fmt.Errorf("failed to delete directory user (%s): %w", id, err) - } - return nil + records = append(records, &databroker.Record{ + Type: any.GetTypeUrl(), + Id: id, + Data: any, + DeletedAt: timestamppb.New(mgr.cfg.Load().now()), }) } } - if err := eg.Wait(); err != nil { - log.Warn(ctx).Err(err).Msg("manager: failed to merge users") + for i, batch := range databroker.OptimumPutRequestsFromRecords(records) { + _, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, batch) + if err != nil { + log.Warn(ctx).Err(err). + Int("batch", i). + Int("record-count", len(batch.GetRecords())). + Msg("manager: failed to update users") + } } } diff --git a/internal/identity/manager/manager_test.go b/internal/identity/manager/manager_test.go index 41578a53d..2551d3b8f 100644 --- a/internal/identity/manager/manager_test.go +++ b/internal/identity/manager/manager_test.go @@ -6,11 +6,13 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" "github.com/pomerium/pomerium/internal/directory" "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/grpc/databroker/mock_databroker" "github.com/pomerium/pomerium/pkg/grpc/session" "github.com/pomerium/pomerium/pkg/grpc/user" "github.com/pomerium/pomerium/pkg/protoutil" @@ -30,12 +32,15 @@ func (mock mockProvider) UserGroups(ctx context.Context) ([]*directory.Group, [] } func TestManager_onUpdateRecords(t *testing.T) { + ctrl := gomock.NewController(t) + ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10) defer clearTimeout() now := time.Now() mgr := New( + WithDataBrokerClient(mock_databroker.NewMockDataBrokerServiceClient(ctrl)), WithDirectoryProvider(mockProvider{}), WithGroupRefreshInterval(time.Hour), WithNow(func() time.Time { @@ -67,12 +72,17 @@ func TestManager_onUpdateRecords(t *testing.T) { } func TestManager_refreshDirectoryUserGroups(t *testing.T) { + ctrl := gomock.NewController(t) + ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10) defer clearTimeout() t.Run("backoff", func(t *testing.T) { cnt := 0 + client := mock_databroker.NewMockDataBrokerServiceClient(ctrl) + client.EXPECT().Put(gomock.Any(), gomock.Any()).AnyTimes() mgr := New( + WithDataBrokerClient(client), WithDirectoryProvider(mockProvider{ userGroups: func(ctx context.Context) ([]*directory.Group, []*directory.User, error) { cnt++