backport: authorize: cache warming (#5439) (#5441)

authorize: cache warming (#5439)

* authorize: cache warming

* add Authorize to test?

* remove tracing querier

* only update connection when it changes
This commit is contained in:
Caleb Doxsey 2025-01-22 09:51:17 -07:00 committed by GitHub
parent fbe693cfb3
commit ede8248607
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 234 additions and 180 deletions

View file

@ -12,6 +12,7 @@ import (
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"github.com/pomerium/datasource/pkg/directory"
"github.com/pomerium/pomerium/authorize/evaluator"
"github.com/pomerium/pomerium/authorize/internal/store"
"github.com/pomerium/pomerium/config"
@ -20,18 +21,18 @@ import (
"github.com/pomerium/pomerium/internal/telemetry/metrics"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/grpc"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/storage"
)
// Authorize struct holds
type Authorize struct {
state *atomicutil.Value[*authorizeState]
store *store.Store
currentOptions *atomicutil.Value[*config.Options]
accessTracker *AccessTracker
globalCache storage.Cache
state *atomicutil.Value[*authorizeState]
store *store.Store
currentOptions *atomicutil.Value[*config.Options]
accessTracker *AccessTracker
globalCache storage.Cache
groupsCacheWarmer *cacheWarmer
// The stateLock prevents updating the evaluator store simultaneously with an evaluation.
// This should provide a consistent view of the data at a given server/record version and
@ -54,6 +55,7 @@ func New(ctx context.Context, cfg *config.Config) (*Authorize, error) {
}
a.state = atomicutil.NewValue(state)
a.groupsCacheWarmer = newCacheWarmer(state.dataBrokerClientConnection, a.globalCache, directory.GroupRecordType)
return a, nil
}
@ -70,7 +72,7 @@ func (a *Authorize) Run(ctx context.Context) error {
return nil
})
eg.Go(func() error {
_ = grpc.WaitForReady(ctx, a.state.Load().dataBrokerClientConnection, time.Second*10)
a.groupsCacheWarmer.Run(ctx)
return nil
})
return eg.Wait()
@ -152,9 +154,13 @@ func newPolicyEvaluator(
func (a *Authorize) OnConfigChange(ctx context.Context, cfg *config.Config) {
currentState := a.state.Load()
a.currentOptions.Store(cfg.Options)
if state, err := newAuthorizeStateFromConfig(ctx, cfg, a.store, currentState.evaluator); err != nil {
if newState, err := newAuthorizeStateFromConfig(ctx, cfg, a.store, currentState.evaluator); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("authorize: error updating state")
} else {
a.state.Store(state)
a.state.Store(newState)
if currentState.dataBrokerClientConnection != newState.dataBrokerClientConnection {
a.groupsCacheWarmer.UpdateConn(newState.dataBrokerClientConnection)
}
}
}