package controlplane import ( "context" "net" "net/http" "sync/atomic" "time" "github.com/gorilla/mux" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/telemetry" "github.com/pomerium/pomerium/internal/telemetry/requestid" ) type versionedOptions struct { config.Options version int64 } type atomicVersionedOptions struct { value atomic.Value } func (avo *atomicVersionedOptions) Load() versionedOptions { return avo.value.Load().(versionedOptions) } func (avo *atomicVersionedOptions) Store(options versionedOptions) { avo.value.Store(options) } // A Server is the control-plane gRPC and HTTP servers. type Server struct { GRPCListener net.Listener GRPCServer *grpc.Server HTTPListener net.Listener HTTPRouter *mux.Router currentConfig atomicVersionedOptions configUpdated chan struct{} name string } // NewServer creates a new Server. Listener ports are chosen by the OS. func NewServer(name string) (*Server, error) { srv := &Server{ configUpdated: make(chan struct{}, 1), } srv.currentConfig.Store(versionedOptions{}) var err error // setup gRPC srv.GRPCListener, err = net.Listen("tcp4", "127.0.0.1:0") if err != nil { return nil, err } srv.GRPCServer = grpc.NewServer( grpc.StatsHandler(telemetry.NewGRPCServerStatsHandler(name)), grpc.UnaryInterceptor(requestid.UnaryServerInterceptor()), grpc.StreamInterceptor(requestid.StreamServerInterceptor()), ) reflection.Register(srv.GRPCServer) srv.registerXDSHandlers() srv.registerAccessLogHandlers() // setup HTTP srv.HTTPListener, err = net.Listen("tcp4", "127.0.0.1:0") if err != nil { _ = srv.GRPCListener.Close() return nil, err } srv.HTTPRouter = mux.NewRouter() srv.addHTTPMiddleware() return srv, nil } // Run runs the control-plane gRPC and HTTP servers. func (srv *Server) Run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) // start the gRPC server eg.Go(func() error { log.Info().Str("addr", srv.GRPCListener.Addr().String()).Msg("starting control-plane gRPC server") return srv.GRPCServer.Serve(srv.GRPCListener) }) // gracefully stop the gRPC server on context cancellation eg.Go(func() error { <-ctx.Done() ctx, cancel := context.WithCancel(ctx) ctx, cleanup := context.WithTimeout(ctx, time.Second*5) defer cleanup() go func() { srv.GRPCServer.GracefulStop() cancel() }() go func() { <-ctx.Done() srv.GRPCServer.Stop() cancel() }() <-ctx.Done() return nil }) hsrv := (&http.Server{ BaseContext: func(li net.Listener) context.Context { return ctx }, Handler: srv.HTTPRouter, }) // start the HTTP server eg.Go(func() error { log.Info().Str("addr", srv.HTTPListener.Addr().String()).Msg("starting control-plane HTTP server") return hsrv.Serve(srv.HTTPListener) }) // gracefully stop the HTTP server on context cancellation eg.Go(func() error { <-ctx.Done() ctx, cleanup := context.WithTimeout(ctx, time.Second*5) defer cleanup() return hsrv.Shutdown(ctx) }) return eg.Wait() } // OnConfigChange updates the pomerium config options. func (srv *Server) OnConfigChange(cfg *config.Config) { select { case <-srv.configUpdated: default: } prev := srv.currentConfig.Load() srv.currentConfig.Store(versionedOptions{ Options: *cfg.Options, version: prev.version + 1, }) srv.configUpdated <- struct{}{} }