identity: batch directory updates (#3411)

* identity: batch directory updates

* add batch details to log message
This commit is contained in:
Caleb Doxsey 2022-06-08 16:48:15 -06:00 committed by GitHub
parent 493148b13f
commit a7bd284b52
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 87 deletions

View file

@ -4,7 +4,6 @@ package manager
import (
"context"
"errors"
"fmt"
"time"
"github.com/cenkalti/backoff/v4"
@ -12,7 +11,6 @@ import (
"github.com/rs/zerolog"
"golang.org/x/oauth2"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
@ -28,10 +26,6 @@ import (
"github.com/pomerium/pomerium/pkg/protoutil"
)
const (
dataBrokerParallelism = 10
)
// Authenticator is an identity.Provider with only the methods needed by the manager.
type Authenticator interface {
Refresh(context.Context, *oauth2.Token, identity.State) (*oauth2.Token, error)
@ -59,8 +53,6 @@ type Manager struct {
directoryBackoff *backoff.ExponentialBackOff
directoryNextRefresh time.Time
dataBrokerSemaphore *semaphore.Weighted
}
// New creates a new identity manager.
@ -72,8 +64,6 @@ func New(
sessionScheduler: scheduler.New(),
userScheduler: scheduler.New(),
dataBrokerSemaphore: semaphore.NewWeighted(dataBrokerParallelism),
}
mgr.directoryBackoff = backoff.NewExponentialBackOff()
mgr.directoryBackoff.MaxElapsedTime = 0
@ -240,35 +230,22 @@ func (mgr *Manager) refreshDirectoryUserGroups(ctx context.Context) (nextRefresh
}
func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*directory.Group) {
eg, ctx := errgroup.WithContext(ctx)
lookup := map[string]*directory.Group{}
for _, dg := range directoryGroups {
lookup[dg.GetId()] = dg
}
var records []*databroker.Record
for groupID, newDG := range lookup {
curDG, ok := mgr.directoryGroups[groupID]
if !ok || !proto.Equal(newDG, curDG) {
id := newDG.GetId()
any := protoutil.NewAny(newDG)
eg.Go(func() error {
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
return err
}
defer mgr.dataBrokerSemaphore.Release(1)
_, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, &databroker.PutRequest{
Records: []*databroker.Record{{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
}},
})
if err != nil {
return fmt.Errorf("failed to update directory group: %s", id)
}
return nil
records = append(records, &databroker.Record{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
})
}
}
@ -278,62 +255,43 @@ func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*director
if !ok {
id := curDG.GetId()
any := protoutil.NewAny(curDG)
eg.Go(func() error {
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
return err
}
defer mgr.dataBrokerSemaphore.Release(1)
_, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, &databroker.PutRequest{
Records: []*databroker.Record{{
Type: any.GetTypeUrl(),
Id: id,
DeletedAt: timestamppb.Now(),
}},
})
if err != nil {
return fmt.Errorf("failed to delete directory group: %s", id)
}
return nil
records = append(records, &databroker.Record{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
DeletedAt: timestamppb.New(mgr.cfg.Load().now()),
})
}
}
if err := eg.Wait(); err != nil {
log.Warn(ctx).Err(err).Msg("manager: failed to merge groups")
for i, batch := range databroker.OptimumPutRequestsFromRecords(records) {
_, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, batch)
if err != nil {
log.Warn(ctx).Err(err).
Int("batch", i).
Int("record-count", len(batch.GetRecords())).
Msg("manager: failed to update groups")
}
}
}
func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.User) {
eg, ctx := errgroup.WithContext(ctx)
lookup := map[string]*directory.User{}
for _, du := range directoryUsers {
lookup[du.GetId()] = du
}
var records []*databroker.Record
for userID, newDU := range lookup {
curDU, ok := mgr.directoryUsers[userID]
if !ok || !proto.Equal(newDU, curDU) {
id := newDU.GetId()
any := protoutil.NewAny(newDU)
eg.Go(func() error {
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
return err
}
defer mgr.dataBrokerSemaphore.Release(1)
client := mgr.cfg.Load().dataBrokerClient
if _, err := client.Put(ctx, &databroker.PutRequest{
Records: []*databroker.Record{{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
}},
}); err != nil {
return fmt.Errorf("failed to update directory user: %s", id)
}
return nil
records = append(records, &databroker.Record{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
})
}
}
@ -343,30 +301,23 @@ func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.
if !ok {
id := curDU.GetId()
any := protoutil.NewAny(curDU)
eg.Go(func() error {
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
return err
}
defer mgr.dataBrokerSemaphore.Release(1)
client := mgr.cfg.Load().dataBrokerClient
if _, err := client.Put(ctx, &databroker.PutRequest{
Records: []*databroker.Record{{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
DeletedAt: timestamppb.Now(),
}},
}); err != nil {
return fmt.Errorf("failed to delete directory user (%s): %w", id, err)
}
return nil
records = append(records, &databroker.Record{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
DeletedAt: timestamppb.New(mgr.cfg.Load().now()),
})
}
}
if err := eg.Wait(); err != nil {
log.Warn(ctx).Err(err).Msg("manager: failed to merge users")
for i, batch := range databroker.OptimumPutRequestsFromRecords(records) {
_, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, batch)
if err != nil {
log.Warn(ctx).Err(err).
Int("batch", i).
Int("record-count", len(batch.GetRecords())).
Msg("manager: failed to update users")
}
}
}

View file

@ -6,11 +6,13 @@ import (
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/databroker/mock_databroker"
"github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/protoutil"
@ -30,12 +32,15 @@ func (mock mockProvider) UserGroups(ctx context.Context) ([]*directory.Group, []
}
func TestManager_onUpdateRecords(t *testing.T) {
ctrl := gomock.NewController(t)
ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10)
defer clearTimeout()
now := time.Now()
mgr := New(
WithDataBrokerClient(mock_databroker.NewMockDataBrokerServiceClient(ctrl)),
WithDirectoryProvider(mockProvider{}),
WithGroupRefreshInterval(time.Hour),
WithNow(func() time.Time {
@ -67,12 +72,17 @@ func TestManager_onUpdateRecords(t *testing.T) {
}
func TestManager_refreshDirectoryUserGroups(t *testing.T) {
ctrl := gomock.NewController(t)
ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10)
defer clearTimeout()
t.Run("backoff", func(t *testing.T) {
cnt := 0
client := mock_databroker.NewMockDataBrokerServiceClient(ctrl)
client.EXPECT().Put(gomock.Any(), gomock.Any()).AnyTimes()
mgr := New(
WithDataBrokerClient(client),
WithDirectoryProvider(mockProvider{
userGroups: func(ctx context.Context) ([]*directory.Group, []*directory.User, error) {
cnt++