mirror of
https://github.com/pomerium/pomerium.git
synced 2025-08-03 16:59:22 +02:00
directory.Group entry for groups (#1118)
* store directory groups separate from directory users * fix group lookup, azure display name * remove fields restriction * fix test * also support email * use Email as name for google' * remove changed file * show groups on dashboard * fix test * re-add accidentally removed code
This commit is contained in:
parent
489cdd8b63
commit
1ad243dfd1
25 changed files with 525 additions and 209 deletions
|
@ -48,7 +48,12 @@ type Manager struct {
|
|||
directoryUsers map[string]*directory.User
|
||||
directoryUsersServerVersion string
|
||||
directoryUsersRecordVersion string
|
||||
directoryUsersNextRefresh time.Time
|
||||
|
||||
directoryGroups map[string]*directory.Group
|
||||
directoryGroupsServerVersion string
|
||||
directoryGroupsRecordVersion string
|
||||
|
||||
directoryNextRefresh time.Time
|
||||
}
|
||||
|
||||
// New creates a new identity manager.
|
||||
|
@ -83,7 +88,12 @@ func New(
|
|||
|
||||
// Run runs the manager. This method blocks until an error occurs or the given context is canceled.
|
||||
func (mgr *Manager) Run(ctx context.Context) error {
|
||||
err := mgr.initDirectoryUsers(ctx)
|
||||
err := mgr.initDirectoryGroups(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize directory groups: %w", err)
|
||||
}
|
||||
|
||||
err = mgr.initDirectoryUsers(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize directory users: %w", err)
|
||||
}
|
||||
|
@ -100,13 +110,18 @@ func (mgr *Manager) Run(ctx context.Context) error {
|
|||
return mgr.syncUsers(ctx, updatedUser)
|
||||
})
|
||||
|
||||
updatedDirectoryGroup := make(chan *directory.Group, 1)
|
||||
t.Go(func() error {
|
||||
return mgr.syncDirectoryGroups(ctx, updatedDirectoryGroup)
|
||||
})
|
||||
|
||||
updatedDirectoryUser := make(chan *directory.User, 1)
|
||||
t.Go(func() error {
|
||||
return mgr.syncDirectoryUsers(ctx, updatedDirectoryUser)
|
||||
})
|
||||
|
||||
t.Go(func() error {
|
||||
return mgr.refreshLoop(ctx, updatedSession, updatedUser, updatedDirectoryUser)
|
||||
return mgr.refreshLoop(ctx, updatedSession, updatedUser, updatedDirectoryUser, updatedDirectoryGroup)
|
||||
})
|
||||
|
||||
return t.Wait()
|
||||
|
@ -117,11 +132,12 @@ func (mgr *Manager) refreshLoop(
|
|||
updatedSession <-chan *session.Session,
|
||||
updatedUser <-chan *user.User,
|
||||
updatedDirectoryUser <-chan *directory.User,
|
||||
updatedDirectoryGroup <-chan *directory.Group,
|
||||
) error {
|
||||
maxWait := time.Minute * 10
|
||||
nextTime := time.Now().Add(maxWait)
|
||||
if mgr.directoryUsersNextRefresh.Before(nextTime) {
|
||||
nextTime = mgr.directoryUsersNextRefresh
|
||||
if mgr.directoryNextRefresh.Before(nextTime) {
|
||||
nextTime = mgr.directoryNextRefresh
|
||||
}
|
||||
|
||||
timer := time.NewTimer(time.Until(nextTime))
|
||||
|
@ -137,6 +153,8 @@ func (mgr *Manager) refreshLoop(
|
|||
mgr.onUpdateUser(ctx, u)
|
||||
case du := <-updatedDirectoryUser:
|
||||
mgr.onUpdateDirectoryUser(ctx, du)
|
||||
case dg := <-updatedDirectoryGroup:
|
||||
mgr.onUpdateDirectoryGroup(ctx, dg)
|
||||
case <-timer.C:
|
||||
}
|
||||
|
||||
|
@ -144,11 +162,11 @@ func (mgr *Manager) refreshLoop(
|
|||
nextTime := now.Add(maxWait)
|
||||
|
||||
// refresh groups
|
||||
if mgr.directoryUsersNextRefresh.Before(now) {
|
||||
mgr.refreshDirectoryUsers(ctx)
|
||||
mgr.directoryUsersNextRefresh = now.Add(mgr.cfg.groupRefreshInterval)
|
||||
if mgr.directoryUsersNextRefresh.Before(nextTime) {
|
||||
nextTime = mgr.directoryUsersNextRefresh
|
||||
if mgr.directoryNextRefresh.Before(now) {
|
||||
mgr.refreshDirectoryUserGroups(ctx)
|
||||
mgr.directoryNextRefresh = now.Add(mgr.cfg.groupRefreshInterval)
|
||||
if mgr.directoryNextRefresh.Before(nextTime) {
|
||||
nextTime = mgr.directoryNextRefresh
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -185,18 +203,69 @@ func (mgr *Manager) refreshLoop(
|
|||
}
|
||||
}
|
||||
|
||||
func (mgr *Manager) refreshDirectoryUsers(ctx context.Context) {
|
||||
func (mgr *Manager) refreshDirectoryUserGroups(ctx context.Context) {
|
||||
mgr.log.Info().Msg("refreshing directory users")
|
||||
|
||||
ctx, clearTimeout := context.WithTimeout(ctx, mgr.cfg.groupRefreshTimeout)
|
||||
defer clearTimeout()
|
||||
|
||||
directoryUsers, err := mgr.directory.UserGroups(ctx)
|
||||
directoryGroups, directoryUsers, err := mgr.directory.UserGroups(ctx)
|
||||
if err != nil {
|
||||
mgr.log.Warn().Err(err).Msg("failed to refresh directory users and groups")
|
||||
return
|
||||
}
|
||||
|
||||
mgr.mergeGroups(ctx, directoryGroups)
|
||||
mgr.mergeUsers(ctx, directoryUsers)
|
||||
}
|
||||
|
||||
func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*directory.Group) {
|
||||
lookup := map[string]*directory.Group{}
|
||||
for _, dg := range directoryGroups {
|
||||
lookup[dg.GetId()] = dg
|
||||
}
|
||||
|
||||
for groupID, newDG := range lookup {
|
||||
curDG, ok := mgr.directoryGroups[groupID]
|
||||
if !ok || !proto.Equal(newDG, curDG) {
|
||||
any, err := ptypes.MarshalAny(newDG)
|
||||
if err != nil {
|
||||
mgr.log.Warn().Err(err).Msg("failed to marshal directory group")
|
||||
return
|
||||
}
|
||||
_, err = mgr.dataBrokerClient.Set(ctx, &databroker.SetRequest{
|
||||
Type: any.GetTypeUrl(),
|
||||
Id: newDG.GetId(),
|
||||
Data: any,
|
||||
})
|
||||
if err != nil {
|
||||
mgr.log.Warn().Err(err).Msg("failed to update directory group")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for groupID, curDG := range mgr.directoryGroups {
|
||||
_, ok := lookup[groupID]
|
||||
if !ok {
|
||||
any, err := ptypes.MarshalAny(curDG)
|
||||
if err != nil {
|
||||
mgr.log.Warn().Err(err).Msg("failed to marshal directory group")
|
||||
return
|
||||
}
|
||||
_, err = mgr.dataBrokerClient.Delete(ctx, &databroker.DeleteRequest{
|
||||
Type: any.GetTypeUrl(),
|
||||
Id: curDG.GetId(),
|
||||
})
|
||||
if err != nil {
|
||||
mgr.log.Warn().Err(err).Msg("failed to delete directory group")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.User) {
|
||||
lookup := map[string]*directory.User{}
|
||||
for _, du := range directoryUsers {
|
||||
lookup[du.GetId()] = du
|
||||
|
@ -497,6 +566,75 @@ func (mgr *Manager) syncDirectoryUsers(ctx context.Context, ch chan<- *directory
|
|||
}
|
||||
}
|
||||
|
||||
func (mgr *Manager) initDirectoryGroups(ctx context.Context) error {
|
||||
mgr.log.Info().Msg("initializing directory groups")
|
||||
|
||||
any, err := ptypes.MarshalAny(new(directory.Group))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := mgr.dataBrokerClient.GetAll(ctx, &databroker.GetAllRequest{
|
||||
Type: any.GetTypeUrl(),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error getting all directory groups: %w", err)
|
||||
}
|
||||
|
||||
mgr.directoryGroups = map[string]*directory.Group{}
|
||||
for _, record := range res.GetRecords() {
|
||||
var pbDirectoryGroup directory.Group
|
||||
err := ptypes.UnmarshalAny(record.GetData(), &pbDirectoryGroup)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error unmarshaling directory group: %w", err)
|
||||
}
|
||||
|
||||
mgr.directoryGroups[pbDirectoryGroup.GetId()] = &pbDirectoryGroup
|
||||
}
|
||||
mgr.directoryGroupsRecordVersion = res.GetRecordVersion()
|
||||
mgr.directoryGroupsServerVersion = res.GetServerVersion()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *Manager) syncDirectoryGroups(ctx context.Context, ch chan<- *directory.Group) error {
|
||||
mgr.log.Info().Msg("syncing directory groups")
|
||||
|
||||
any, err := ptypes.MarshalAny(new(directory.Group))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client, err := mgr.dataBrokerClient.Sync(ctx, &databroker.SyncRequest{
|
||||
Type: any.GetTypeUrl(),
|
||||
ServerVersion: mgr.directoryGroupsServerVersion,
|
||||
RecordVersion: mgr.directoryGroupsRecordVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error syncing directory groups: %w", err)
|
||||
}
|
||||
for {
|
||||
res, err := client.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error receiving directory groups: %w", err)
|
||||
}
|
||||
|
||||
for _, record := range res.GetRecords() {
|
||||
var pbDirectoryGroup directory.Group
|
||||
err := ptypes.UnmarshalAny(record.GetData(), &pbDirectoryGroup)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error unmarshaling directory group: %w", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case ch <- &pbDirectoryGroup:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *Manager) onUpdateSession(ctx context.Context, pbSession *session.Session) {
|
||||
mgr.sessionScheduler.Remove(toSessionSchedulerKey(pbSession.GetUserId(), pbSession.GetId()))
|
||||
|
||||
|
@ -543,6 +681,10 @@ func (mgr *Manager) onUpdateDirectoryUser(_ context.Context, pbDirectoryUser *di
|
|||
mgr.directoryUsers[pbDirectoryUser.GetId()] = pbDirectoryUser
|
||||
}
|
||||
|
||||
func (mgr *Manager) onUpdateDirectoryGroup(_ context.Context, pbDirectoryGroup *directory.Group) {
|
||||
mgr.directoryGroups[pbDirectoryGroup.GetId()] = pbDirectoryGroup
|
||||
}
|
||||
|
||||
func (mgr *Manager) createUser(ctx context.Context, pbSession *session.Session) {
|
||||
u := User{
|
||||
User: &user.User{
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue