authenticate: allow changing the authenticate service URL at runtime (#3378)

* config: better change detection

* wip

* fix middleware

* add middleware before handlers

* use ctx
This commit is contained in:
Caleb Doxsey 2022-05-31 13:24:40 -06:00 committed by GitHub
parent 9baaea5e85
commit fd82cc7870
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 104 additions and 51 deletions

View file

@ -6,7 +6,6 @@ import (
"os"
"sync"
"github.com/fsnotify/fsnotify"
"github.com/google/uuid"
"github.com/rs/zerolog"
@ -90,6 +89,7 @@ func (src *StaticSource) OnConfigChange(ctx context.Context, li ChangeListener)
// A FileOrEnvironmentSource retrieves config options from a file or the environment.
type FileOrEnvironmentSource struct {
configFile string
watcher *fileutil.Watcher
mu sync.RWMutex
config *Config
@ -134,17 +134,22 @@ func NewFileOrEnvironmentSource(
src := &FileOrEnvironmentSource{
configFile: configFile,
watcher: fileutil.NewWatcher(),
config: cfg,
}
options.viper.OnConfigChange(src.onConfigChange(ctx))
go options.viper.WatchConfig()
src.watcher.Add(configFile)
ch := src.watcher.Bind()
go func() {
for range ch {
src.check(ctx)
}
}()
return src, nil
}
func (src *FileOrEnvironmentSource) onConfigChange(ctx context.Context) func(fsnotify.Event) {
return func(evt fsnotify.Event) {
ctx := log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
func (src *FileOrEnvironmentSource) check(ctx context.Context) {
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("config_change_id", uuid.New().String())
})
log.Info(ctx).Msg("config: file updated, reconfiguring...")
@ -159,10 +164,10 @@ func (src *FileOrEnvironmentSource) onConfigChange(ctx context.Context) func(fsn
log.Error(ctx).Err(err).Msg("config: error updating config")
metrics.SetConfigInfo(ctx, cfg.Options.Services, "local", cfg.Checksum(), false)
}
src.config = cfg
src.mu.Unlock()
src.Trigger(ctx, cfg)
}
}
// GetConfig gets the config.

2
go.mod
View file

@ -15,7 +15,7 @@ require (
github.com/docker/docker v20.10.16+incompatible
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1
github.com/envoyproxy/protoc-gen-validate v0.6.7
github.com/fsnotify/fsnotify v1.5.4
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/go-chi/chi/v5 v5.0.7
github.com/go-jose/go-jose/v3 v3.0.0
github.com/go-redis/redis/v8 v8.11.5

View file

@ -25,7 +25,6 @@ import (
"github.com/pomerium/pomerium/internal/envoy/files"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/registry"
"github.com/pomerium/pomerium/internal/urlutil"
"github.com/pomerium/pomerium/internal/version"
"github.com/pomerium/pomerium/proxy"
)
@ -165,18 +164,14 @@ func setupAuthenticate(ctx context.Context, src config.Source, controlPlane *con
if err != nil {
return fmt.Errorf("error creating authenticate service: %w", err)
}
authenticateURL, err := src.GetConfig().Options.GetInternalAuthenticateURL()
err = controlPlane.EnableAuthenticate(svc)
if err != nil {
return fmt.Errorf("error getting authenticate URL: %w", err)
return fmt.Errorf("error adding authenticate service to control plane: %w", err)
}
src.OnConfigChange(ctx, svc.OnConfigChange)
svc.OnConfigChange(ctx, src.GetConfig())
host := urlutil.StripPort(authenticateURL.Host)
sr := controlPlane.HTTPRouter.Host(host).Subrouter()
svc.Mount(sr)
log.Info(context.TODO()).Str("host", host).Msg("enabled authenticate service")
log.Info(ctx).Msg("enabled authenticate service")
return nil
}
@ -222,7 +217,10 @@ func setupProxy(ctx context.Context, src config.Source, controlPlane *controlpla
if err != nil {
return fmt.Errorf("error creating proxy service: %w", err)
}
controlPlane.HTTPRouter.PathPrefix("/").Handler(svc)
err = controlPlane.EnableProxy(svc)
if err != nil {
return fmt.Errorf("error adding proxy service to control plane: %w", err)
}
log.Info(context.TODO()).Msg("enabled proxy service")
src.OnConfigChange(ctx, svc.OnConfigChange)

View file

@ -3,11 +3,11 @@ package controlplane
import (
"net/http"
"net/http/pprof"
"time"
"github.com/CAFxX/httpcompression"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/pomerium/pomerium/internal/httputil"
"github.com/pomerium/pomerium/internal/log"
@ -15,13 +15,12 @@ import (
"github.com/pomerium/pomerium/internal/telemetry/requestid"
)
func (srv *Server) addHTTPMiddleware() {
func (srv *Server) addHTTPMiddleware(root *mux.Router) {
compressor, err := httpcompression.DefaultAdapter()
if err != nil {
panic(err)
}
root := srv.HTTPRouter
root.Use(compressor)
root.Use(srv.reproxy.Middleware)
root.Use(requestid.HTTPMiddleware())
@ -47,14 +46,4 @@ func (srv *Server) addHTTPMiddleware() {
}, srv.name))
root.HandleFunc("/healthz", httputil.HealthCheck)
root.HandleFunc("/ping", httputil.HealthCheck)
// pprof
srv.DebugRouter.Path("/debug/pprof/cmdline").HandlerFunc(pprof.Cmdline)
srv.DebugRouter.Path("/debug/pprof/profile").HandlerFunc(pprof.Profile)
srv.DebugRouter.Path("/debug/pprof/symbol").HandlerFunc(pprof.Symbol)
srv.DebugRouter.Path("/debug/pprof/trace").HandlerFunc(pprof.Trace)
srv.DebugRouter.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
// metrics
srv.MetricsRouter.Handle("/metrics", srv.metricsMgr)
}

View file

@ -4,6 +4,7 @@ import (
"context"
"net"
"net/http"
"net/http/pprof"
"sync/atomic"
"time"
@ -26,6 +27,7 @@ import (
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/requestid"
"github.com/pomerium/pomerium/internal/urlutil"
"github.com/pomerium/pomerium/internal/version"
pom_grpc "github.com/pomerium/pomerium/pkg/grpc"
"github.com/pomerium/pomerium/pkg/grpcutil"
@ -48,12 +50,16 @@ func (avo *atomicVersionedConfig) Store(cfg versionedConfig) {
avo.value.Store(cfg)
}
// A Service can be mounted on the control plane.
type Service interface {
Mount(r *mux.Router)
}
// A Server is the control-plane gRPC and HTTP servers.
type Server struct {
GRPCListener net.Listener
GRPCServer *grpc.Server
HTTPListener net.Listener
HTTPRouter *mux.Router
MetricsListener net.Listener
MetricsRouter *mux.Router
DebugListener net.Listener
@ -67,6 +73,10 @@ type Server struct {
metricsMgr *config.MetricsManager
reproxy *reproxy.Handler
httpRouter atomic.Value
authenticateSvc Service
proxySvc Service
haveSetCapacity map[string]bool
}
@ -126,10 +136,21 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server,
return nil, err
}
srv.HTTPRouter = mux.NewRouter()
if err := srv.updateRouter(cfg); err != nil {
return nil, err
}
srv.DebugRouter = mux.NewRouter()
srv.MetricsRouter = mux.NewRouter()
srv.addHTTPMiddleware()
// pprof
srv.DebugRouter.Path("/debug/pprof/cmdline").HandlerFunc(pprof.Cmdline)
srv.DebugRouter.Path("/debug/pprof/profile").HandlerFunc(pprof.Profile)
srv.DebugRouter.Path("/debug/pprof/symbol").HandlerFunc(pprof.Symbol)
srv.DebugRouter.Path("/debug/pprof/trace").HandlerFunc(pprof.Trace)
srv.DebugRouter.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
// metrics
srv.MetricsRouter.Handle("/metrics", srv.metricsMgr)
srv.filemgr = filemgr.NewManager()
srv.filemgr.ClearCache()
@ -201,9 +222,11 @@ func (srv *Server) Run(ctx context.Context) error {
for _, entry := range []struct {
Name string
Listener net.Listener
Handler *mux.Router
Handler http.Handler
}{
{"http", srv.HTTPListener, srv.HTTPRouter},
{"http", srv.HTTPListener, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
srv.httpRouter.Load().(http.Handler).ServeHTTP(w, r)
})},
{"debug", srv.DebugListener, srv.DebugRouter},
{"metrics", srv.MetricsListener, srv.MetricsRouter},
} {
@ -239,6 +262,9 @@ func (srv *Server) Run(ctx context.Context) error {
// OnConfigChange updates the pomerium config options.
func (srv *Server) OnConfigChange(ctx context.Context, cfg *config.Config) error {
if err := srv.updateRouter(cfg); err != nil {
return err
}
srv.reproxy.Update(ctx, cfg)
prev := srv.currentConfig.Load()
srv.currentConfig.Store(versionedConfig{
@ -252,3 +278,33 @@ func (srv *Server) OnConfigChange(ctx context.Context, cfg *config.Config) error
srv.xdsmgr.Update(ctx, res)
return nil
}
// EnableAuthenticate enables the authenticate service.
func (srv *Server) EnableAuthenticate(svc Service) error {
srv.authenticateSvc = svc
return srv.updateRouter(srv.currentConfig.Load().Config)
}
// EnableProxy enables the proxy service.
func (srv *Server) EnableProxy(svc Service) error {
srv.proxySvc = svc
return srv.updateRouter(srv.currentConfig.Load().Config)
}
func (srv *Server) updateRouter(cfg *config.Config) error {
httpRouter := mux.NewRouter()
srv.addHTTPMiddleware(httpRouter)
if srv.authenticateSvc != nil {
authenticateURL, err := cfg.Options.GetInternalAuthenticateURL()
if err != nil {
return err
}
authenticateHost := urlutil.StripPort(authenticateURL.Host)
srv.authenticateSvc.Mount(httpRouter.Host(authenticateHost).Subrouter())
}
if srv.proxySvc != nil {
srv.proxySvc.Mount(httpRouter)
}
srv.httpRouter.Store(http.Handler(httpRouter))
return nil
}

View file

@ -73,6 +73,11 @@ func New(cfg *config.Config) (*Proxy, error) {
return p, nil
}
// Mount mounts the http handler to a mux router.
func (p *Proxy) Mount(r *mux.Router) {
r.PathPrefix("/").Handler(p)
}
// OnConfigChange updates internal structures based on config.Options
func (p *Proxy) OnConfigChange(ctx context.Context, cfg *config.Config) {
if p == nil {