diff --git a/cmd/pomerium/main.go b/cmd/pomerium/main.go index ad3703d6f..f4f290438 100644 --- a/cmd/pomerium/main.go +++ b/cmd/pomerium/main.go @@ -12,6 +12,7 @@ import ( "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/version" + zero_cmd "github.com/pomerium/pomerium/internal/zero/cmd" "github.com/pomerium/pomerium/pkg/cmd/pomerium" "github.com/pomerium/pomerium/pkg/envoy/files" ) @@ -30,7 +31,12 @@ func main() { } ctx := context.Background() - if err := run(ctx); !errors.Is(err, context.Canceled) { + runFn := run + if zero_cmd.IsManagedMode() { + runFn = zero_cmd.Run + } + + if err := runFn(ctx); err != nil && !errors.Is(err, context.Canceled) { log.Fatal().Err(err).Msg("cmd/pomerium") } log.Info(ctx).Msg("cmd/pomerium: exiting") diff --git a/go.mod b/go.mod index 2773e5ac0..7c45d46b1 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/jackc/pgx/v5 v5.4.2 github.com/klauspost/compress v1.16.7 github.com/martinlindhe/base36 v1.1.1 + github.com/mattn/go-isatty v0.0.19 github.com/mholt/acmez v1.2.0 github.com/minio/minio-go/v7 v7.0.61 github.com/mitchellh/hashstructure/v2 v2.0.2 @@ -188,7 +189,6 @@ require ( github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/miekg/dns v1.1.55 // indirect github.com/minio/md5-simd v1.1.2 // indirect diff --git a/internal/zero/cmd/command.go b/internal/zero/cmd/command.go new file mode 100644 index 000000000..24321b4f0 --- /dev/null +++ b/internal/zero/cmd/command.go @@ -0,0 +1,83 @@ +// Package cmd implements the pomerium zero command. +package cmd + +import ( + "context" + "errors" + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/mattn/go-isatty" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "github.com/pomerium/pomerium/internal/zero/controller" +) + +// Run runs the pomerium zero command. +func Run(ctx context.Context) error { + err := setupLogger() + if err != nil { + return fmt.Errorf("error setting up logger: %w", err) + } + + token := getToken() + if token == "" { + return errors.New("no token provided") + } + + return controller.Run( + withInterrupt(ctx), + controller.WithAPIToken(token), + controller.WithClusterAPIEndpoint(getClusterAPIEndpoint()), + controller.WithConnectAPIEndpoint(getConnectAPIEndpoint()), + ) +} + +// IsManagedMode returns true if Pomerium should start in managed mode using this command. +func IsManagedMode() bool { + return getToken() != "" +} + +func withInterrupt(ctx context.Context) context.Context { + ctx, cancel := context.WithCancel(ctx) + go func(ctx context.Context) { + ch := make(chan os.Signal, 2) + defer signal.Stop(ch) + + signal.Notify(ch, os.Interrupt) + signal.Notify(ch, syscall.SIGTERM) + + select { + case sig := <-ch: + log.Ctx(ctx).Info().Str("signal", sig.String()).Msg("quitting...") + case <-ctx.Done(): + } + cancel() + }(ctx) + return ctx +} + +func setupLogger() error { + if isatty.IsTerminal(os.Stdin.Fd()) { + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) + } else { + log.Logger = zerolog.New(os.Stderr) + } + + if rawLvl, ok := os.LookupEnv("LOG_LEVEL"); ok { + lvl, err := zerolog.ParseLevel(rawLvl) + if err != nil { + return err + } + log.Logger = log.Logger.Level(lvl) + } else { + log.Logger = log.Logger.Level(zerolog.InfoLevel) + } + + // set the default context logger + zerolog.DefaultContextLogger = &log.Logger + return nil +} diff --git a/internal/zero/cmd/env.go b/internal/zero/cmd/env.go new file mode 100644 index 000000000..5ee6a3c37 --- /dev/null +++ b/internal/zero/cmd/env.go @@ -0,0 +1,13 @@ +package cmd + +import "os" + +const ( + // PomeriumZeroTokenEnv is the environment variable name for the API token. + //nolint: gosec + PomeriumZeroTokenEnv = "POMERIUM_ZERO_TOKEN" +) + +func getToken() string { + return os.Getenv(PomeriumZeroTokenEnv) +} diff --git a/internal/zero/cmd/env_dev.go b/internal/zero/cmd/env_dev.go new file mode 100644 index 000000000..99123ff4e --- /dev/null +++ b/internal/zero/cmd/env_dev.go @@ -0,0 +1,21 @@ +//go:build !release + +package cmd + +import "os" + +func getConnectAPIEndpoint() string { + connectServerEndpoint := os.Getenv("CONNECT_SERVER_ENDPOINT") + if connectServerEndpoint == "" { + connectServerEndpoint = "http://localhost:8721" + } + return connectServerEndpoint +} + +func getClusterAPIEndpoint() string { + clusterAPIEndpoint := os.Getenv("CLUSTER_API_ENDPOINT") + if clusterAPIEndpoint == "" { + clusterAPIEndpoint = "http://localhost:8720/cluster/v1" + } + return clusterAPIEndpoint +} diff --git a/internal/zero/cmd/env_release.go b/internal/zero/cmd/env_release.go new file mode 100644 index 000000000..1438d40c9 --- /dev/null +++ b/internal/zero/cmd/env_release.go @@ -0,0 +1,11 @@ +//go:build release + +package cmd + +func getConnectAPIEndpoint() string { + return "https://connect.pomerium.com" +} + +func getClusterAPIEndpoint() string { + return "https://console.pomerium.com/cluster/v1" +} diff --git a/internal/zero/controller/config.go b/internal/zero/controller/config.go new file mode 100644 index 000000000..0e729acb5 --- /dev/null +++ b/internal/zero/controller/config.go @@ -0,0 +1,86 @@ +package controller + +import "time" + +// Option configures a controller. +type Option func(*controllerConfig) + +type controllerConfig struct { + apiToken string + clusterAPIEndpoint string + connectAPIEndpoint string + + tmpDir string + bootstrapConfigFileName string + + reconcilerLeaseDuration time.Duration + databrokerRequestTimeout time.Duration +} + +// WithTmpDir sets the temporary directory to use. +func WithTmpDir(dir string) Option { + return func(c *controllerConfig) { + c.tmpDir = dir + } +} + +// WithClusterAPIEndpoint sets the endpoint to use for the cluster API +func WithClusterAPIEndpoint(endpoint string) Option { + return func(c *controllerConfig) { + c.clusterAPIEndpoint = endpoint + } +} + +// WithConnectAPIEndpoint sets the endpoint to use for the connect API +func WithConnectAPIEndpoint(endpoint string) Option { + return func(c *controllerConfig) { + c.connectAPIEndpoint = endpoint + } +} + +// WithAPIToken sets the API token to use for authentication. +func WithAPIToken(token string) Option { + return func(c *controllerConfig) { + c.apiToken = token + } +} + +// WithBootstrapConfigFileName sets the name of the file to store the bootstrap config in. +func WithBootstrapConfigFileName(name string) Option { + return func(c *controllerConfig) { + c.bootstrapConfigFileName = name + } +} + +// WithDatabrokerLeaseDuration sets the lease duration for the +func WithDatabrokerLeaseDuration(duration time.Duration) Option { + return func(c *controllerConfig) { + c.reconcilerLeaseDuration = duration + } +} + +// WithDatabrokerRequestTimeout sets the timeout for databroker requests. +func WithDatabrokerRequestTimeout(timeout time.Duration) Option { + return func(c *controllerConfig) { + c.databrokerRequestTimeout = timeout + } +} + +func newControllerConfig(opts ...Option) *controllerConfig { + c := new(controllerConfig) + + for _, opt := range []Option{ + WithClusterAPIEndpoint("https://console.pomerium.com/cluster/v1"), + WithConnectAPIEndpoint("https://connect.pomerium.com"), + WithBootstrapConfigFileName("/var/cache/pomerium-bootstrap.dat"), + WithDatabrokerLeaseDuration(time.Second * 30), + WithDatabrokerRequestTimeout(time.Second * 30), + } { + opt(c) + } + + for _, opt := range opts { + opt(c) + } + return c +} diff --git a/internal/zero/controller/controller.go b/internal/zero/controller/controller.go new file mode 100644 index 000000000..d1377168e --- /dev/null +++ b/internal/zero/controller/controller.go @@ -0,0 +1,99 @@ +// Package controller implements Pomerium managed mode +package controller + +import ( + "context" + "errors" + "fmt" + + "golang.org/x/sync/errgroup" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/zero/bootstrap" + "github.com/pomerium/pomerium/pkg/cmd/pomerium" + "github.com/pomerium/pomerium/pkg/grpc/databroker" + sdk "github.com/pomerium/zero-sdk" +) + +// Run runs Pomerium is managed mode using the provided token. +func Run(ctx context.Context, opts ...Option) error { + c := controller{cfg: newControllerConfig(opts...)} + eg, ctx := errgroup.WithContext(ctx) + + err := c.initAPI(ctx) + if err != nil { + return fmt.Errorf("init api: %w", err) + } + + src, err := bootstrap.New([]byte(c.cfg.apiToken)) + if err != nil { + return fmt.Errorf("error creating bootstrap config: %w", err) + } + c.bootstrapConfig = src + + err = c.InitDatabrokerClient(ctx, src.GetConfig()) + if err != nil { + return fmt.Errorf("init databroker client: %w", err) + } + + eg.Go(func() error { return run(ctx, "connect", c.runConnect, nil) }) + eg.Go(func() error { return run(ctx, "zero-bootstrap", c.runBootstrap, nil) }) + eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore, src.WaitReady) }) + eg.Go(func() error { return run(ctx, "zero-reconciler", c.RunReconciler, src.WaitReady) }) + eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) }) + return eg.Wait() +} + +type controller struct { + cfg *controllerConfig + + api *sdk.API + + bootstrapConfig *bootstrap.Source + + databrokerClient databroker.DataBrokerServiceClient +} + +func (c *controller) initAPI(ctx context.Context) error { + api, err := sdk.NewAPI(ctx, + sdk.WithClusterAPIEndpoint(c.cfg.clusterAPIEndpoint), + sdk.WithAPIToken(c.cfg.apiToken), + sdk.WithConnectAPIEndpoint(c.cfg.connectAPIEndpoint), + ) + if err != nil { + return fmt.Errorf("error initializing cloud api: %w", err) + } + + c.api = api + + return nil +} + +func run(ctx context.Context, name string, runFn func(context.Context) error, waitFn func(context.Context) error) error { + if waitFn != nil { + log.Ctx(ctx).Info().Str("name", name).Msg("waiting for initial configuration") + err := waitFn(ctx) + if err != nil { + return fmt.Errorf("%s: error waiting for initial configuration: %w", name, err) + } + } + + log.Ctx(ctx).Info().Str("name", name).Msg("starting") + err := runFn(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("%s: %w", name, err) + } + return nil +} + +func (c *controller) runBootstrap(ctx context.Context) error { + return c.bootstrapConfig.Run(ctx, c.api, c.cfg.bootstrapConfigFileName) +} + +func (c *controller) runPomeriumCore(ctx context.Context) error { + return pomerium.Run(ctx, c.bootstrapConfig) +} + +func (c *controller) runConnect(ctx context.Context) error { + return c.api.Connect(ctx) +} diff --git a/internal/zero/controller/databroker.go b/internal/zero/controller/databroker.go new file mode 100644 index 000000000..0008f6713 --- /dev/null +++ b/internal/zero/controller/databroker.go @@ -0,0 +1,46 @@ +package controller + +import ( + "context" + "encoding/base64" + "fmt" + "net" + "net/url" + + "google.golang.org/grpc" + + "github.com/pomerium/pomerium/config" + "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/grpcutil" +) + +func (c *controller) InitDatabrokerClient(ctx context.Context, cfg *config.Config) error { + conn, err := c.newDataBrokerConnection(ctx, cfg) + if err != nil { + return fmt.Errorf("databroker connection: %w", err) + } + c.databrokerClient = databroker.NewDataBrokerServiceClient(conn) + return nil +} + +// GetDataBrokerServiceClient implements the databroker.Leaser interface. +func (c *controller) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient { + return c.databrokerClient +} + +func (c *controller) newDataBrokerConnection(ctx context.Context, cfg *config.Config) (*grpc.ClientConn, error) { + sharedSecret, err := base64.StdEncoding.DecodeString(cfg.Options.SharedKey) + if err != nil { + return nil, fmt.Errorf("decode shared_secret: %w", err) + } + + return grpcutil.NewGRPCClientConn(ctx, &grpcutil.Options{ + Address: &url.URL{ + Scheme: "http", + Host: net.JoinHostPort("localhost", cfg.GRPCPort), + }, + ServiceName: "databroker", + SignedJWTKey: sharedSecret, + RequestTimeout: c.cfg.databrokerRequestTimeout, + }) +} diff --git a/internal/zero/controller/mux_log.go b/internal/zero/controller/mux_log.go new file mode 100644 index 000000000..b0cad8d58 --- /dev/null +++ b/internal/zero/controller/mux_log.go @@ -0,0 +1,29 @@ +package controller + +import ( + "context" + + "github.com/rs/zerolog" + + "github.com/pomerium/pomerium/internal/log" + connect_mux "github.com/pomerium/zero-sdk/connect-mux" +) + +func (c *controller) RunConnectLog(ctx context.Context) error { + logger := log.Ctx(ctx).With().Str("service", "connect-mux").Logger().Level(zerolog.InfoLevel) + + return c.api.Watch(ctx, + connect_mux.WithOnConnected(func(ctx context.Context) { + logger.Info().Msg("connected") + }), + connect_mux.WithOnDisconnected(func(ctx context.Context) { + logger.Info().Msg("disconnected") + }), + connect_mux.WithOnBootstrapConfigUpdated(func(ctx context.Context) { + logger.Info().Msg("bootstrap config updated") + }), + connect_mux.WithOnBundleUpdated(func(ctx context.Context, key string) { + logger.Info().Str("key", key).Msg("bundle updated") + }), + ) +} diff --git a/internal/zero/controller/reconciler.go b/internal/zero/controller/reconciler.go new file mode 100644 index 000000000..9c6993e87 --- /dev/null +++ b/internal/zero/controller/reconciler.go @@ -0,0 +1,21 @@ +package controller + +import ( + "context" + + "github.com/pomerium/pomerium/internal/zero/reconciler" + "github.com/pomerium/pomerium/pkg/grpc/databroker" +) + +func (c *controller) RunReconciler(ctx context.Context) error { + leaser := databroker.NewLeaser("zero-reconciler", c.cfg.reconcilerLeaseDuration, c) + return leaser.Run(ctx) +} + +// RunLeased implements the databroker.Leaser interface. +func (c *controller) RunLeased(ctx context.Context) error { + return reconciler.Run(ctx, + reconciler.WithAPI(c.api), + reconciler.WithDataBrokerClient(c.GetDataBrokerServiceClient()), + ) +}