move directory providers (#3633)

* remove directory providers and support for groups

* idp: remove directory providers

* better error messages

* fix errors

* restore postgres

* fix test
This commit is contained in:
Caleb Doxsey 2022-11-03 11:33:56 -06:00 committed by GitHub
parent bb5c80bae9
commit c178819875
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
78 changed files with 723 additions and 8703 deletions

View file

@ -3,24 +3,18 @@ package manager
import (
"time"
"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)
var (
defaultGroupRefreshInterval = 15 * time.Minute
defaultGroupRefreshTimeout = 10 * time.Minute
defaultSessionRefreshGracePeriod = 1 * time.Minute
defaultSessionRefreshCoolOffDuration = 10 * time.Second
)
type config struct {
authenticator Authenticator
directory directory.Provider
dataBrokerClient databroker.DataBrokerServiceClient
groupRefreshInterval time.Duration
groupRefreshTimeout time.Duration
sessionRefreshGracePeriod time.Duration
sessionRefreshCoolOffDuration time.Duration
now func() time.Time
@ -29,8 +23,6 @@ type config struct {
func newConfig(options ...Option) *config {
cfg := new(config)
WithGroupRefreshInterval(defaultGroupRefreshInterval)(cfg)
WithGroupRefreshTimeout(defaultGroupRefreshTimeout)(cfg)
WithSessionRefreshGracePeriod(defaultSessionRefreshGracePeriod)(cfg)
WithSessionRefreshCoolOffDuration(defaultSessionRefreshCoolOffDuration)(cfg)
WithNow(time.Now)(cfg)
@ -50,13 +42,6 @@ func WithAuthenticator(authenticator Authenticator) Option {
}
}
// WithDirectoryProvider sets the directory provider in the config.
func WithDirectoryProvider(directoryProvider directory.Provider) Option {
return func(cfg *config) {
cfg.directory = directoryProvider
}
}
// WithDataBrokerClient sets the databroker client in the config.
func WithDataBrokerClient(dataBrokerClient databroker.DataBrokerServiceClient) Option {
return func(cfg *config) {
@ -64,20 +49,6 @@ func WithDataBrokerClient(dataBrokerClient databroker.DataBrokerServiceClient) O
}
}
// WithGroupRefreshInterval sets the group refresh interval used by the manager.
func WithGroupRefreshInterval(interval time.Duration) Option {
return func(cfg *config) {
cfg.groupRefreshInterval = interval
}
}
// WithGroupRefreshTimeout sets the group refresh timeout used by the manager.
func WithGroupRefreshTimeout(timeout time.Duration) Option {
return func(cfg *config) {
cfg.groupRefreshTimeout = timeout
}
}
// WithSessionRefreshGracePeriod sets the session refresh grace period used by the manager.
func WithSessionRefreshGracePeriod(dur time.Duration) Option {
return func(cfg *config) {

View file

@ -12,16 +12,17 @@ import (
"github.com/pomerium/pomerium/pkg/grpc/user"
)
const userRefreshInterval = 10 * time.Minute
// A User is a user managed by the Manager.
type User struct {
*user.User
lastRefresh time.Time
refreshInterval time.Duration
lastRefresh time.Time
}
// NextRefresh returns the next time the user information needs to be refreshed.
func (u User) NextRefresh() time.Time {
return u.lastRefresh.Add(u.refreshInterval)
return u.lastRefresh.Add(userRefreshInterval)
}
// UnmarshalJSON unmarshals json data into the user object.

View file

@ -6,16 +6,13 @@ import (
"errors"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/google/btree"
"github.com/rs/zerolog"
"golang.org/x/oauth2"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/atomicutil"
"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/identity/identity"
"github.com/pomerium/pomerium/internal/log"
@ -26,7 +23,6 @@ import (
"github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/grpcutil"
metrics_ids "github.com/pomerium/pomerium/pkg/metrics"
"github.com/pomerium/pomerium/pkg/protoutil"
)
// Authenticator is an identity.Provider with only the methods needed by the manager.
@ -49,13 +45,8 @@ type Manager struct {
sessionScheduler *scheduler.Scheduler
userScheduler *scheduler.Scheduler
sessions sessionCollection
users userCollection
directoryUsers map[string]*directory.User
directoryGroups map[string]*directory.Group
directoryBackoff *backoff.ExponentialBackOff
directoryNextRefresh time.Time
sessions sessionCollection
users userCollection
}
// New creates a new identity manager.
@ -68,8 +59,6 @@ func New(
sessionScheduler: scheduler.New(),
userScheduler: scheduler.New(),
}
mgr.directoryBackoff = backoff.NewExponentialBackOff()
mgr.directoryBackoff.MaxElapsedTime = 0
mgr.reset()
mgr.UpdateConfig(options...)
return mgr
@ -131,8 +120,6 @@ func (mgr *Manager) refreshLoop(ctx context.Context, update <-chan updateRecords
}
log.Info(ctx).
Int("directory_groups", len(mgr.directoryGroups)).
Int("directory_users", len(mgr.directoryUsers)).
Int("sessions", mgr.sessions.Len()).
Int("users", mgr.users.Len()).
Msg("initial sync complete")
@ -140,9 +127,6 @@ func (mgr *Manager) refreshLoop(ctx context.Context, update <-chan updateRecords
// start refreshing
maxWait := time.Minute * 10
nextTime := time.Now().Add(maxWait)
if mgr.directoryNextRefresh.Before(nextTime) {
nextTime = mgr.directoryNextRefresh
}
timer := time.NewTimer(time.Until(nextTime))
defer timer.Stop()
@ -161,14 +145,6 @@ func (mgr *Manager) refreshLoop(ctx context.Context, update <-chan updateRecords
now := time.Now()
nextTime = now.Add(maxWait)
// refresh groups
if mgr.directoryNextRefresh.Before(now) {
mgr.directoryNextRefresh = now.Add(mgr.refreshDirectoryUserGroups(ctx))
}
if mgr.directoryNextRefresh.Before(nextTime) {
nextTime = mgr.directoryNextRefresh
}
// refresh sessions
for {
tm, key := mgr.sessionScheduler.Next()
@ -203,128 +179,6 @@ func (mgr *Manager) refreshLoop(ctx context.Context, update <-chan updateRecords
}
}
func (mgr *Manager) refreshDirectoryUserGroups(ctx context.Context) (nextRefreshDelay time.Duration) {
log.Info(ctx).Msg("refreshing directory users")
ctx, clearTimeout := context.WithTimeout(ctx, mgr.cfg.Load().groupRefreshTimeout)
defer clearTimeout()
directoryGroups, directoryUsers, err := mgr.cfg.Load().directory.UserGroups(ctx)
metrics.RecordIdentityManagerUserGroupRefresh(ctx, err)
mgr.recordLastError(metrics_ids.IdentityManagerLastUserGroupRefreshError, err)
if err != nil {
msg := "failed to refresh directory users and groups"
if ctx.Err() != nil {
msg += ". You may need to increase the identity provider directory timeout setting"
msg += "(https://www.pomerium.com/docs/reference/identity-provider-refresh-directory-settings)"
}
log.Warn(ctx).Err(err).Msg(msg)
return minDuration(
mgr.cfg.Load().groupRefreshInterval, // never wait more than the refresh interval
mgr.directoryBackoff.NextBackOff(),
)
}
mgr.directoryBackoff.Reset() // success so reset the backoff
mgr.mergeGroups(ctx, directoryGroups)
mgr.mergeUsers(ctx, directoryUsers)
return mgr.cfg.Load().groupRefreshInterval
}
func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*directory.Group) {
lookup := map[string]*directory.Group{}
for _, dg := range directoryGroups {
lookup[dg.GetId()] = dg
}
var records []*databroker.Record
for groupID, newDG := range lookup {
curDG, ok := mgr.directoryGroups[groupID]
if !ok || !proto.Equal(newDG, curDG) {
id := newDG.GetId()
any := protoutil.NewAny(newDG)
records = append(records, &databroker.Record{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
})
}
}
for groupID, curDG := range mgr.directoryGroups {
_, ok := lookup[groupID]
if !ok {
id := curDG.GetId()
any := protoutil.NewAny(curDG)
records = append(records, &databroker.Record{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
DeletedAt: timestamppb.New(mgr.cfg.Load().now()),
})
}
}
for i, batch := range databroker.OptimumPutRequestsFromRecords(records) {
_, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, batch)
if err != nil {
log.Warn(ctx).Err(err).
Int("batch", i).
Int("record-count", len(batch.GetRecords())).
Msg("manager: failed to update groups")
}
}
}
func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.User) {
lookup := map[string]*directory.User{}
for _, du := range directoryUsers {
lookup[du.GetId()] = du
}
var records []*databroker.Record
for userID, newDU := range lookup {
curDU, ok := mgr.directoryUsers[userID]
if !ok || !proto.Equal(newDU, curDU) {
id := newDU.GetId()
any := protoutil.NewAny(newDU)
records = append(records, &databroker.Record{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
})
}
}
for userID, curDU := range mgr.directoryUsers {
_, ok := lookup[userID]
if !ok {
id := curDU.GetId()
any := protoutil.NewAny(curDU)
records = append(records, &databroker.Record{
Type: any.GetTypeUrl(),
Id: id,
Data: any,
DeletedAt: timestamppb.New(mgr.cfg.Load().now()),
})
}
}
for i, batch := range databroker.OptimumPutRequestsFromRecords(records) {
_, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, batch)
if err != nil {
log.Warn(ctx).Err(err).
Int("batch", i).
Int("record-count", len(batch.GetRecords())).
Msg("manager: failed to update users")
}
}
}
func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string) {
log.Info(ctx).
Str("user_id", userID).
@ -464,22 +318,6 @@ func (mgr *Manager) refreshUser(ctx context.Context, userID string) {
func (mgr *Manager) onUpdateRecords(ctx context.Context, msg updateRecordsMessage) {
for _, record := range msg.records {
switch record.GetType() {
case grpcutil.GetTypeURL(new(directory.Group)):
var pbDirectoryGroup directory.Group
err := record.GetData().UnmarshalTo(&pbDirectoryGroup)
if err != nil {
log.Warn(ctx).Msgf("error unmarshaling directory group: %s", err)
continue
}
mgr.onUpdateDirectoryGroup(ctx, &pbDirectoryGroup)
case grpcutil.GetTypeURL(new(directory.User)):
var pbDirectoryUser directory.User
err := record.GetData().UnmarshalTo(&pbDirectoryUser)
if err != nil {
log.Warn(ctx).Msgf("error unmarshaling directory user: %s", err)
continue
}
mgr.onUpdateDirectoryUser(ctx, &pbDirectoryUser)
case grpcutil.GetTypeURL(new(session.Session)):
var pbSession session.Session
err := record.GetData().UnmarshalTo(&pbSession)
@ -528,20 +366,11 @@ func (mgr *Manager) onUpdateUser(_ context.Context, record *databroker.Record, u
u, _ := mgr.users.Get(user.GetId())
u.lastRefresh = mgr.cfg.Load().now()
u.refreshInterval = mgr.cfg.Load().groupRefreshInterval
u.User = user
mgr.users.ReplaceOrInsert(u)
mgr.userScheduler.Add(u.NextRefresh(), u.GetId())
}
func (mgr *Manager) onUpdateDirectoryUser(_ context.Context, pbDirectoryUser *directory.User) {
mgr.directoryUsers[pbDirectoryUser.GetId()] = pbDirectoryUser
}
func (mgr *Manager) onUpdateDirectoryGroup(_ context.Context, pbDirectoryGroup *directory.Group) {
mgr.directoryGroups[pbDirectoryGroup.GetId()] = pbDirectoryGroup
}
func (mgr *Manager) deleteSession(ctx context.Context, pbSession *session.Session) {
err := session.Delete(ctx, mgr.cfg.Load().dataBrokerClient, pbSession.GetId())
if err != nil {
@ -553,8 +382,6 @@ func (mgr *Manager) deleteSession(ctx context.Context, pbSession *session.Sessio
// reset resets all the manager datastructures to their initial state
func (mgr *Manager) reset() {
mgr.directoryGroups = make(map[string]*directory.Group)
mgr.directoryUsers = make(map[string]*directory.User)
mgr.sessions = sessionCollection{BTree: btree.New(8)}
mgr.users = userCollection{BTree: btree.New(8)}
}
@ -586,13 +413,3 @@ func isTemporaryError(err error) bool {
}
return false
}
func minDuration(d1 time.Duration, ds ...time.Duration) time.Duration {
min := d1
for _, d := range ds {
if d < min {
min = d
}
}
return min
}

View file

@ -3,7 +3,6 @@ package manager
import (
"context"
"errors"
"fmt"
"testing"
"time"
@ -13,7 +12,6 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/identity/identity"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
@ -24,19 +22,6 @@ import (
"github.com/pomerium/pomerium/pkg/protoutil"
)
type mockProvider struct {
user func(ctx context.Context, userID, accessToken string) (*directory.User, error)
userGroups func(ctx context.Context) ([]*directory.Group, []*directory.User, error)
}
func (mock mockProvider) User(ctx context.Context, userID, accessToken string) (*directory.User, error) {
return mock.user(ctx, userID, accessToken)
}
func (mock mockProvider) UserGroups(ctx context.Context) ([]*directory.Group, []*directory.User, error) {
return mock.userGroups(ctx)
}
type mockAuthenticator struct{}
func (mock mockAuthenticator) Refresh(_ context.Context, _ *oauth2.Token, _ identity.State) (*oauth2.Token, error) {
@ -61,72 +46,25 @@ func TestManager_onUpdateRecords(t *testing.T) {
mgr := New(
WithDataBrokerClient(mock_databroker.NewMockDataBrokerServiceClient(ctrl)),
WithDirectoryProvider(mockProvider{}),
WithGroupRefreshInterval(time.Hour),
WithNow(func() time.Time {
return now
}),
)
mgr.directoryBackoff.RandomizationFactor = 0 // disable randomization for deterministic testing
mgr.onUpdateRecords(ctx, updateRecordsMessage{
records: []*databroker.Record{
mkRecord(&directory.Group{Id: "group1", Name: "group 1", Email: "group1@example.com"}),
mkRecord(&directory.User{Id: "user1", DisplayName: "user 1", Email: "user1@example.com", GroupIds: []string{"group1s"}}),
mkRecord(&session.Session{Id: "session1", UserId: "user1"}),
mkRecord(&user.User{Id: "user1", Name: "user 1", Email: "user1@example.com"}),
},
})
assert.NotNil(t, mgr.directoryGroups["group1"])
assert.NotNil(t, mgr.directoryUsers["user1"])
if _, ok := mgr.sessions.Get("user1", "session1"); assert.True(t, ok) {
}
if _, ok := mgr.users.Get("user1"); assert.True(t, ok) {
tm, id := mgr.userScheduler.Next()
assert.Equal(t, now.Add(time.Hour), tm)
assert.Equal(t, now.Add(userRefreshInterval), tm)
assert.Equal(t, "user1", id)
}
}
func TestManager_refreshDirectoryUserGroups(t *testing.T) {
ctrl := gomock.NewController(t)
ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10)
defer clearTimeout()
t.Run("backoff", func(t *testing.T) {
cnt := 0
client := mock_databroker.NewMockDataBrokerServiceClient(ctrl)
client.EXPECT().Put(gomock.Any(), gomock.Any()).AnyTimes()
mgr := New(
WithDataBrokerClient(client),
WithDirectoryProvider(mockProvider{
userGroups: func(ctx context.Context) ([]*directory.Group, []*directory.User, error) {
cnt++
switch cnt {
case 1:
return nil, nil, fmt.Errorf("error 1")
case 2:
return nil, nil, fmt.Errorf("error 2")
}
return nil, nil, nil
},
}),
WithGroupRefreshInterval(time.Hour),
)
mgr.directoryBackoff.RandomizationFactor = 0 // disable randomization for deterministic testing
dur1 := mgr.refreshDirectoryUserGroups(ctx)
dur2 := mgr.refreshDirectoryUserGroups(ctx)
dur3 := mgr.refreshDirectoryUserGroups(ctx)
assert.Greater(t, dur2, dur1)
assert.Greater(t, dur3, dur2)
assert.Equal(t, time.Hour, dur3)
})
}
func TestManager_reportErrors(t *testing.T) {
@ -161,22 +99,10 @@ func TestManager_reportErrors(t *testing.T) {
WithEventManager(evtMgr),
WithDataBrokerClient(client),
WithAuthenticator(mockAuthenticator{}),
WithDirectoryProvider(mockProvider{
user: func(ctx context.Context, userID, accessToken string) (*directory.User, error) {
return nil, fmt.Errorf("user")
},
userGroups: func(ctx context.Context) ([]*directory.Group, []*directory.User, error) {
return nil, nil, fmt.Errorf("user groups")
},
}),
WithGroupRefreshInterval(time.Second),
)
mgr.directoryBackoff.RandomizationFactor = 0 // disable randomization for deterministic testing
mgr.onUpdateRecords(ctx, updateRecordsMessage{
records: []*databroker.Record{
mkRecord(&directory.Group{Id: "group1", Name: "group 1", Email: "group1@example.com"}),
mkRecord(&directory.User{Id: "user1", DisplayName: "user 1", Email: "user1@example.com", GroupIds: []string{"group1s"}}),
mkRecord(&session.Session{Id: "session1", UserId: "user1", OauthToken: &session.OAuthToken{
ExpiresAt: timestamppb.New(time.Now().Add(time.Hour)),
}, ExpiresAt: timestamppb.New(time.Now().Add(time.Hour))}),
@ -184,9 +110,6 @@ func TestManager_reportErrors(t *testing.T) {
},
})
_ = mgr.refreshDirectoryUserGroups(ctx)
expectMsg(metrics_ids.IdentityManagerLastUserGroupRefreshError, "user groups")
mgr.refreshUser(ctx, "user1")
expectMsg(metrics_ids.IdentityManagerLastUserRefreshError, "update user info")

View file

@ -26,10 +26,6 @@ type Options struct {
// Scope specifies optional requested permissions.
Scopes []string
// ServiceAccount can be set for those providers that require additional
// credentials or tokens to do follow up API calls (e.g. Google)
ServiceAccount string
// AuthCodeOptions specifies additional key value pairs query params to add
// to the request flow signin url.
AuthCodeOptions map[string]string

View file

@ -27,7 +27,6 @@ var defaultScopes = []string{oidc.ScopeOpenID, "profile", "email"}
// requires we set this on a custom uri param. Also, ` prompt` must be set to `consent`to ensure
// that our application always receives a refresh token (ask google). And finally, we default to
// having the user select which Google account they'd like to use.
//
// For more details, please see google's documentation:
//
// https://developers.google.com/identity/protocols/oauth2/web-server#offline