authenticate: add databroker versions to session cookie (#2709)

* authenticate: add databroker versions to session cookie
authorize: wait for databroker synchronization on updated sessions

* fix test
This commit is contained in:
Caleb Doxsey 2021-10-26 14:45:53 -06:00 committed by GitHub
parent b2c76c3816
commit d390e80b30
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 192 additions and 32 deletions

View file

@ -72,6 +72,12 @@ func (a *Authorize) forceSync(ctx context.Context, ss *sessions.State) (sessionO
if ss == nil {
return nil, nil, nil
}
// if the session state has databroker versions, wait for those to finish syncing
if ss.DatabrokerServerVersion != 0 && ss.DatabrokerRecordVersion != 0 {
a.forceSyncToVersion(ctx, ss.DatabrokerServerVersion, ss.DatabrokerRecordVersion)
}
s := a.forceSyncSession(ctx, ss.ID)
if s == nil {
return nil, nil, errors.New("session not found")
@ -80,6 +86,29 @@ func (a *Authorize) forceSync(ctx context.Context, ss *sessions.State) (sessionO
return s, u, nil
}
func (a *Authorize) forceSyncToVersion(ctx context.Context, serverVersion, recordVersion uint64) (ready bool) {
ctx, span := trace.StartSpan(ctx, "authorize.forceSyncToVersion")
defer span.End()
ctx, clearTimeout := context.WithTimeout(ctx, forceSyncRecordMaxWait)
defer clearTimeout()
ticker := time.NewTicker(time.Millisecond * 50)
for {
currentServerVersion, currentRecordVersion := a.store.GetDataBrokerVersions()
// check if the local record version is up to date with the expected record version
if currentServerVersion == serverVersion && currentRecordVersion >= recordVersion {
return true
}
select {
case <-ctx.Done():
return false
case <-ticker.C:
}
}
}
func (a *Authorize) forceSyncSession(ctx context.Context, sessionID string) sessionOrServiceAccount {
ctx, span := trace.StartSpan(ctx, "authorize.forceSyncSession")
defer span.End()