mirror of
https://github.com/pomerium/pomerium.git
synced 2025-07-06 11:28:10 +02:00
core/controlplane: apply configuration changes in a background thread
This commit is contained in:
parent
2472490075
commit
a45f07762b
4 changed files with 54 additions and 38 deletions
|
@ -75,8 +75,7 @@ func TestEvents(t *testing.T) {
|
||||||
|
|
||||||
srv := &Server{
|
srv := &Server{
|
||||||
haveSetCapacity: make(map[string]bool),
|
haveSetCapacity: make(map[string]bool),
|
||||||
currentConfig: atomicutil.NewValue(versionedConfig{
|
currentConfig: atomicutil.NewValue(&config.Config{
|
||||||
Config: &config.Config{
|
|
||||||
OutboundPort: outboundPort,
|
OutboundPort: outboundPort,
|
||||||
Options: &config.Options{
|
Options: &config.Options{
|
||||||
SharedKey: cryptutil.NewBase64Key(),
|
SharedKey: cryptutil.NewBase64Key(),
|
||||||
|
@ -84,7 +83,7 @@ func TestEvents(t *testing.T) {
|
||||||
GRPCInsecure: proto.Bool(true),
|
GRPCInsecure: proto.Bool(true),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}),
|
),
|
||||||
}
|
}
|
||||||
err := srv.storeEvent(ctx, new(events.LastError))
|
err := srv.storeEvent(ctx, new(events.LastError))
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
|
@ -33,7 +33,7 @@ func (srv *Server) StreamAccessLogs(stream envoy_service_accesslog_v3.AccessLogS
|
||||||
}
|
}
|
||||||
evt = evt.Str("service", "envoy")
|
evt = evt.Str("service", "envoy")
|
||||||
|
|
||||||
fields := srv.currentConfig.Load().Config.Options.GetAccessLogFields()
|
fields := srv.currentConfig.Load().Options.GetAccessLogFields()
|
||||||
for _, field := range fields {
|
for _, field := range fields {
|
||||||
evt = populateLogEvent(field, evt, entry)
|
evt = populateLogEvent(field, evt, entry)
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,11 +34,6 @@ import (
|
||||||
"github.com/pomerium/pomerium/pkg/grpcutil"
|
"github.com/pomerium/pomerium/pkg/grpcutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
type versionedConfig struct {
|
|
||||||
*config.Config
|
|
||||||
version int64
|
|
||||||
}
|
|
||||||
|
|
||||||
// A Service can be mounted on the control plane.
|
// A Service can be mounted on the control plane.
|
||||||
type Service interface {
|
type Service interface {
|
||||||
Mount(r *mux.Router)
|
Mount(r *mux.Router)
|
||||||
|
@ -56,7 +51,8 @@ type Server struct {
|
||||||
Builder *envoyconfig.Builder
|
Builder *envoyconfig.Builder
|
||||||
EventsMgr *events.Manager
|
EventsMgr *events.Manager
|
||||||
|
|
||||||
currentConfig *atomicutil.Value[versionedConfig]
|
updateConfig chan *config.Config
|
||||||
|
currentConfig *atomicutil.Value[*config.Config]
|
||||||
name string
|
name string
|
||||||
xdsmgr *xdsmgr.Manager
|
xdsmgr *xdsmgr.Manager
|
||||||
filemgr *filemgr.Manager
|
filemgr *filemgr.Manager
|
||||||
|
@ -77,9 +73,8 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager, eventsMgr
|
||||||
EventsMgr: eventsMgr,
|
EventsMgr: eventsMgr,
|
||||||
reproxy: reproxy.New(),
|
reproxy: reproxy.New(),
|
||||||
haveSetCapacity: map[string]bool{},
|
haveSetCapacity: map[string]bool{},
|
||||||
currentConfig: atomicutil.NewValue(versionedConfig{
|
updateConfig: make(chan *config.Config, 1),
|
||||||
Config: cfg,
|
currentConfig: atomicutil.NewValue(cfg),
|
||||||
}),
|
|
||||||
httpRouter: atomicutil.NewValue(mux.NewRouter()),
|
httpRouter: atomicutil.NewValue(mux.NewRouter()),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -249,38 +244,60 @@ func (srv *Server) Run(ctx context.Context) error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// apply configuration changes
|
||||||
|
eg.Go(func() error {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case cfg := <-srv.updateConfig:
|
||||||
|
err := srv.update(ctx, cfg)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(ctx).Err(err).
|
||||||
|
Int64("config-version", cfg.Version).
|
||||||
|
Msg("controlplane: error updating server with new config")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
return eg.Wait()
|
return eg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnConfigChange updates the pomerium config options.
|
// OnConfigChange updates the pomerium config options.
|
||||||
func (srv *Server) OnConfigChange(ctx context.Context, cfg *config.Config) error {
|
func (srv *Server) OnConfigChange(ctx context.Context, cfg *config.Config) error {
|
||||||
if err := srv.updateRouter(cfg); err != nil {
|
select {
|
||||||
return err
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case srv.updateConfig <- cfg:
|
||||||
}
|
}
|
||||||
srv.reproxy.Update(ctx, cfg)
|
|
||||||
prev := srv.currentConfig.Load()
|
|
||||||
srv.currentConfig.Store(versionedConfig{
|
|
||||||
Config: cfg,
|
|
||||||
version: prev.version + 1,
|
|
||||||
})
|
|
||||||
res, err := srv.buildDiscoveryResources(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
srv.xdsmgr.Update(ctx, cfg.Version, res)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnableAuthenticate enables the authenticate service.
|
// EnableAuthenticate enables the authenticate service.
|
||||||
func (srv *Server) EnableAuthenticate(svc Service) error {
|
func (srv *Server) EnableAuthenticate(svc Service) error {
|
||||||
srv.authenticateSvc = svc
|
srv.authenticateSvc = svc
|
||||||
return srv.updateRouter(srv.currentConfig.Load().Config)
|
return srv.updateRouter(srv.currentConfig.Load())
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnableProxy enables the proxy service.
|
// EnableProxy enables the proxy service.
|
||||||
func (srv *Server) EnableProxy(svc Service) error {
|
func (srv *Server) EnableProxy(svc Service) error {
|
||||||
srv.proxySvc = svc
|
srv.proxySvc = svc
|
||||||
return srv.updateRouter(srv.currentConfig.Load().Config)
|
return srv.updateRouter(srv.currentConfig.Load())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (srv *Server) update(ctx context.Context, cfg *config.Config) error {
|
||||||
|
if err := srv.updateRouter(cfg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
srv.reproxy.Update(ctx, cfg)
|
||||||
|
srv.currentConfig.Store(cfg)
|
||||||
|
res, err := srv.buildDiscoveryResources(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
srv.xdsmgr.Update(ctx, cfg.Version, res)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srv *Server) updateRouter(cfg *config.Config) error {
|
func (srv *Server) updateRouter(cfg *config.Config) error {
|
||||||
|
|
|
@ -29,7 +29,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
|
||||||
resources := map[string][]*envoy_service_discovery_v3.Resource{}
|
resources := map[string][]*envoy_service_discovery_v3.Resource{}
|
||||||
var clusterCount, listenerCount, routeConfigurationCount int
|
var clusterCount, listenerCount, routeConfigurationCount int
|
||||||
|
|
||||||
clusters, err := srv.Builder.BuildClusters(ctx, cfg.Config)
|
clusters, err := srv.Builder.BuildClusters(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
listeners, err := srv.Builder.BuildListeners(ctx, cfg.Config, false)
|
listeners, err := srv.Builder.BuildListeners(ctx, cfg, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -55,7 +55,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
routeConfigurations, err := srv.Builder.BuildRouteConfigurations(ctx, cfg.Config)
|
routeConfigurations, err := srv.Builder.BuildRouteConfigurations(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue