diff --git a/cmd/pomerium/main.go b/cmd/pomerium/main.go index ad3703d6f..f4f290438 100644 --- a/cmd/pomerium/main.go +++ b/cmd/pomerium/main.go @@ -12,6 +12,7 @@ import ( "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/version" + zero_cmd "github.com/pomerium/pomerium/internal/zero/cmd" "github.com/pomerium/pomerium/pkg/cmd/pomerium" "github.com/pomerium/pomerium/pkg/envoy/files" ) @@ -30,7 +31,12 @@ func main() { } ctx := context.Background() - if err := run(ctx); !errors.Is(err, context.Canceled) { + runFn := run + if zero_cmd.IsManagedMode() { + runFn = zero_cmd.Run + } + + if err := runFn(ctx); err != nil && !errors.Is(err, context.Canceled) { log.Fatal().Err(err).Msg("cmd/pomerium") } log.Info(ctx).Msg("cmd/pomerium: exiting") diff --git a/go.mod b/go.mod index 969099760..1495a693e 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/jackc/pgx/v5 v5.4.2 github.com/klauspost/compress v1.16.7 github.com/martinlindhe/base36 v1.1.1 + github.com/mattn/go-isatty v0.0.19 github.com/mholt/acmez v1.2.0 github.com/minio/minio-go/v7 v7.0.61 github.com/mitchellh/hashstructure/v2 v2.0.2 @@ -169,7 +170,6 @@ require ( github.com/lyft/protoc-gen-star/v2 v2.0.3 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/miekg/dns v1.1.55 // indirect github.com/minio/md5-simd v1.1.2 // indirect diff --git a/internal/zero/cmd/command.go b/internal/zero/cmd/command.go new file mode 100644 index 000000000..24321b4f0 --- /dev/null +++ b/internal/zero/cmd/command.go @@ -0,0 +1,83 @@ +// Package cmd implements the pomerium zero command. +package cmd + +import ( + "context" + "errors" + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/mattn/go-isatty" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "github.com/pomerium/pomerium/internal/zero/controller" +) + +// Run runs the pomerium zero command. +func Run(ctx context.Context) error { + err := setupLogger() + if err != nil { + return fmt.Errorf("error setting up logger: %w", err) + } + + token := getToken() + if token == "" { + return errors.New("no token provided") + } + + return controller.Run( + withInterrupt(ctx), + controller.WithAPIToken(token), + controller.WithClusterAPIEndpoint(getClusterAPIEndpoint()), + controller.WithConnectAPIEndpoint(getConnectAPIEndpoint()), + ) +} + +// IsManagedMode returns true if Pomerium should start in managed mode using this command. +func IsManagedMode() bool { + return getToken() != "" +} + +func withInterrupt(ctx context.Context) context.Context { + ctx, cancel := context.WithCancel(ctx) + go func(ctx context.Context) { + ch := make(chan os.Signal, 2) + defer signal.Stop(ch) + + signal.Notify(ch, os.Interrupt) + signal.Notify(ch, syscall.SIGTERM) + + select { + case sig := <-ch: + log.Ctx(ctx).Info().Str("signal", sig.String()).Msg("quitting...") + case <-ctx.Done(): + } + cancel() + }(ctx) + return ctx +} + +func setupLogger() error { + if isatty.IsTerminal(os.Stdin.Fd()) { + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) + } else { + log.Logger = zerolog.New(os.Stderr) + } + + if rawLvl, ok := os.LookupEnv("LOG_LEVEL"); ok { + lvl, err := zerolog.ParseLevel(rawLvl) + if err != nil { + return err + } + log.Logger = log.Logger.Level(lvl) + } else { + log.Logger = log.Logger.Level(zerolog.InfoLevel) + } + + // set the default context logger + zerolog.DefaultContextLogger = &log.Logger + return nil +} diff --git a/internal/zero/cmd/env.go b/internal/zero/cmd/env.go new file mode 100644 index 000000000..5ee6a3c37 --- /dev/null +++ b/internal/zero/cmd/env.go @@ -0,0 +1,13 @@ +package cmd + +import "os" + +const ( + // PomeriumZeroTokenEnv is the environment variable name for the API token. + //nolint: gosec + PomeriumZeroTokenEnv = "POMERIUM_ZERO_TOKEN" +) + +func getToken() string { + return os.Getenv(PomeriumZeroTokenEnv) +} diff --git a/internal/zero/cmd/env_dev.go b/internal/zero/cmd/env_dev.go new file mode 100644 index 000000000..99123ff4e --- /dev/null +++ b/internal/zero/cmd/env_dev.go @@ -0,0 +1,21 @@ +//go:build !release + +package cmd + +import "os" + +func getConnectAPIEndpoint() string { + connectServerEndpoint := os.Getenv("CONNECT_SERVER_ENDPOINT") + if connectServerEndpoint == "" { + connectServerEndpoint = "http://localhost:8721" + } + return connectServerEndpoint +} + +func getClusterAPIEndpoint() string { + clusterAPIEndpoint := os.Getenv("CLUSTER_API_ENDPOINT") + if clusterAPIEndpoint == "" { + clusterAPIEndpoint = "http://localhost:8720/cluster/v1" + } + return clusterAPIEndpoint +} diff --git a/internal/zero/cmd/env_release.go b/internal/zero/cmd/env_release.go new file mode 100644 index 000000000..1438d40c9 --- /dev/null +++ b/internal/zero/cmd/env_release.go @@ -0,0 +1,11 @@ +//go:build release + +package cmd + +func getConnectAPIEndpoint() string { + return "https://connect.pomerium.com" +} + +func getClusterAPIEndpoint() string { + return "https://console.pomerium.com/cluster/v1" +} diff --git a/internal/zero/controller/cloud_api.go b/internal/zero/controller/cloud_api.go new file mode 100644 index 000000000..79dbfd692 --- /dev/null +++ b/internal/zero/controller/cloud_api.go @@ -0,0 +1,36 @@ +package controller + +import ( + "context" + "fmt" + "net/http" + + cluster_api "github.com/pomerium/zero-sdk/cluster" + connect_api "github.com/pomerium/zero-sdk/connect" + connect_mux "github.com/pomerium/zero-sdk/connect-mux" + token_api "github.com/pomerium/zero-sdk/token" +) + +func (c *controller) InitAPI(ctx context.Context) error { + fetcher, err := cluster_api.NewTokenFetcher(c.cfg.clusterAPIEndpoint) + if err != nil { + return fmt.Errorf("error creating token fetcher: %w", err) + } + + tokenCache := token_api.NewCache(fetcher, c.cfg.apiToken) + + clusterClient, err := cluster_api.NewAuthorizedClient(c.cfg.clusterAPIEndpoint, tokenCache.GetToken, http.DefaultClient) + if err != nil { + return fmt.Errorf("error creating cluster client: %w", err) + } + + connectClient, err := connect_api.NewAuthorizedConnectClient(ctx, c.cfg.connectAPIEndpoint, tokenCache.GetToken) + if err != nil { + return fmt.Errorf("error creating connect client: %w", err) + } + + c.connectMux = connect_mux.Start(ctx, connectClient) + c.clusterClient = clusterClient + + return nil +} diff --git a/internal/zero/controller/config.go b/internal/zero/controller/config.go new file mode 100644 index 000000000..4328f3444 --- /dev/null +++ b/internal/zero/controller/config.go @@ -0,0 +1,86 @@ +package controller + +import "time" + +// Option configures a controller. +type Option func(*controllerConfig) + +type controllerConfig struct { + apiToken string + clusterAPIEndpoint string + connectAPIEndpoint string + + tmpDir string + bootstrapConfigFileName string + + reconcilerLeaseDuration time.Duration + databrokerRequestTimeout time.Duration +} + +// WithTmpDir sets the temporary directory to use. +func WithTmpDir(dir string) Option { + return func(c *controllerConfig) { + c.tmpDir = dir + } +} + +// WithClusterAPIEndpoint sets the endpoint to use for the cluster API +func WithClusterAPIEndpoint(endpoint string) Option { + return func(c *controllerConfig) { + c.clusterAPIEndpoint = endpoint + } +} + +// WithConnectAPIEndpoint sets the endpoint to use for the connect API +func WithConnectAPIEndpoint(endpoint string) Option { + return func(c *controllerConfig) { + c.connectAPIEndpoint = endpoint + } +} + +// WithAPIToken sets the API token to use for authentication. +func WithAPIToken(token string) Option { + return func(c *controllerConfig) { + c.apiToken = token + } +} + +// WithBootstrapConfigFileName sets the name of the file to store the bootstrap config in. +func WithBootstrapConfigFileName(name string) Option { + return func(c *controllerConfig) { + c.bootstrapConfigFileName = name + } +} + +// WithDatabrokerLeaseDuration sets the lease duration for the +func WithDatabrokerLeaseDuration(duration time.Duration) Option { + return func(c *controllerConfig) { + c.reconcilerLeaseDuration = duration + } +} + +// WithDatabrokerRequestTimeout sets the timeout for databroker requests. +func WithDatabrokerRequestTimeout(timeout time.Duration) Option { + return func(c *controllerConfig) { + c.databrokerRequestTimeout = timeout + } +} + +func newControllerConfig(opts ...Option) *controllerConfig { + c := new(controllerConfig) + + for _, opt := range []Option{ + WithClusterAPIEndpoint("https://console.pomerium.com/cluster/v1"), + WithConnectAPIEndpoint("https://connect.pomerium.com"), + WithBootstrapConfigFileName("/var/cache/pomerium-bootstrap.dat"), + WithDatabrokerLeaseDuration(time.Minute), + WithDatabrokerRequestTimeout(time.Second * 30), + } { + opt(c) + } + + for _, opt := range opts { + opt(c) + } + return c +} diff --git a/internal/zero/controller/controller.go b/internal/zero/controller/controller.go new file mode 100644 index 000000000..994b37c0a --- /dev/null +++ b/internal/zero/controller/controller.go @@ -0,0 +1,80 @@ +// Package controller implements Pomerium managed mode +package controller + +import ( + "context" + "errors" + "fmt" + + "golang.org/x/sync/errgroup" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/zero/bootstrap" + "github.com/pomerium/pomerium/pkg/cmd/pomerium" + "github.com/pomerium/pomerium/pkg/grpc/databroker" + cluster_api "github.com/pomerium/zero-sdk/cluster" + connect_mux "github.com/pomerium/zero-sdk/connect-mux" +) + +// Run runs Pomerium is managed mode using the provided token. +func Run(ctx context.Context, opts ...Option) error { + c := controller{cfg: newControllerConfig(opts...)} + err := c.InitAPI(ctx) + if err != nil { + return fmt.Errorf("error initializing cloud api: %w", err) + } + + src, err := bootstrap.New([]byte(c.cfg.apiToken)) + if err != nil { + return fmt.Errorf("error creating bootstrap config: %w", err) + } + c.bootstrapConfig = src + + err = c.InitDatabrokerClient(ctx, src.GetConfig()) + if err != nil { + return fmt.Errorf("init databroker client: %w", err) + } + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { return run(ctx, "zero-bootstrap", c.runBootstrap, nil) }) + eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore, src.WaitReady) }) + eg.Go(func() error { return run(ctx, "zero-reconciler", c.RunReconciler, src.WaitReady) }) + eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) }) + return eg.Wait() +} + +type controller struct { + cfg *controllerConfig + + connectMux *connect_mux.Mux + clusterClient cluster_api.ClientWithResponsesInterface + + bootstrapConfig *bootstrap.Source + + databrokerClient databroker.DataBrokerServiceClient +} + +func run(ctx context.Context, name string, runFn func(context.Context) error, waitFn func(context.Context) error) error { + if waitFn != nil { + log.Ctx(ctx).Info().Str("name", name).Msg("waiting for initial configuration") + err := waitFn(ctx) + if err != nil { + return fmt.Errorf("%s: error waiting for initial configuration: %w", name, err) + } + } + + log.Ctx(ctx).Info().Str("name", name).Msg("starting") + err := runFn(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("%s: %w", name, err) + } + return nil +} + +func (c *controller) runBootstrap(ctx context.Context) error { + return c.bootstrapConfig.Run(ctx, c.clusterClient, c.connectMux, c.cfg.bootstrapConfigFileName) +} + +func (c *controller) runPomeriumCore(ctx context.Context) error { + return pomerium.Run(ctx, c.bootstrapConfig) +} diff --git a/internal/zero/controller/databroker.go b/internal/zero/controller/databroker.go new file mode 100644 index 000000000..0008f6713 --- /dev/null +++ b/internal/zero/controller/databroker.go @@ -0,0 +1,46 @@ +package controller + +import ( + "context" + "encoding/base64" + "fmt" + "net" + "net/url" + + "google.golang.org/grpc" + + "github.com/pomerium/pomerium/config" + "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/grpcutil" +) + +func (c *controller) InitDatabrokerClient(ctx context.Context, cfg *config.Config) error { + conn, err := c.newDataBrokerConnection(ctx, cfg) + if err != nil { + return fmt.Errorf("databroker connection: %w", err) + } + c.databrokerClient = databroker.NewDataBrokerServiceClient(conn) + return nil +} + +// GetDataBrokerServiceClient implements the databroker.Leaser interface. +func (c *controller) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient { + return c.databrokerClient +} + +func (c *controller) newDataBrokerConnection(ctx context.Context, cfg *config.Config) (*grpc.ClientConn, error) { + sharedSecret, err := base64.StdEncoding.DecodeString(cfg.Options.SharedKey) + if err != nil { + return nil, fmt.Errorf("decode shared_secret: %w", err) + } + + return grpcutil.NewGRPCClientConn(ctx, &grpcutil.Options{ + Address: &url.URL{ + Scheme: "http", + Host: net.JoinHostPort("localhost", cfg.GRPCPort), + }, + ServiceName: "databroker", + SignedJWTKey: sharedSecret, + RequestTimeout: c.cfg.databrokerRequestTimeout, + }) +} diff --git a/internal/zero/controller/mux_log.go b/internal/zero/controller/mux_log.go new file mode 100644 index 000000000..3fdcf8bd8 --- /dev/null +++ b/internal/zero/controller/mux_log.go @@ -0,0 +1,29 @@ +package controller + +import ( + "context" + + "github.com/rs/zerolog" + + "github.com/pomerium/pomerium/internal/log" + connect_mux "github.com/pomerium/zero-sdk/connect-mux" +) + +func (c *controller) RunConnectLog(ctx context.Context) error { + logger := log.Ctx(ctx).With().Str("service", "connect-mux").Logger().Level(zerolog.InfoLevel) + + return c.connectMux.Watch(ctx, + connect_mux.WithOnConnected(func(ctx context.Context) { + logger.Info().Msg("connected") + }), + connect_mux.WithOnDisconnected(func(ctx context.Context) { + logger.Info().Msg("disconnected") + }), + connect_mux.WithOnBootstrapConfigUpdated(func(ctx context.Context) { + logger.Info().Msg("bootstrap config updated") + }), + connect_mux.WithOnBundleUpdated(func(ctx context.Context, key string) { + logger.Info().Str("key", key).Msg("bundle updated") + }), + ) +} diff --git a/internal/zero/controller/reconciler.go b/internal/zero/controller/reconciler.go new file mode 100644 index 000000000..8e3101b95 --- /dev/null +++ b/internal/zero/controller/reconciler.go @@ -0,0 +1,24 @@ +package controller + +import ( + "context" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/zero/reconciler" + "github.com/pomerium/pomerium/pkg/grpc/databroker" +) + +func (c *controller) RunReconciler(ctx context.Context) error { + leaser := databroker.NewLeaser("zero-reconciler", c.cfg.reconcilerLeaseDuration, c) + return leaser.Run(ctx) +} + +// RunLeased implements the databroker.Leaser interface. +func (c *controller) RunLeased(ctx context.Context) error { + log.Ctx(ctx).Info().Msg("starting reconciler") + return reconciler.Run(ctx, + reconciler.WithClusterAPIClient(c.clusterClient), + reconciler.WithConnectMux(c.connectMux), + reconciler.WithDataBrokerClient(c.GetDataBrokerServiceClient()), + ) +}