identity: fix expired session deletion (#3855)

This commit is contained in:
Caleb Doxsey 2023-01-05 13:48:10 -07:00 committed by GitHub
parent e019885218
commit 78fc4853db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 10 deletions

View file

@ -10,6 +10,8 @@ import (
"github.com/rs/zerolog" "github.com/rs/zerolog"
"golang.org/x/oauth2" "golang.org/x/oauth2"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/atomicutil" "github.com/pomerium/pomerium/internal/atomicutil"
@ -200,7 +202,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string
Str("user_id", userID). Str("user_id", userID).
Str("session_id", sessionID). Str("session_id", sessionID).
Msg("deleting expired session") Msg("deleting expired session")
mgr.deleteSession(ctx, s.Session) mgr.deleteSession(ctx, userID, sessionID)
return return
} }
@ -226,7 +228,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string
Str("user_id", s.GetUserId()). Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()). Str("session_id", s.GetId()).
Msg("failed to refresh oauth2 token, deleting session") Msg("failed to refresh oauth2 token, deleting session")
mgr.deleteSession(ctx, s.Session) mgr.deleteSession(ctx, userID, sessionID)
return return
} }
s.OauthToken = ToOAuthToken(newToken) s.OauthToken = ToOAuthToken(newToken)
@ -245,7 +247,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string
Str("user_id", s.GetUserId()). Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()). Str("session_id", s.GetId()).
Msg("failed to update user info, deleting session") Msg("failed to update user info, deleting session")
mgr.deleteSession(ctx, s.Session) mgr.deleteSession(ctx, userID, sessionID)
return return
} }
@ -298,7 +300,7 @@ func (mgr *Manager) refreshUser(ctx context.Context, userID string) {
Str("user_id", s.GetUserId()). Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()). Str("session_id", s.GetId()).
Msg("failed to update user info, deleting session") Msg("failed to update user info, deleting session")
mgr.deleteSession(ctx, s.Session) mgr.deleteSession(ctx, userID, s.GetId())
continue continue
} }
@ -371,12 +373,35 @@ func (mgr *Manager) onUpdateUser(_ context.Context, record *databroker.Record, u
mgr.userScheduler.Add(u.NextRefresh(), u.GetId()) mgr.userScheduler.Add(u.NextRefresh(), u.GetId())
} }
func (mgr *Manager) deleteSession(ctx context.Context, pbSession *session.Session) { func (mgr *Manager) deleteSession(ctx context.Context, userID, sessionID string) {
err := session.Delete(ctx, mgr.cfg.Load().dataBrokerClient, pbSession.GetId()) mgr.sessionScheduler.Remove(toSessionSchedulerKey(userID, sessionID))
mgr.sessions.Delete(userID, sessionID)
client := mgr.cfg.Load().dataBrokerClient
res, err := client.Get(ctx, &databroker.GetRequest{
Type: grpcutil.GetTypeURL(new(session.Session)),
Id: sessionID,
})
if status.Code(err) == codes.NotFound {
return
} else if err != nil {
log.Error(ctx).Err(err).
Str("session_id", sessionID).
Msg("failed to delete session")
return
}
record := res.GetRecord()
record.DeletedAt = timestamppb.Now()
_, err = client.Put(ctx, &databroker.PutRequest{
Records: []*databroker.Record{record},
})
if err != nil { if err != nil {
log.Error(ctx).Err(err). log.Error(ctx).Err(err).
Str("session_id", pbSession.GetId()). Str("session_id", sessionID).
Msg("failed to delete session") Msg("failed to delete session")
return
} }
} }

View file

@ -93,7 +93,17 @@ func TestManager_reportErrors(t *testing.T) {
}, time.Second, time.Millisecond*20, msg) }, time.Second, time.Millisecond*20, msg)
} }
s := &session.Session{
Id: "session1",
UserId: "user1",
OauthToken: &session.OAuthToken{
ExpiresAt: timestamppb.New(time.Now().Add(time.Hour)),
},
ExpiresAt: timestamppb.New(time.Now().Add(time.Hour)),
}
client := mock_databroker.NewMockDataBrokerServiceClient(ctrl) client := mock_databroker.NewMockDataBrokerServiceClient(ctrl)
client.EXPECT().Get(gomock.Any(), gomock.Any()).AnyTimes().Return(&databroker.GetResponse{Record: databroker.NewRecord(s)}, nil)
client.EXPECT().Put(gomock.Any(), gomock.Any()).AnyTimes() client.EXPECT().Put(gomock.Any(), gomock.Any()).AnyTimes()
mgr := New( mgr := New(
WithEventManager(evtMgr), WithEventManager(evtMgr),
@ -103,9 +113,7 @@ func TestManager_reportErrors(t *testing.T) {
mgr.onUpdateRecords(ctx, updateRecordsMessage{ mgr.onUpdateRecords(ctx, updateRecordsMessage{
records: []*databroker.Record{ records: []*databroker.Record{
mkRecord(&session.Session{Id: "session1", UserId: "user1", OauthToken: &session.OAuthToken{ mkRecord(s),
ExpiresAt: timestamppb.New(time.Now().Add(time.Hour)),
}, ExpiresAt: timestamppb.New(time.Now().Add(time.Hour))}),
mkRecord(&user.User{Id: "user1", Name: "user 1", Email: "user1@example.com"}), mkRecord(&user.User{Id: "user1", Name: "user 1", Email: "user1@example.com"}),
}, },
}) })
@ -113,6 +121,13 @@ func TestManager_reportErrors(t *testing.T) {
mgr.refreshUser(ctx, "user1") mgr.refreshUser(ctx, "user1")
expectMsg(metrics_ids.IdentityManagerLastUserRefreshError, "update user info") expectMsg(metrics_ids.IdentityManagerLastUserRefreshError, "update user info")
mgr.onUpdateRecords(ctx, updateRecordsMessage{
records: []*databroker.Record{
mkRecord(s),
mkRecord(&user.User{Id: "user1", Name: "user 1", Email: "user1@example.com"}),
},
})
mgr.refreshSession(ctx, "user1", "session1") mgr.refreshSession(ctx, "user1", "session1")
expectMsg(metrics_ids.IdentityManagerLastSessionRefreshError, "update session") expectMsg(metrics_ids.IdentityManagerLastSessionRefreshError, "update session")
} }