Merge remote-tracking branch 'origin/main' into cdoxsey/change-dispatcher

This commit is contained in:
Caleb Doxsey 2023-11-01 10:20:22 -06:00
commit cf5b009ce5
11 changed files with 877 additions and 874 deletions

View file

@ -27,7 +27,6 @@ type Config struct {
Options *Options Options *Options
AutoCertificates []tls.Certificate AutoCertificates []tls.Certificate
EnvoyVersion string EnvoyVersion string
Version int64
// DerivedCertificates are TLS certificates derived from the shared secret // DerivedCertificates are TLS certificates derived from the shared secret
DerivedCertificates []tls.Certificate DerivedCertificates []tls.Certificate
@ -63,7 +62,6 @@ func (cfg *Config) Clone() *Config {
_ = copy(endpoints, cfg.MetricsScrapeEndpoints) _ = copy(endpoints, cfg.MetricsScrapeEndpoints)
return &Config{ return &Config{
Version: cfg.Version,
Options: newOptions, Options: newOptions,
AutoCertificates: cfg.AutoCertificates, AutoCertificates: cfg.AutoCertificates,
EnvoyVersion: cfg.EnvoyVersion, EnvoyVersion: cfg.EnvoyVersion,

View file

@ -117,7 +117,6 @@ func NewFileOrEnvironmentSource(
cfg := &Config{ cfg := &Config{
Options: options, Options: options,
EnvoyVersion: envoyVersion, EnvoyVersion: envoyVersion,
Version: 1,
} }
ports, err := netutil.AllocatePorts(6) ports, err := netutil.AllocatePorts(6)
@ -155,7 +154,6 @@ func (src *FileOrEnvironmentSource) check(ctx context.Context) {
options, err := newOptionsFromConfig(src.configFile) options, err := newOptionsFromConfig(src.configFile)
if err == nil { if err == nil {
cfg = cfg.Clone() cfg = cfg.Clone()
cfg.Version++
cfg.Options = options cfg.Options = options
metrics.SetConfigInfo(ctx, cfg.Options.Services, "local", cfg.Checksum(), true) metrics.SetConfigInfo(ctx, cfg.Options.Services, "local", cfg.Checksum(), true)
} else { } else {
@ -165,7 +163,7 @@ func (src *FileOrEnvironmentSource) check(ctx context.Context) {
src.config = cfg src.config = cfg
src.mu.Unlock() src.mu.Unlock()
log.Info(ctx).Int64("config-version", cfg.Version).Msg("config: loaded configuration") log.Info(ctx).Msg("config: loaded configuration")
src.Trigger(ctx, cfg) src.Trigger(ctx, cfg)
} }

View file

@ -75,16 +75,15 @@ func TestEvents(t *testing.T) {
srv := &Server{ srv := &Server{
haveSetCapacity: make(map[string]bool), haveSetCapacity: make(map[string]bool),
currentConfig: atomicutil.NewValue(versionedConfig{ currentConfig: atomicutil.NewValue(&config.Config{
Config: &config.Config{ OutboundPort: outboundPort,
OutboundPort: outboundPort, Options: &config.Options{
Options: &config.Options{ SharedKey: cryptutil.NewBase64Key(),
SharedKey: cryptutil.NewBase64Key(), DataBrokerURLString: "http://" + li.Addr().String(),
DataBrokerURLString: "http://" + li.Addr().String(), GRPCInsecure: proto.Bool(true),
GRPCInsecure: proto.Bool(true),
},
}, },
}), },
),
} }
err := srv.storeEvent(ctx, new(events.LastError)) err := srv.storeEvent(ctx, new(events.LastError))
assert.NoError(t, err) assert.NoError(t, err)

View file

@ -33,7 +33,7 @@ func (srv *Server) StreamAccessLogs(stream envoy_service_accesslog_v3.AccessLogS
} }
evt = evt.Str("service", "envoy") evt = evt.Str("service", "envoy")
fields := srv.currentConfig.Load().Config.Options.GetAccessLogFields() fields := srv.currentConfig.Load().Options.GetAccessLogFields()
for _, field := range fields { for _, field := range fields {
evt = populateLogEvent(field, evt, entry) evt = populateLogEvent(field, evt, entry)
} }

View file

@ -27,6 +27,7 @@ import (
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry" "github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/requestid" "github.com/pomerium/pomerium/internal/telemetry/requestid"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/internal/urlutil" "github.com/pomerium/pomerium/internal/urlutil"
"github.com/pomerium/pomerium/internal/version" "github.com/pomerium/pomerium/internal/version"
"github.com/pomerium/pomerium/pkg/envoy/files" "github.com/pomerium/pomerium/pkg/envoy/files"
@ -34,11 +35,6 @@ import (
"github.com/pomerium/pomerium/pkg/grpcutil" "github.com/pomerium/pomerium/pkg/grpcutil"
) )
type versionedConfig struct {
*config.Config
version int64
}
// A Service can be mounted on the control plane. // A Service can be mounted on the control plane.
type Service interface { type Service interface {
Mount(r *mux.Router) Mount(r *mux.Router)
@ -56,7 +52,8 @@ type Server struct {
Builder *envoyconfig.Builder Builder *envoyconfig.Builder
EventsMgr *events.Manager EventsMgr *events.Manager
currentConfig *atomicutil.Value[versionedConfig] updateConfig chan *config.Config
currentConfig *atomicutil.Value[*config.Config]
name string name string
xdsmgr *xdsmgr.Manager xdsmgr *xdsmgr.Manager
filemgr *filemgr.Manager filemgr *filemgr.Manager
@ -77,10 +74,9 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager, eventsMgr
EventsMgr: eventsMgr, EventsMgr: eventsMgr,
reproxy: reproxy.New(), reproxy: reproxy.New(),
haveSetCapacity: map[string]bool{}, haveSetCapacity: map[string]bool{},
currentConfig: atomicutil.NewValue(versionedConfig{ updateConfig: make(chan *config.Config, 1),
Config: cfg, currentConfig: atomicutil.NewValue(cfg),
}), httpRouter: atomicutil.NewValue(mux.NewRouter()),
httpRouter: atomicutil.NewValue(mux.NewRouter()),
} }
var err error var err error
@ -249,38 +245,65 @@ func (srv *Server) Run(ctx context.Context) error {
}) })
} }
// apply configuration changes
eg.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case cfg := <-srv.updateConfig:
err := srv.update(ctx, cfg)
if err != nil {
log.Error(ctx).Err(err).
Msg("controlplane: error updating server with new config")
}
}
}
})
return eg.Wait() return eg.Wait()
} }
// OnConfigChange updates the pomerium config options. // OnConfigChange updates the pomerium config options.
func (srv *Server) OnConfigChange(ctx context.Context, cfg *config.Config) error { func (srv *Server) OnConfigChange(ctx context.Context, cfg *config.Config) error {
if err := srv.updateRouter(cfg); err != nil { ctx, span := trace.StartSpan(ctx, "controlplane.Server.OnConfigChange")
return err defer span.End()
select {
case <-ctx.Done():
return ctx.Err()
case srv.updateConfig <- cfg:
} }
srv.reproxy.Update(ctx, cfg)
prev := srv.currentConfig.Load()
srv.currentConfig.Store(versionedConfig{
Config: cfg,
version: prev.version + 1,
})
res, err := srv.buildDiscoveryResources(ctx)
if err != nil {
return err
}
srv.xdsmgr.Update(ctx, cfg.Version, res)
return nil return nil
} }
// EnableAuthenticate enables the authenticate service. // EnableAuthenticate enables the authenticate service.
func (srv *Server) EnableAuthenticate(svc Service) error { func (srv *Server) EnableAuthenticate(svc Service) error {
srv.authenticateSvc = svc srv.authenticateSvc = svc
return srv.updateRouter(srv.currentConfig.Load().Config) return srv.updateRouter(srv.currentConfig.Load())
} }
// EnableProxy enables the proxy service. // EnableProxy enables the proxy service.
func (srv *Server) EnableProxy(svc Service) error { func (srv *Server) EnableProxy(svc Service) error {
srv.proxySvc = svc srv.proxySvc = svc
return srv.updateRouter(srv.currentConfig.Load().Config) return srv.updateRouter(srv.currentConfig.Load())
}
func (srv *Server) update(ctx context.Context, cfg *config.Config) error {
ctx, span := trace.StartSpan(ctx, "controlplane.Server.update")
defer span.End()
if err := srv.updateRouter(cfg); err != nil {
return err
}
srv.reproxy.Update(ctx, cfg)
srv.currentConfig.Store(cfg)
res, err := srv.buildDiscoveryResources(ctx)
if err != nil {
return err
}
srv.xdsmgr.Update(ctx, res)
return nil
} }
func (srv *Server) updateRouter(cfg *config.Config) error { func (srv *Server) updateRouter(cfg *config.Config) error {

View file

@ -3,8 +3,10 @@ package controlplane
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"fmt"
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"golang.org/x/sync/errgroup"
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/trace" "github.com/pomerium/pomerium/internal/telemetry/trace"
@ -24,56 +26,72 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
cfg := srv.currentConfig.Load() cfg := srv.currentConfig.Load()
log.Info(ctx).Int64("config-version", cfg.Version).Msg("controlplane: building discovery resources") log.Info(ctx).Msg("controlplane: building discovery resources")
resources := map[string][]*envoy_service_discovery_v3.Resource{} eg, ctx := errgroup.WithContext(ctx)
var clusterCount, listenerCount, routeConfigurationCount int
clusters, err := srv.Builder.BuildClusters(ctx, cfg.Config) var clusterResources []*envoy_service_discovery_v3.Resource
eg.Go(func() error {
clusters, err := srv.Builder.BuildClusters(ctx, cfg)
if err != nil {
return fmt.Errorf("error building clusters: %w", err)
}
for _, cluster := range clusters {
clusterResources = append(clusterResources, &envoy_service_discovery_v3.Resource{
Name: cluster.Name,
Version: hex.EncodeToString(cryptutil.HashProto(cluster)),
Resource: protoutil.NewAny(cluster),
})
}
return nil
})
var listenerResources []*envoy_service_discovery_v3.Resource
eg.Go(func() error {
listeners, err := srv.Builder.BuildListeners(ctx, cfg, false)
if err != nil {
return fmt.Errorf("error building listeners: %w", err)
}
for _, listener := range listeners {
listenerResources = append(listenerResources, &envoy_service_discovery_v3.Resource{
Name: listener.Name,
Version: hex.EncodeToString(cryptutil.HashProto(listener)),
Resource: protoutil.NewAny(listener),
})
}
return nil
})
var routeConfigurationResources []*envoy_service_discovery_v3.Resource
eg.Go(func() error {
routeConfigurations, err := srv.Builder.BuildRouteConfigurations(ctx, cfg)
if err != nil {
return fmt.Errorf("error building route configurations: %w", err)
}
for _, routeConfiguration := range routeConfigurations {
routeConfigurationResources = append(routeConfigurationResources, &envoy_service_discovery_v3.Resource{
Name: routeConfiguration.Name,
Version: hex.EncodeToString(cryptutil.HashProto(routeConfiguration)),
Resource: protoutil.NewAny(routeConfiguration),
})
}
return nil
})
err := eg.Wait()
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, cluster := range clusters {
clusterCount++
resources[clusterTypeURL] = append(resources[clusterTypeURL], &envoy_service_discovery_v3.Resource{
Name: cluster.Name,
Version: hex.EncodeToString(cryptutil.HashProto(cluster)),
Resource: protoutil.NewAny(cluster),
})
}
listeners, err := srv.Builder.BuildListeners(ctx, cfg.Config, false)
if err != nil {
return nil, err
}
for _, listener := range listeners {
listenerCount++
resources[listenerTypeURL] = append(resources[listenerTypeURL], &envoy_service_discovery_v3.Resource{
Name: listener.Name,
Version: hex.EncodeToString(cryptutil.HashProto(listener)),
Resource: protoutil.NewAny(listener),
})
}
routeConfigurations, err := srv.Builder.BuildRouteConfigurations(ctx, cfg.Config)
if err != nil {
return nil, err
}
for _, routeConfiguration := range routeConfigurations {
routeConfigurationCount++
resources[routeConfigurationTypeURL] = append(resources[routeConfigurationTypeURL], &envoy_service_discovery_v3.Resource{
Name: routeConfiguration.Name,
Version: hex.EncodeToString(cryptutil.HashProto(routeConfiguration)),
Resource: protoutil.NewAny(routeConfiguration),
})
}
log.Info(ctx). log.Info(ctx).
Int64("config-version", cfg.Version). Int("cluster-count", len(clusterResources)).
Int("cluster-count", clusterCount). Int("listener-count", len(listenerResources)).
Int("listener-count", listenerCount). Int("route-configuration-count", len(routeConfigurationResources)).
Int("route-configuration-count", routeConfigurationCount).
Msg("controlplane: built discovery resources") Msg("controlplane: built discovery resources")
return resources, nil return map[string][]*envoy_service_discovery_v3.Resource{
clusterTypeURL: clusterResources,
listenerTypeURL: listenerResources,
routeConfigurationTypeURL: routeConfigurationResources,
}, nil
} }

View file

@ -3,9 +3,6 @@ package xdsmgr
import ( import (
"context" "context"
"fmt"
"strconv"
"strings"
"sync" "sync"
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@ -40,7 +37,7 @@ func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Ma
return &Manager{ return &Manager{
signal: signal.New(), signal: signal.New(),
nonce: toNonce(0), nonce: uuid.New().String(),
resources: resources, resources: resources,
} }
} }
@ -112,7 +109,6 @@ func (mgr *Manager) DeltaAggregatedResources(
case req.GetErrorDetail() != nil: case req.GetErrorDetail() != nil:
log.Info(ctx). log.Info(ctx).
Any("error-detail", req.GetErrorDetail()). Any("error-detail", req.GetErrorDetail()).
Int64("config-version", versionFromNonce(req.GetResponseNonce())).
Msg("xdsmgr: nack") Msg("xdsmgr: nack")
// a NACK // a NACK
// - set the client resource versions to the current resource versions // - set the client resource versions to the current resource versions
@ -122,7 +118,6 @@ func (mgr *Manager) DeltaAggregatedResources(
} }
case req.GetResponseNonce() == mgr.nonce: case req.GetResponseNonce() == mgr.nonce:
log.Info(ctx). log.Info(ctx).
Int64("config-version", versionFromNonce(req.GetResponseNonce())).
Msg("xdsmgr: ack") Msg("xdsmgr: ack")
// an ACK for the last response // an ACK for the last response
// - set the client resource versions to the current resource versions // - set the client resource versions to the current resource versions
@ -133,7 +128,6 @@ func (mgr *Manager) DeltaAggregatedResources(
default: default:
// an ACK for a response that's not the last response // an ACK for a response that's not the last response
log.Info(ctx). log.Info(ctx).
Int64("config-version", versionFromNonce(req.GetResponseNonce())).
Msg("xdsmgr: ack") Msg("xdsmgr: ack")
} }
@ -215,7 +209,6 @@ func (mgr *Manager) DeltaAggregatedResources(
return ctx.Err() return ctx.Err()
case res := <-outgoing: case res := <-outgoing:
log.Info(ctx). log.Info(ctx).
Int64("config-version", versionFromNonce(res.GetNonce())).
Int("resource-count", len(res.GetResources())). Int("resource-count", len(res.GetResources())).
Int("removed-resource-count", len(res.GetRemovedResources())). Int("removed-resource-count", len(res.GetRemovedResources())).
Msg("xdsmgr: sending resources") Msg("xdsmgr: sending resources")
@ -238,8 +231,8 @@ func (mgr *Manager) StreamAggregatedResources(
// Update updates the state of resources. If any changes are made they will be pushed to any listening // Update updates the state of resources. If any changes are made they will be pushed to any listening
// streams. For each TypeURL the list of resources should be the complete list of resources. // streams. For each TypeURL the list of resources should be the complete list of resources.
func (mgr *Manager) Update(ctx context.Context, version int64, resources map[string][]*envoy_service_discovery_v3.Resource) { func (mgr *Manager) Update(ctx context.Context, resources map[string][]*envoy_service_discovery_v3.Resource) {
nonce := toNonce(version) nonce := uuid.New().String()
mgr.mu.Lock() mgr.mu.Lock()
mgr.nonce = nonce mgr.nonce = nonce
@ -248,15 +241,3 @@ func (mgr *Manager) Update(ctx context.Context, version int64, resources map[str
mgr.signal.Broadcast(ctx) mgr.signal.Broadcast(ctx)
} }
func toNonce(version int64) string {
return fmt.Sprintf("%d/%s", version, uuid.New().String())
}
// versionFromNonce parses the version out of the nonce. A missing or invalid version will be returned as 0.
func versionFromNonce(nonce string) (version int64) {
if idx := strings.Index(nonce, "/"); idx > 0 {
version, _ = strconv.ParseInt(nonce[:idx], 10, 64)
}
return version
}

View file

@ -94,7 +94,7 @@ func TestManager(t *testing.T) {
}, msg.GetResources()) }, msg.GetResources())
ack(msg.Nonce) ack(msg.Nonce)
mgr.Update(ctx, 1, map[string][]*envoy_service_discovery_v3.Resource{ mgr.Update(ctx, map[string][]*envoy_service_discovery_v3.Resource{
typeURL: {{Name: "r1", Version: "2"}}, typeURL: {{Name: "r1", Version: "2"}},
}) })
@ -105,7 +105,7 @@ func TestManager(t *testing.T) {
}, msg.GetResources()) }, msg.GetResources())
ack(msg.Nonce) ack(msg.Nonce)
mgr.Update(ctx, 1, map[string][]*envoy_service_discovery_v3.Resource{ mgr.Update(ctx, map[string][]*envoy_service_discovery_v3.Resource{
typeURL: nil, typeURL: nil,
}) })

View file

@ -109,9 +109,6 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
// add all the config policies to the list // add all the config policies to the list
for _, id := range ids { for _, id := range ids {
cfgpb := src.dbConfigs[id] cfgpb := src.dbConfigs[id]
if cfgpb.GetVersion() > 0 {
cfg.Version = cfgpb.GetVersion()
}
cfg.Options.ApplySettings(ctx, certsIndex, cfgpb.Settings) cfg.Options.ApplySettings(ctx, certsIndex, cfgpb.Settings)
var errCount uint64 var errCount uint64
@ -171,7 +168,7 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
// add the additional policies here since calling `Validate` will reset them. // add the additional policies here since calling `Validate` will reset them.
cfg.Options.AdditionalPolicies = append(cfg.Options.AdditionalPolicies, additionalPolicies...) cfg.Options.AdditionalPolicies = append(cfg.Options.AdditionalPolicies, additionalPolicies...)
log.Info(ctx).Int64("config-version", cfg.Version).Msg("databroker: built new config") log.Info(ctx).Msg("databroker: built new config")
src.computedConfig = cfg src.computedConfig = cfg
if !firstTime { if !firstTime {

File diff suppressed because it is too large Load diff

View file

@ -12,7 +12,6 @@ import "crypt/crypt.proto";
message Config { message Config {
string name = 1; string name = 1;
int64 version = 4;
repeated Route routes = 2; repeated Route routes = 2;
Settings settings = 3; Settings settings = 3;
} }