This commit is contained in:
Denis Mishin 2023-11-05 00:20:43 -04:00
parent 75284959e1
commit a00de1cc95
2 changed files with 28 additions and 45 deletions

View file

@ -21,7 +21,6 @@ import (
configpb "github.com/pomerium/pomerium/pkg/grpc/config"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpcutil"
"github.com/pomerium/pomerium/pkg/slices"
)
// ConfigSource provides a new Config source that decorates an underlying config with
@ -125,27 +124,38 @@ func (src *ConfigSource) buildNewConfigLocked(ctx context.Context, cfg *config.C
}
return nil
})
policies := slices.NewSafeSlice[*config.Policy]()
for _, cfgpb := range src.dbConfigs {
for _, routepb := range cfgpb.GetRoutes() {
routepb := routepb
eg.Go(func() error {
policy, err := src.buildPolicyFromProto(routepb)
if err != nil {
log.Ctx(ctx).Err(err).Msg("databroker: error building policy from protobuf")
return nil
}
policies.Append(policy)
var policies []*config.Policy
var builders []func() error
buildPolicy := func(i int, routepb *configpb.Route) func() error {
return func() error {
policy, err := src.buildPolicyFromProto(routepb)
if err != nil {
log.Ctx(ctx).Err(err).Msg("databroker: error building policy from protobuf")
return nil
})
}
policies[i] = policy
return nil
}
}
for _, cfgpb := range src.dbConfigs {
for _, routepb := range cfgpb.GetRoutes() {
builders = append(builders, buildPolicy(len(builders), routepb))
}
}
policies = make([]*config.Policy, len(builders))
for _, builder := range builders {
eg.Go(builder)
}
err := eg.Wait()
if err != nil {
return err
}
src.addPolicies(ctx, cfg, policies.Get())
src.addPolicies(ctx, cfg, policies)
return nil
}
@ -198,6 +208,10 @@ func (src *ConfigSource) addPolicies(ctx context.Context, cfg *config.Config, po
var additionalPolicies []config.Policy
for _, policy := range policies {
if policy == nil {
continue
}
id, err := policy.RouteID()
if err != nil {
log.Ctx(ctx).Err(err).Str("policy", policy.String()).Msg("databroker: error getting route id")

View file

@ -1,31 +0,0 @@
package slices
import "sync"
// SafeSlice is a thread safe slice.
type SafeSlice[E any] struct {
mu sync.RWMutex
slice []E
}
// NewSafeSlice creates a new SafeSlice.
func NewSafeSlice[E any]() *SafeSlice[E] {
return &SafeSlice[E]{}
}
// Append appends e to the slice.
func (s *SafeSlice[E]) Append(e E) {
s.mu.Lock()
s.slice = append(s.slice, e)
s.mu.Unlock()
}
// Get gets the slice.
func (s *SafeSlice[E]) Get() []E {
s.mu.RLock()
defer s.mu.RUnlock()
c := make([]E, len(s.slice))
copy(c, s.slice)
return c
}