diff --git a/internal/zero/controller/config.go b/internal/zero/controller/config.go index 4328f3444..0e729acb5 100644 --- a/internal/zero/controller/config.go +++ b/internal/zero/controller/config.go @@ -73,7 +73,7 @@ func newControllerConfig(opts ...Option) *controllerConfig { WithClusterAPIEndpoint("https://console.pomerium.com/cluster/v1"), WithConnectAPIEndpoint("https://connect.pomerium.com"), WithBootstrapConfigFileName("/var/cache/pomerium-bootstrap.dat"), - WithDatabrokerLeaseDuration(time.Minute), + WithDatabrokerLeaseDuration(time.Second * 30), WithDatabrokerRequestTimeout(time.Second * 30), } { opt(c) diff --git a/internal/zero/controller/controller.go b/internal/zero/controller/controller.go index b2173880c..d1377168e 100644 --- a/internal/zero/controller/controller.go +++ b/internal/zero/controller/controller.go @@ -13,7 +13,6 @@ import ( "github.com/pomerium/pomerium/pkg/cmd/pomerium" "github.com/pomerium/pomerium/pkg/grpc/databroker" sdk "github.com/pomerium/zero-sdk" - connect_mux "github.com/pomerium/zero-sdk/connect-mux" ) // Run runs Pomerium is managed mode using the provided token. @@ -37,6 +36,7 @@ func Run(ctx context.Context, opts ...Option) error { return fmt.Errorf("init databroker client: %w", err) } + eg.Go(func() error { return run(ctx, "connect", c.runConnect, nil) }) eg.Go(func() error { return run(ctx, "zero-bootstrap", c.runBootstrap, nil) }) eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore, src.WaitReady) }) eg.Go(func() error { return run(ctx, "zero-reconciler", c.RunReconciler, src.WaitReady) }) @@ -47,8 +47,7 @@ func Run(ctx context.Context, opts ...Option) error { type controller struct { cfg *controllerConfig - connectMux *connect_mux.Mux - api *sdk.API + api *sdk.API bootstrapConfig *bootstrap.Source @@ -56,7 +55,7 @@ type controller struct { } func (c *controller) initAPI(ctx context.Context) error { - api, err := sdk.NewAPI( + api, err := sdk.NewAPI(ctx, sdk.WithClusterAPIEndpoint(c.cfg.clusterAPIEndpoint), sdk.WithAPIToken(c.cfg.apiToken), sdk.WithConnectAPIEndpoint(c.cfg.connectAPIEndpoint), @@ -65,13 +64,7 @@ func (c *controller) initAPI(ctx context.Context) error { return fmt.Errorf("error initializing cloud api: %w", err) } - mux, err := api.Connect(ctx) - if err != nil { - return fmt.Errorf("error starting cloud api: %w", err) - } - c.api = api - c.connectMux = mux return nil } @@ -94,9 +87,13 @@ func run(ctx context.Context, name string, runFn func(context.Context) error, wa } func (c *controller) runBootstrap(ctx context.Context) error { - return c.bootstrapConfig.Run(ctx, c.api, c.connectMux, c.cfg.bootstrapConfigFileName) + return c.bootstrapConfig.Run(ctx, c.api, c.cfg.bootstrapConfigFileName) } func (c *controller) runPomeriumCore(ctx context.Context) error { return pomerium.Run(ctx, c.bootstrapConfig) } + +func (c *controller) runConnect(ctx context.Context) error { + return c.api.Connect(ctx) +} diff --git a/internal/zero/controller/mux_log.go b/internal/zero/controller/mux_log.go index 3fdcf8bd8..b0cad8d58 100644 --- a/internal/zero/controller/mux_log.go +++ b/internal/zero/controller/mux_log.go @@ -12,7 +12,7 @@ import ( func (c *controller) RunConnectLog(ctx context.Context) error { logger := log.Ctx(ctx).With().Str("service", "connect-mux").Logger().Level(zerolog.InfoLevel) - return c.connectMux.Watch(ctx, + return c.api.Watch(ctx, connect_mux.WithOnConnected(func(ctx context.Context) { logger.Info().Msg("connected") }), diff --git a/internal/zero/controller/reconciler.go b/internal/zero/controller/reconciler.go index 237dc93d8..1fec038a7 100644 --- a/internal/zero/controller/reconciler.go +++ b/internal/zero/controller/reconciler.go @@ -2,23 +2,28 @@ package controller import ( "context" + "time" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/zero/reconciler" "github.com/pomerium/pomerium/pkg/grpc/databroker" ) +var now time.Time // for testing + func (c *controller) RunReconciler(ctx context.Context) error { + now = time.Now() leaser := databroker.NewLeaser("zero-reconciler", c.cfg.reconcilerLeaseDuration, c) return leaser.Run(ctx) } // RunLeased implements the databroker.Leaser interface. func (c *controller) RunLeased(ctx context.Context) error { - log.Ctx(ctx).Info().Msg("starting reconciler") + log.Ctx(ctx).Info(). + Str("lease acquired in", time.Since(now).String()). + Msg("starting reconciler (lease acquired)") return reconciler.Run(ctx, reconciler.WithAPI(c.api), - reconciler.WithConnectMux(c.connectMux), reconciler.WithDataBrokerClient(c.GetDataBrokerServiceClient()), ) }