mirror of
https://github.com/pomerium/pomerium.git
synced 2025-06-02 10:52:49 +02:00
zero: add service accounts support (#5031)
* zero: add service accounts support * quit on terminal errors
This commit is contained in:
parent
715fd75994
commit
4193583301
7 changed files with 210 additions and 108 deletions
|
@ -108,17 +108,11 @@ func initSecrets(cfg *config.Config, r io.Reader) error {
|
||||||
return fmt.Errorf("pem: %w", err)
|
return fmt.Errorf("pem: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sharedKey, err := readKey(r)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("read key: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cookieSecret, err := readKey(r)
|
cookieSecret, err := readKey(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("read key: %w", err)
|
return fmt.Errorf("read key: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg.Options.SharedKey = base64.StdEncoding.EncodeToString(sharedKey)
|
|
||||||
cfg.Options.CookieSecret = base64.StdEncoding.EncodeToString(cookieSecret)
|
cfg.Options.CookieSecret = base64.StdEncoding.EncodeToString(cookieSecret)
|
||||||
cfg.Options.SigningKey = base64.StdEncoding.EncodeToString(signingKeyEncoded)
|
cfg.Options.SigningKey = base64.StdEncoding.EncodeToString(signingKeyEncoded)
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package bootstrap
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/base64"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
|
@ -56,13 +57,12 @@ func (src *source) UpdateBootstrap(ctx context.Context, cfg cluster_api.Bootstra
|
||||||
incoming := current.Clone()
|
incoming := current.Clone()
|
||||||
applyBootstrapConfig(incoming.Options, &cfg)
|
applyBootstrapConfig(incoming.Options, &cfg)
|
||||||
|
|
||||||
src.markReady.Do(func() { close(src.ready) })
|
|
||||||
|
|
||||||
if cmp.Equal(incoming.Options, current.Options, cmpOpts...) {
|
if cmp.Equal(incoming.Options, current.Options, cmpOpts...) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
src.cfg.Store(incoming)
|
src.cfg.Store(incoming)
|
||||||
|
src.markReady.Do(func() { close(src.ready) })
|
||||||
|
|
||||||
src.notifyListeners(ctx, incoming)
|
src.notifyListeners(ctx, incoming)
|
||||||
|
|
||||||
|
@ -82,6 +82,7 @@ func (src *source) notifyListeners(ctx context.Context, cfg *config.Config) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func applyBootstrapConfig(dst *config.Options, src *cluster_api.BootstrapConfig) {
|
func applyBootstrapConfig(dst *config.Options, src *cluster_api.BootstrapConfig) {
|
||||||
|
dst.SharedKey = base64.StdEncoding.EncodeToString(src.SharedSecret)
|
||||||
if src.DatabrokerStorageConnection != nil {
|
if src.DatabrokerStorageConnection != nil {
|
||||||
dst.DataBrokerStorageType = config.StoragePostgresName
|
dst.DataBrokerStorageType = config.StoragePostgresName
|
||||||
dst.DataBrokerStorageConnectionString = *src.DatabrokerStorageConnection
|
dst.DataBrokerStorageConnectionString = *src.DatabrokerStorageConnection
|
||||||
|
|
|
@ -37,16 +37,11 @@ func Run(ctx context.Context, opts ...Option) error {
|
||||||
}
|
}
|
||||||
c.bootstrapConfig = src
|
c.bootstrapConfig = src
|
||||||
|
|
||||||
err = c.InitDatabrokerClient(ctx, src.GetConfig())
|
eg.Go(func() error { return run(ctx, "connect", c.runConnect) })
|
||||||
if err != nil {
|
eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog) })
|
||||||
return fmt.Errorf("init databroker client: %w", err)
|
eg.Go(func() error { return run(ctx, "zero-bootstrap", c.runBootstrap) })
|
||||||
}
|
eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore) })
|
||||||
|
eg.Go(func() error { return run(ctx, "zero-control-loop", c.runZeroControlLoop) })
|
||||||
eg.Go(func() error { return run(ctx, "connect", c.runConnect, nil) })
|
|
||||||
eg.Go(func() error { return run(ctx, "connect-log", c.RunConnectLog, nil) })
|
|
||||||
eg.Go(func() error { return run(ctx, "zero-bootstrap", c.runBootstrap, nil) })
|
|
||||||
eg.Go(func() error { return run(ctx, "pomerium-core", c.runPomeriumCore, src.WaitReady) })
|
|
||||||
eg.Go(func() error { return c.runZeroControlLoop(ctx, src.WaitReady) })
|
|
||||||
return eg.Wait()
|
return eg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,8 +51,6 @@ type controller struct {
|
||||||
api *sdk.API
|
api *sdk.API
|
||||||
|
|
||||||
bootstrapConfig *bootstrap.Source
|
bootstrapConfig *bootstrap.Source
|
||||||
|
|
||||||
databrokerClient databroker.DataBrokerServiceClient
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) initAPI(ctx context.Context) error {
|
func (c *controller) initAPI(ctx context.Context) error {
|
||||||
|
@ -76,15 +69,7 @@ func (c *controller) initAPI(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func run(ctx context.Context, name string, runFn func(context.Context) error, waitFn func(context.Context) error) error {
|
func run(ctx context.Context, name string, runFn func(context.Context) error) error {
|
||||||
if waitFn != nil {
|
|
||||||
log.Ctx(ctx).Info().Str("name", name).Msg("waiting for initial configuration")
|
|
||||||
err := waitFn(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("%s: error waiting for initial configuration: %w", name, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Ctx(ctx).Info().Str("name", name).Msg("starting")
|
log.Ctx(ctx).Info().Str("name", name).Msg("starting")
|
||||||
err := runFn(ctx)
|
err := runFn(ctx)
|
||||||
if err != nil && !errors.Is(err, context.Canceled) {
|
if err != nil && !errors.Is(err, context.Canceled) {
|
||||||
|
@ -101,6 +86,10 @@ func (c *controller) runBootstrap(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) runPomeriumCore(ctx context.Context) error {
|
func (c *controller) runPomeriumCore(ctx context.Context) error {
|
||||||
|
err := c.bootstrapConfig.WaitReady(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("waiting for config source to be ready: %w", err)
|
||||||
|
}
|
||||||
return pomerium.Run(ctx, c.bootstrapConfig)
|
return pomerium.Run(ctx, c.bootstrapConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,36 +101,31 @@ func (c *controller) runConnect(ctx context.Context) error {
|
||||||
return c.api.Connect(ctx)
|
return c.api.Connect(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) runZeroControlLoop(ctx context.Context, waitFn func(context.Context) error) error {
|
func (c *controller) runZeroControlLoop(ctx context.Context) error {
|
||||||
err := waitFn(ctx)
|
return leaser.Run(ctx, c.bootstrapConfig,
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error waiting for initial configuration: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return leaser.Run(ctx, c.databrokerClient,
|
|
||||||
c.runReconciler,
|
c.runReconciler,
|
||||||
c.runAnalytics,
|
c.runAnalytics,
|
||||||
c.runReporter,
|
c.runReporter,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) runReconciler(ctx context.Context) error {
|
func (c *controller) runReconciler(ctx context.Context, client databroker.DataBrokerServiceClient) error {
|
||||||
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
|
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
|
||||||
return c.Str("service", "zero-reconciler")
|
return c.Str("service", "zero-reconciler")
|
||||||
})
|
})
|
||||||
|
|
||||||
return reconciler.Run(ctx,
|
return reconciler.Run(ctx,
|
||||||
reconciler.WithAPI(c.api),
|
reconciler.WithAPI(c.api),
|
||||||
reconciler.WithDataBrokerClient(c.GetDataBrokerServiceClient()),
|
reconciler.WithDataBrokerClient(client),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) runAnalytics(ctx context.Context) error {
|
func (c *controller) runAnalytics(ctx context.Context, client databroker.DataBrokerServiceClient) error {
|
||||||
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
|
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
|
||||||
return c.Str("service", "zero-analytics")
|
return c.Str("service", "zero-analytics")
|
||||||
})
|
})
|
||||||
|
|
||||||
err := analytics.Collect(ctx, c.GetDataBrokerServiceClient(), time.Hour)
|
err := analytics.Collect(ctx, client, time.Hour)
|
||||||
if err != nil && ctx.Err() == nil {
|
if err != nil && ctx.Err() == nil {
|
||||||
log.Ctx(ctx).Error().Err(err).Msg("error collecting analytics, disabling")
|
log.Ctx(ctx).Error().Err(err).Msg("error collecting analytics, disabling")
|
||||||
return nil
|
return nil
|
||||||
|
@ -150,13 +134,13 @@ func (c *controller) runAnalytics(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *controller) runReporter(ctx context.Context) error {
|
func (c *controller) runReporter(ctx context.Context, client databroker.DataBrokerServiceClient) error {
|
||||||
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
|
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
|
||||||
return c.Str("service", "zero-reporter")
|
return c.Str("service", "zero-reporter")
|
||||||
})
|
})
|
||||||
|
|
||||||
return c.api.Report(ctx,
|
return c.api.Report(ctx,
|
||||||
reporter.WithCollectInterval(time.Hour),
|
reporter.WithCollectInterval(time.Hour),
|
||||||
reporter.WithMetrics(analytics.Metrics(c.GetDataBrokerServiceClient)...),
|
reporter.WithMetrics(analytics.Metrics(func() databroker.DataBrokerServiceClient { return client })...),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,46 +0,0 @@
|
||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/base64"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"net/url"
|
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/config"
|
|
||||||
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
|
||||||
"github.com/pomerium/pomerium/pkg/grpcutil"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (c *controller) InitDatabrokerClient(ctx context.Context, cfg *config.Config) error {
|
|
||||||
conn, err := c.newDataBrokerConnection(ctx, cfg)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("databroker connection: %w", err)
|
|
||||||
}
|
|
||||||
c.databrokerClient = databroker.NewDataBrokerServiceClient(conn)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetDataBrokerServiceClient implements the databroker.Leaser interface.
|
|
||||||
func (c *controller) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
|
|
||||||
return c.databrokerClient
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *controller) newDataBrokerConnection(ctx context.Context, cfg *config.Config) (*grpc.ClientConn, error) {
|
|
||||||
sharedSecret, err := base64.StdEncoding.DecodeString(cfg.Options.SharedKey)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("decode shared_secret: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return grpcutil.NewGRPCClientConn(ctx, &grpcutil.Options{
|
|
||||||
Address: &url.URL{
|
|
||||||
Scheme: "http",
|
|
||||||
Host: net.JoinHostPort("localhost", cfg.GRPCPort),
|
|
||||||
},
|
|
||||||
ServiceName: "databroker",
|
|
||||||
SignedJWTKey: sharedSecret,
|
|
||||||
RequestTimeout: c.cfg.databrokerRequestTimeout,
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -5,49 +5,52 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cenkalti/backoff/v4"
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/internal/retry"
|
||||||
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||||
)
|
)
|
||||||
|
|
||||||
type service struct {
|
type leaser struct {
|
||||||
|
cancel context.CancelCauseFunc
|
||||||
client databroker.DataBrokerServiceClient
|
client databroker.DataBrokerServiceClient
|
||||||
funcs []func(ctx context.Context) error
|
funcs []func(ctx context.Context, client databroker.DataBrokerServiceClient) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetDataBrokerServiceClient implements the databroker.LeaseHandler interface.
|
// GetDataBrokerServiceClient implements the databroker.LeaseHandler interface.
|
||||||
func (c *service) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
|
func (c *leaser) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
|
||||||
return c.client
|
return c.client
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunLeased implements the databroker.LeaseHandler interface.
|
// RunLeased implements the databroker.LeaseHandler interface.
|
||||||
func (c *service) RunLeased(ctx context.Context) error {
|
func (c *leaser) RunLeased(ctx context.Context) error {
|
||||||
eg, ctx := errgroup.WithContext(ctx)
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
for _, fn := range append(c.funcs, c.databrokerChangeMonitor) {
|
for _, fn := range append(c.funcs, databrokerChangeMonitor) {
|
||||||
fn := fn
|
fn := fn
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
return fn(ctx)
|
err := fn(ctx, c.client)
|
||||||
|
if retry.IsTerminalError(err) {
|
||||||
|
c.cancel(err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return eg.Wait()
|
return eg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run runs services within a lease
|
func runWithLease(
|
||||||
func Run(
|
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
client databroker.DataBrokerServiceClient,
|
client databroker.DataBrokerServiceClient,
|
||||||
funcs ...func(ctx context.Context) error,
|
funcs ...func(context.Context, databroker.DataBrokerServiceClient) error,
|
||||||
) error {
|
) error {
|
||||||
srv := &service{
|
ctx, cancel := context.WithCancelCause(ctx)
|
||||||
|
defer cancel(context.Canceled)
|
||||||
|
|
||||||
|
srv := &leaser{
|
||||||
|
cancel: cancel,
|
||||||
client: client,
|
client: client,
|
||||||
funcs: funcs,
|
funcs: funcs,
|
||||||
}
|
}
|
||||||
b := backoff.NewExponentialBackOff()
|
|
||||||
b.MaxElapsedTime = 0
|
|
||||||
leaser := databroker.NewLeaser("zero-ctrl", time.Second*30, srv)
|
leaser := databroker.NewLeaser("zero-ctrl", time.Second*30, srv)
|
||||||
return backoff.Retry(
|
return leaser.Run(ctx)
|
||||||
func() error { return leaser.Run(ctx) },
|
|
||||||
backoff.WithContext(b, ctx),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,15 +11,15 @@ const typeStr = "pomerium.io/zero/leaser"
|
||||||
|
|
||||||
// databrokerChangeMonitor runs infinite sync loop to see if there is any change in databroker
|
// databrokerChangeMonitor runs infinite sync loop to see if there is any change in databroker
|
||||||
// it doesn't really syncs anything, just checks if the underlying databroker has changed
|
// it doesn't really syncs anything, just checks if the underlying databroker has changed
|
||||||
func (c *service) databrokerChangeMonitor(ctx context.Context) error {
|
func databrokerChangeMonitor(ctx context.Context, client databroker.DataBrokerServiceClient) error {
|
||||||
_, recordVersion, serverVersion, err := databroker.InitialSync(ctx, c.GetDataBrokerServiceClient(), &databroker.SyncLatestRequest{
|
_, recordVersion, serverVersion, err := databroker.InitialSync(ctx, client, &databroker.SyncLatestRequest{
|
||||||
Type: typeStr,
|
Type: typeStr,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error during initial sync: %w", err)
|
return fmt.Errorf("error during initial sync: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
stream, err := c.GetDataBrokerServiceClient().Sync(ctx, &databroker.SyncRequest{
|
stream, err := client.Sync(ctx, &databroker.SyncRequest{
|
||||||
Type: typeStr,
|
Type: typeStr,
|
||||||
ServerVersion: serverVersion,
|
ServerVersion: serverVersion,
|
||||||
RecordVersion: recordVersion,
|
RecordVersion: recordVersion,
|
||||||
|
|
166
internal/zero/leaser/runner.go
Normal file
166
internal/zero/leaser/runner.go
Normal file
|
@ -0,0 +1,166 @@
|
||||||
|
package leaser
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/base64"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/url"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/cenkalti/backoff/v4"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/config"
|
||||||
|
"github.com/pomerium/pomerium/internal/retry"
|
||||||
|
"github.com/pomerium/pomerium/internal/zero/bootstrap"
|
||||||
|
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||||
|
"github.com/pomerium/pomerium/pkg/grpcutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrBootstrapConfigurationChanged is returned when the bootstrap configuration has changed and the function needs to be restarted.
|
||||||
|
var ErrBootstrapConfigurationChanged = errors.New("bootstrap configuration changed")
|
||||||
|
|
||||||
|
// Run runs the given function with a databroker client.
|
||||||
|
// the function would be restarted if the databroker connection has to be re-established.
|
||||||
|
func Run(
|
||||||
|
ctx context.Context,
|
||||||
|
source *bootstrap.Source,
|
||||||
|
funcs ...func(ctx context.Context, client databroker.DataBrokerServiceClient) error,
|
||||||
|
) error {
|
||||||
|
err := source.WaitReady(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("waiting for config source to be ready: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
p := newRunner(ctx, source)
|
||||||
|
defer p.Close()
|
||||||
|
|
||||||
|
b := backoff.NewExponentialBackOff()
|
||||||
|
b.MaxElapsedTime = 0
|
||||||
|
return backoff.Retry(
|
||||||
|
func() error {
|
||||||
|
err := p.runOnce(ctx, funcs...)
|
||||||
|
if retry.IsTerminalError(err) {
|
||||||
|
return backoff.Permanent(err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
},
|
||||||
|
backoff.WithContext(b, ctx),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
type runner struct {
|
||||||
|
source *bootstrap.Source
|
||||||
|
|
||||||
|
lock sync.RWMutex
|
||||||
|
cancel chan struct{}
|
||||||
|
conn *grpc.ClientConn
|
||||||
|
client databroker.DataBrokerServiceClient
|
||||||
|
initError error
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRunner(ctx context.Context, source *bootstrap.Source) *runner {
|
||||||
|
p := &runner{
|
||||||
|
source: source,
|
||||||
|
}
|
||||||
|
p.initLocked(ctx, source.GetConfig())
|
||||||
|
source.OnConfigChange(context.Background(), p.onConfigChange)
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close releases the resources used by the databroker provider.
|
||||||
|
func (p *runner) Close() {
|
||||||
|
p.lock.Lock()
|
||||||
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
|
p.closeLocked()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDatabrokerClient returns the databroker client and a channel that will be closed when the client is no longer valid.
|
||||||
|
func (p *runner) getDatabrokerClient() (databroker.DataBrokerServiceClient, <-chan struct{}, error) {
|
||||||
|
p.lock.RLock()
|
||||||
|
defer p.lock.RUnlock()
|
||||||
|
|
||||||
|
if p.initError != nil {
|
||||||
|
return nil, nil, p.initError
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.client, p.cancel, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *runner) onConfigChange(ctx context.Context, cfg *config.Config) {
|
||||||
|
p.lock.Lock()
|
||||||
|
defer p.lock.Unlock()
|
||||||
|
|
||||||
|
p.closeLocked()
|
||||||
|
p.initLocked(ctx, cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *runner) initLocked(ctx context.Context, cfg *config.Config) {
|
||||||
|
conn, err := newDataBrokerConnection(ctx, cfg)
|
||||||
|
if err != nil {
|
||||||
|
p.initError = fmt.Errorf("databroker connection: %w", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
p.conn = conn
|
||||||
|
p.client = databroker.NewDataBrokerServiceClient(conn)
|
||||||
|
p.cancel = make(chan struct{})
|
||||||
|
p.initError = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *runner) closeLocked() {
|
||||||
|
if p.conn != nil {
|
||||||
|
p.conn.Close()
|
||||||
|
p.conn = nil
|
||||||
|
}
|
||||||
|
if p.cancel != nil {
|
||||||
|
close(p.cancel)
|
||||||
|
p.cancel = nil
|
||||||
|
}
|
||||||
|
p.initError = errors.New("databroker connection closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *runner) runOnce(
|
||||||
|
ctx context.Context,
|
||||||
|
funcs ...func(ctx context.Context, client databroker.DataBrokerServiceClient) error,
|
||||||
|
) error {
|
||||||
|
client, cancelCh, err := p.getDatabrokerClient()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get databroker client: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancelCause(ctx)
|
||||||
|
defer cancel(context.Canceled)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-cancelCh:
|
||||||
|
cancel(ErrBootstrapConfigurationChanged)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return runWithLease(ctx, client, funcs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDataBrokerConnection(ctx context.Context, cfg *config.Config) (*grpc.ClientConn, error) {
|
||||||
|
sharedSecret, err := base64.StdEncoding.DecodeString(cfg.Options.SharedKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("decode shared_secret: %w", err)
|
||||||
|
}
|
||||||
|
if len(sharedSecret) != 32 {
|
||||||
|
return nil, fmt.Errorf("shared_secret: expected 32 bytes, got %d", len(sharedSecret))
|
||||||
|
}
|
||||||
|
|
||||||
|
return grpcutil.NewGRPCClientConn(ctx, &grpcutil.Options{
|
||||||
|
Address: &url.URL{
|
||||||
|
Scheme: "http",
|
||||||
|
Host: net.JoinHostPort("localhost", cfg.GRPCPort),
|
||||||
|
},
|
||||||
|
ServiceName: "databroker",
|
||||||
|
SignedJWTKey: sharedSecret,
|
||||||
|
})
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue