diff --git a/cmd/pomerium/main.go b/cmd/pomerium/main.go index ad3703d6f..f4f290438 100644 --- a/cmd/pomerium/main.go +++ b/cmd/pomerium/main.go @@ -12,6 +12,7 @@ import ( "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/version" + zero_cmd "github.com/pomerium/pomerium/internal/zero/cmd" "github.com/pomerium/pomerium/pkg/cmd/pomerium" "github.com/pomerium/pomerium/pkg/envoy/files" ) @@ -30,7 +31,12 @@ func main() { } ctx := context.Background() - if err := run(ctx); !errors.Is(err, context.Canceled) { + runFn := run + if zero_cmd.IsManagedMode() { + runFn = zero_cmd.Run + } + + if err := runFn(ctx); err != nil && !errors.Is(err, context.Canceled) { log.Fatal().Err(err).Msg("cmd/pomerium") } log.Info(ctx).Msg("cmd/pomerium: exiting") diff --git a/go.mod b/go.mod index 112732107..9b4783ff3 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/jackc/pgx/v5 v5.4.2 github.com/klauspost/compress v1.16.7 github.com/martinlindhe/base36 v1.1.1 + github.com/mattn/go-isatty v0.0.19 github.com/mholt/acmez v1.2.0 github.com/minio/minio-go/v7 v7.0.61 github.com/mitchellh/hashstructure/v2 v2.0.2 @@ -188,7 +189,6 @@ require ( github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/miekg/dns v1.1.55 // indirect github.com/minio/md5-simd v1.1.2 // indirect diff --git a/internal/zero/controller/config.go b/internal/zero/controller/config.go new file mode 100644 index 000000000..4328f3444 --- /dev/null +++ b/internal/zero/controller/config.go @@ -0,0 +1,86 @@ +package controller + +import "time" + +// Option configures a controller. +type Option func(*controllerConfig) + +type controllerConfig struct { + apiToken string + clusterAPIEndpoint string + connectAPIEndpoint string + + tmpDir string + bootstrapConfigFileName string + + reconcilerLeaseDuration time.Duration + databrokerRequestTimeout time.Duration +} + +// WithTmpDir sets the temporary directory to use. +func WithTmpDir(dir string) Option { + return func(c *controllerConfig) { + c.tmpDir = dir + } +} + +// WithClusterAPIEndpoint sets the endpoint to use for the cluster API +func WithClusterAPIEndpoint(endpoint string) Option { + return func(c *controllerConfig) { + c.clusterAPIEndpoint = endpoint + } +} + +// WithConnectAPIEndpoint sets the endpoint to use for the connect API +func WithConnectAPIEndpoint(endpoint string) Option { + return func(c *controllerConfig) { + c.connectAPIEndpoint = endpoint + } +} + +// WithAPIToken sets the API token to use for authentication. +func WithAPIToken(token string) Option { + return func(c *controllerConfig) { + c.apiToken = token + } +} + +// WithBootstrapConfigFileName sets the name of the file to store the bootstrap config in. +func WithBootstrapConfigFileName(name string) Option { + return func(c *controllerConfig) { + c.bootstrapConfigFileName = name + } +} + +// WithDatabrokerLeaseDuration sets the lease duration for the +func WithDatabrokerLeaseDuration(duration time.Duration) Option { + return func(c *controllerConfig) { + c.reconcilerLeaseDuration = duration + } +} + +// WithDatabrokerRequestTimeout sets the timeout for databroker requests. +func WithDatabrokerRequestTimeout(timeout time.Duration) Option { + return func(c *controllerConfig) { + c.databrokerRequestTimeout = timeout + } +} + +func newControllerConfig(opts ...Option) *controllerConfig { + c := new(controllerConfig) + + for _, opt := range []Option{ + WithClusterAPIEndpoint("https://console.pomerium.com/cluster/v1"), + WithConnectAPIEndpoint("https://connect.pomerium.com"), + WithBootstrapConfigFileName("/var/cache/pomerium-bootstrap.dat"), + WithDatabrokerLeaseDuration(time.Minute), + WithDatabrokerRequestTimeout(time.Second * 30), + } { + opt(c) + } + + for _, opt := range opts { + opt(c) + } + return c +} diff --git a/internal/zero/controller/controller.go b/internal/zero/controller/controller.go new file mode 100644 index 000000000..b2173880c --- /dev/null +++ b/internal/zero/controller/controller.go @@ -0,0 +1,102 @@ +// Package controller implements Pomerium managed mode +package controller + +import ( + "context" + "errors" + "fmt" + + "golang.org/x/sync/errgroup" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/zero/bootstrap" + "github.com/pomerium/pomerium/pkg/cmd/pomerium" + "github.com/pomerium/pomerium/pkg/grpc/databroker" + sdk "github.com/pomerium/zero-sdk" + connect_mux "github.com/pomerium/zero-sdk/connect-mux" +) + +// Run runs Pomerium is managed mode using the provided token. +func Run(ctx context.Context, opts ...Option) error { + c := controller{cfg: newControllerConfig(opts...)} + eg, ctx := errgroup.WithContext(ctx) + + err := c.initAPI(ctx) + if err != nil { + return fmt.Errorf("init api: %w", err) + } + + src, err := bootstrap.New([]byte(c.cfg.apiToken)) + if err != nil { + return fmt.Errorf("error creating bootstrap config: %w", err) + } + c.bootstrapConfig = src + + err = c.InitDatabrokerClient(ctx, src.GetConfig()) + if err != nil { + return fmt.Errorf("init databroker client: %w", err) + } + + eg.Go(func() error { return run(ctx, "zero-bootstrap", c.runBootstrap, nil) }) + eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore, src.WaitReady) }) + eg.Go(func() error { return run(ctx, "zero-reconciler", c.RunReconciler, src.WaitReady) }) + eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) }) + return eg.Wait() +} + +type controller struct { + cfg *controllerConfig + + connectMux *connect_mux.Mux + api *sdk.API + + bootstrapConfig *bootstrap.Source + + databrokerClient databroker.DataBrokerServiceClient +} + +func (c *controller) initAPI(ctx context.Context) error { + api, err := sdk.NewAPI( + sdk.WithClusterAPIEndpoint(c.cfg.clusterAPIEndpoint), + sdk.WithAPIToken(c.cfg.apiToken), + sdk.WithConnectAPIEndpoint(c.cfg.connectAPIEndpoint), + ) + if err != nil { + return fmt.Errorf("error initializing cloud api: %w", err) + } + + mux, err := api.Connect(ctx) + if err != nil { + return fmt.Errorf("error starting cloud api: %w", err) + } + + c.api = api + c.connectMux = mux + + return nil +} + +func run(ctx context.Context, name string, runFn func(context.Context) error, waitFn func(context.Context) error) error { + if waitFn != nil { + log.Ctx(ctx).Info().Str("name", name).Msg("waiting for initial configuration") + err := waitFn(ctx) + if err != nil { + return fmt.Errorf("%s: error waiting for initial configuration: %w", name, err) + } + } + + log.Ctx(ctx).Info().Str("name", name).Msg("starting") + err := runFn(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + return fmt.Errorf("%s: %w", name, err) + } + return nil +} + +func (c *controller) runBootstrap(ctx context.Context) error { + return c.bootstrapConfig.Run(ctx, c.api, c.connectMux, c.cfg.bootstrapConfigFileName) +} + +func (c *controller) runPomeriumCore(ctx context.Context) error { + return pomerium.Run(ctx, c.bootstrapConfig) +} diff --git a/internal/zero/controller/databroker.go b/internal/zero/controller/databroker.go new file mode 100644 index 000000000..0008f6713 --- /dev/null +++ b/internal/zero/controller/databroker.go @@ -0,0 +1,46 @@ +package controller + +import ( + "context" + "encoding/base64" + "fmt" + "net" + "net/url" + + "google.golang.org/grpc" + + "github.com/pomerium/pomerium/config" + "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/grpcutil" +) + +func (c *controller) InitDatabrokerClient(ctx context.Context, cfg *config.Config) error { + conn, err := c.newDataBrokerConnection(ctx, cfg) + if err != nil { + return fmt.Errorf("databroker connection: %w", err) + } + c.databrokerClient = databroker.NewDataBrokerServiceClient(conn) + return nil +} + +// GetDataBrokerServiceClient implements the databroker.Leaser interface. +func (c *controller) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient { + return c.databrokerClient +} + +func (c *controller) newDataBrokerConnection(ctx context.Context, cfg *config.Config) (*grpc.ClientConn, error) { + sharedSecret, err := base64.StdEncoding.DecodeString(cfg.Options.SharedKey) + if err != nil { + return nil, fmt.Errorf("decode shared_secret: %w", err) + } + + return grpcutil.NewGRPCClientConn(ctx, &grpcutil.Options{ + Address: &url.URL{ + Scheme: "http", + Host: net.JoinHostPort("localhost", cfg.GRPCPort), + }, + ServiceName: "databroker", + SignedJWTKey: sharedSecret, + RequestTimeout: c.cfg.databrokerRequestTimeout, + }) +} diff --git a/internal/zero/controller/mux_log.go b/internal/zero/controller/mux_log.go new file mode 100644 index 000000000..3fdcf8bd8 --- /dev/null +++ b/internal/zero/controller/mux_log.go @@ -0,0 +1,29 @@ +package controller + +import ( + "context" + + "github.com/rs/zerolog" + + "github.com/pomerium/pomerium/internal/log" + connect_mux "github.com/pomerium/zero-sdk/connect-mux" +) + +func (c *controller) RunConnectLog(ctx context.Context) error { + logger := log.Ctx(ctx).With().Str("service", "connect-mux").Logger().Level(zerolog.InfoLevel) + + return c.connectMux.Watch(ctx, + connect_mux.WithOnConnected(func(ctx context.Context) { + logger.Info().Msg("connected") + }), + connect_mux.WithOnDisconnected(func(ctx context.Context) { + logger.Info().Msg("disconnected") + }), + connect_mux.WithOnBootstrapConfigUpdated(func(ctx context.Context) { + logger.Info().Msg("bootstrap config updated") + }), + connect_mux.WithOnBundleUpdated(func(ctx context.Context, key string) { + logger.Info().Str("key", key).Msg("bundle updated") + }), + ) +} diff --git a/internal/zero/controller/reconciler.go b/internal/zero/controller/reconciler.go new file mode 100644 index 000000000..237dc93d8 --- /dev/null +++ b/internal/zero/controller/reconciler.go @@ -0,0 +1,24 @@ +package controller + +import ( + "context" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/zero/reconciler" + "github.com/pomerium/pomerium/pkg/grpc/databroker" +) + +func (c *controller) RunReconciler(ctx context.Context) error { + leaser := databroker.NewLeaser("zero-reconciler", c.cfg.reconcilerLeaseDuration, c) + return leaser.Run(ctx) +} + +// RunLeased implements the databroker.Leaser interface. +func (c *controller) RunLeased(ctx context.Context) error { + log.Ctx(ctx).Info().Msg("starting reconciler") + return reconciler.Run(ctx, + reconciler.WithAPI(c.api), + reconciler.WithConnectMux(c.connectMux), + reconciler.WithDataBrokerClient(c.GetDataBrokerServiceClient()), + ) +}