mirror of
https://github.com/pomerium/pomerium.git
synced 2025-05-21 04:57:18 +02:00
authorize: build evaluators cache in parallel (#4731)
authorize: build evaluators cache in parallel (#4722) * authorize: build evaluators cache in parallel * session: add unit tests for gRPC wrapper methods (#4713) * core/config: add support for maps in environments (#4717) * reconciler: allow custom comparison function (#4726) * add loopvar alias --------- Co-authored-by: Denis Mishin <dmishin@pomerium.com> Co-authored-by: Kenneth Jenkins <51246568+kenjenkins@users.noreply.github.com> Co-authored-by: Caleb Doxsey <cdoxsey@pomerium.com>
This commit is contained in:
parent
34187e8ba5
commit
6cec77bad5
3 changed files with 116 additions and 31 deletions
|
@ -3,7 +3,6 @@ package databroker
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -12,6 +11,7 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/pomerium/pomerium/config"
|
||||
"github.com/pomerium/pomerium/internal/errgrouputil"
|
||||
"github.com/pomerium/pomerium/internal/hashutil"
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
"github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||
|
@ -115,7 +115,6 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
|
|||
|
||||
func (src *ConfigSource) buildNewConfigLocked(ctx context.Context, cfg *config.Config) error {
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
eg.SetLimit(runtime.NumCPU()/2 + 1)
|
||||
eg.Go(func() error {
|
||||
src.applySettingsLocked(ctx, cfg)
|
||||
err := cfg.Options.Validate()
|
||||
|
@ -125,30 +124,32 @@ func (src *ConfigSource) buildNewConfigLocked(ctx context.Context, cfg *config.C
|
|||
return nil
|
||||
})
|
||||
|
||||
var policies []*config.Policy
|
||||
var builders []func() error
|
||||
buildPolicy := func(i int, routepb *configpb.Route) func() error {
|
||||
return func() error {
|
||||
policy, err := src.buildPolicyFromProto(routepb)
|
||||
if err != nil {
|
||||
log.Ctx(ctx).Err(err).Msg("databroker: error building policy from protobuf")
|
||||
return nil
|
||||
}
|
||||
policies[i] = policy
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
var policyBuilders []errgrouputil.BuilderFunc[config.Policy]
|
||||
for _, cfgpb := range src.dbConfigs {
|
||||
for _, routepb := range cfgpb.GetRoutes() {
|
||||
builders = append(builders, buildPolicy(len(builders), routepb))
|
||||
routepb := routepb
|
||||
policyBuilders = append(policyBuilders, func(ctx context.Context) (*config.Policy, error) {
|
||||
p, err := src.buildPolicyFromProto(ctx, routepb)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building route id=%s: %w", routepb.GetId(), err)
|
||||
}
|
||||
return p, nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
policies = make([]*config.Policy, len(builders))
|
||||
for _, builder := range builders {
|
||||
eg.Go(builder)
|
||||
}
|
||||
var policies []*config.Policy
|
||||
eg.Go(func() error {
|
||||
var errs []error
|
||||
policies, errs = errgrouputil.Build(ctx, policyBuilders...)
|
||||
if len(errs) > 0 {
|
||||
for _, err := range errs {
|
||||
log.Error(ctx).Msg(err.Error())
|
||||
}
|
||||
return fmt.Errorf("error building policies")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
err := eg.Wait()
|
||||
if err != nil {
|
||||
|
@ -177,7 +178,7 @@ func (src *ConfigSource) applySettingsLocked(ctx context.Context, cfg *config.Co
|
|||
}
|
||||
}
|
||||
|
||||
func (src *ConfigSource) buildPolicyFromProto(routepb *configpb.Route) (*config.Policy, error) {
|
||||
func (src *ConfigSource) buildPolicyFromProto(_ context.Context, routepb *configpb.Route) (*config.Policy, error) {
|
||||
policy, err := config.NewPolicyFromProto(routepb)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error building policy from protobuf: %w", err)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue