diff --git a/authorize/authorize.go b/authorize/authorize.go index 7f21a8100..b4034b147 100644 --- a/authorize/authorize.go +++ b/authorize/authorize.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/pomerium/pomerium/authorize/evaluator" "github.com/pomerium/pomerium/config" @@ -13,6 +14,7 @@ import ( "github.com/pomerium/pomerium/internal/telemetry/metrics" "github.com/pomerium/pomerium/internal/telemetry/trace" "github.com/pomerium/pomerium/pkg/cryptutil" + "github.com/pomerium/pomerium/pkg/grpc" "github.com/pomerium/pomerium/pkg/grpc/databroker" ) @@ -57,6 +59,7 @@ func (a *Authorize) GetDataBrokerServiceClient() databroker.DataBrokerServiceCli // Run runs the authorize service. func (a *Authorize) Run(ctx context.Context) error { go a.accessTracker.Run(ctx) + _ = grpc.WaitForReady(ctx, a.state.Load().dataBrokerClientConnection, time.Second*10) return newDataBrokerSyncer(a).Run(ctx) } diff --git a/authorize/state.go b/authorize/state.go index 459b6fdc0..30a73c8d6 100644 --- a/authorize/state.go +++ b/authorize/state.go @@ -5,6 +5,8 @@ import ( "fmt" "sync/atomic" + googlegrpc "google.golang.org/grpc" + "github.com/pomerium/pomerium/authorize/evaluator" "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/encoding" @@ -17,11 +19,12 @@ import ( var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn) type authorizeState struct { - sharedKey []byte - evaluator *evaluator.Evaluator - encoder encoding.MarshalUnmarshaler - dataBrokerClient databroker.DataBrokerServiceClient - auditEncryptor *protoutil.Encryptor + sharedKey []byte + evaluator *evaluator.Evaluator + encoder encoding.MarshalUnmarshaler + dataBrokerClientConnection *googlegrpc.ClientConn + dataBrokerClient databroker.DataBrokerServiceClient + auditEncryptor *protoutil.Encryptor } func newAuthorizeStateFromConfig(cfg *config.Config, store *evaluator.Store) (*authorizeState, error) { @@ -62,6 +65,7 @@ func newAuthorizeStateFromConfig(cfg *config.Config, store *evaluator.Store) (*a if err != nil { return nil, fmt.Errorf("authorize: error creating databroker connection: %w", err) } + state.dataBrokerClientConnection = cc state.dataBrokerClient = databroker.NewDataBrokerServiceClient(cc) auditKey, err := cfg.Options.GetAuditKey() diff --git a/internal/controlplane/events.go b/internal/controlplane/events.go index 8e0985ba6..8e469bd97 100644 --- a/internal/controlplane/events.go +++ b/internal/controlplane/events.go @@ -80,6 +80,7 @@ func (srv *Server) getDataBrokerClient(ctx context.Context) (databrokerpb.DataBr if err != nil { return nil, fmt.Errorf("controlplane: error creating databroker connection: %w", err) } + _ = grpc.WaitForReady(ctx, cc, time.Second*10) client := databrokerpb.NewDataBrokerServiceClient(cc) return client, nil } diff --git a/internal/databroker/config_source.go b/internal/databroker/config_source.go index 3e7996913..faa265709 100644 --- a/internal/databroker/config_source.go +++ b/internal/databroker/config_source.go @@ -3,6 +3,7 @@ package databroker import ( "context" "sync" + "time" "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/hashutil" @@ -208,6 +209,7 @@ func (src *ConfigSource) runUpdater(cfg *config.Config) { Str("outbound_port", cfg.OutboundPort). Strs("databroker_urls", databrokerURLs). Msg("config: starting databroker config source syncer") + _ = grpc.WaitForReady(ctx, cc, time.Second*10) _ = syncer.Run(ctx) }() } diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go index f969b1f41..c3a54d8d5 100644 --- a/pkg/grpc/client.go +++ b/pkg/grpc/client.go @@ -8,6 +8,7 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/telemetry" @@ -54,7 +55,7 @@ func NewGRPCClientConn(ctx context.Context, opts *Options, other ...grpc.DialOpt grpc.WithInsecure(), } dialOptions = append(dialOptions, other...) - log.Info(ctx).Str("address", opts.Address).Msg("dialing") + log.Info(ctx).Str("address", opts.Address).Msg("grpc: dialing") return grpc.DialContext(ctx, opts.Address, dialOptions...) } @@ -124,3 +125,28 @@ func (cache *CachedOutboundGRPClientConn) Get(ctx context.Context, opts *Outboun cache.opts = opts return cache.current, nil } + +// WaitForReady waits for the connection to be ready. +func WaitForReady(ctx context.Context, cc *grpc.ClientConn, timeout time.Duration) error { + if cc.GetState() == connectivity.Ready { + return nil + } + + ctx, clearTimeout := context.WithTimeout(ctx, timeout) + defer clearTimeout() + + cc.Connect() + ticker := time.NewTicker(time.Millisecond * 50) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + + if cc.GetState() == connectivity.Ready { + return nil + } + } +}