diff --git a/internal/controlplane/events_test.go b/internal/controlplane/events_test.go index 90ebcbfa3..9485adde4 100644 --- a/internal/controlplane/events_test.go +++ b/internal/controlplane/events_test.go @@ -75,16 +75,15 @@ func TestEvents(t *testing.T) { srv := &Server{ haveSetCapacity: make(map[string]bool), - currentConfig: atomicutil.NewValue(versionedConfig{ - Config: &config.Config{ - OutboundPort: outboundPort, - Options: &config.Options{ - SharedKey: cryptutil.NewBase64Key(), - DataBrokerURLString: "http://" + li.Addr().String(), - GRPCInsecure: proto.Bool(true), - }, + currentConfig: atomicutil.NewValue(&config.Config{ + OutboundPort: outboundPort, + Options: &config.Options{ + SharedKey: cryptutil.NewBase64Key(), + DataBrokerURLString: "http://" + li.Addr().String(), + GRPCInsecure: proto.Bool(true), }, - }), + }, + ), } err := srv.storeEvent(ctx, new(events.LastError)) assert.NoError(t, err) diff --git a/internal/controlplane/grpc_accesslog.go b/internal/controlplane/grpc_accesslog.go index 714f0eacb..5a55567b5 100644 --- a/internal/controlplane/grpc_accesslog.go +++ b/internal/controlplane/grpc_accesslog.go @@ -33,7 +33,7 @@ func (srv *Server) StreamAccessLogs(stream envoy_service_accesslog_v3.AccessLogS } evt = evt.Str("service", "envoy") - fields := srv.currentConfig.Load().Config.Options.GetAccessLogFields() + fields := srv.currentConfig.Load().Options.GetAccessLogFields() for _, field := range fields { evt = populateLogEvent(field, evt, entry) } diff --git a/internal/controlplane/server.go b/internal/controlplane/server.go index 3d4e482ba..5c82be3d2 100644 --- a/internal/controlplane/server.go +++ b/internal/controlplane/server.go @@ -34,11 +34,6 @@ import ( "github.com/pomerium/pomerium/pkg/grpcutil" ) -type versionedConfig struct { - *config.Config - version int64 -} - // A Service can be mounted on the control plane. type Service interface { Mount(r *mux.Router) @@ -56,7 +51,8 @@ type Server struct { Builder *envoyconfig.Builder EventsMgr *events.Manager - currentConfig *atomicutil.Value[versionedConfig] + updateConfig chan *config.Config + currentConfig *atomicutil.Value[*config.Config] name string xdsmgr *xdsmgr.Manager filemgr *filemgr.Manager @@ -77,10 +73,9 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager, eventsMgr EventsMgr: eventsMgr, reproxy: reproxy.New(), haveSetCapacity: map[string]bool{}, - currentConfig: atomicutil.NewValue(versionedConfig{ - Config: cfg, - }), - httpRouter: atomicutil.NewValue(mux.NewRouter()), + updateConfig: make(chan *config.Config, 1), + currentConfig: atomicutil.NewValue(cfg), + httpRouter: atomicutil.NewValue(mux.NewRouter()), } var err error @@ -249,38 +244,60 @@ func (srv *Server) Run(ctx context.Context) error { }) } + // apply configuration changes + eg.Go(func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case cfg := <-srv.updateConfig: + err := srv.update(ctx, cfg) + if err != nil { + log.Error(ctx).Err(err). + Int64("config-version", cfg.Version). + Msg("controlplane: error updating server with new config") + } + } + } + }) + return eg.Wait() } // OnConfigChange updates the pomerium config options. func (srv *Server) OnConfigChange(ctx context.Context, cfg *config.Config) error { - if err := srv.updateRouter(cfg); err != nil { - return err + select { + case <-ctx.Done(): + return ctx.Err() + case srv.updateConfig <- cfg: } - srv.reproxy.Update(ctx, cfg) - prev := srv.currentConfig.Load() - srv.currentConfig.Store(versionedConfig{ - Config: cfg, - version: prev.version + 1, - }) - res, err := srv.buildDiscoveryResources(ctx) - if err != nil { - return err - } - srv.xdsmgr.Update(ctx, cfg.Version, res) return nil } // EnableAuthenticate enables the authenticate service. func (srv *Server) EnableAuthenticate(svc Service) error { srv.authenticateSvc = svc - return srv.updateRouter(srv.currentConfig.Load().Config) + return srv.updateRouter(srv.currentConfig.Load()) } // EnableProxy enables the proxy service. func (srv *Server) EnableProxy(svc Service) error { srv.proxySvc = svc - return srv.updateRouter(srv.currentConfig.Load().Config) + return srv.updateRouter(srv.currentConfig.Load()) +} + +func (srv *Server) update(ctx context.Context, cfg *config.Config) error { + if err := srv.updateRouter(cfg); err != nil { + return err + } + srv.reproxy.Update(ctx, cfg) + srv.currentConfig.Store(cfg) + res, err := srv.buildDiscoveryResources(ctx) + if err != nil { + return err + } + srv.xdsmgr.Update(ctx, cfg.Version, res) + return nil } func (srv *Server) updateRouter(cfg *config.Config) error { diff --git a/internal/controlplane/xds.go b/internal/controlplane/xds.go index a3807572b..891caad42 100644 --- a/internal/controlplane/xds.go +++ b/internal/controlplane/xds.go @@ -29,7 +29,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e resources := map[string][]*envoy_service_discovery_v3.Resource{} var clusterCount, listenerCount, routeConfigurationCount int - clusters, err := srv.Builder.BuildClusters(ctx, cfg.Config) + clusters, err := srv.Builder.BuildClusters(ctx, cfg) if err != nil { return nil, err } @@ -42,7 +42,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e }) } - listeners, err := srv.Builder.BuildListeners(ctx, cfg.Config, false) + listeners, err := srv.Builder.BuildListeners(ctx, cfg, false) if err != nil { return nil, err } @@ -55,7 +55,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e }) } - routeConfigurations, err := srv.Builder.BuildRouteConfigurations(ctx, cfg.Config) + routeConfigurations, err := srv.Builder.BuildRouteConfigurations(ctx, cfg) if err != nil { return nil, err }