diff --git a/authorize/authorize.go b/authorize/authorize.go index d6c2b3ffa..3fc5fcef4 100644 --- a/authorize/authorize.go +++ b/authorize/authorize.go @@ -10,7 +10,6 @@ import ( "time" "github.com/rs/zerolog" - "golang.org/x/sync/errgroup" "github.com/pomerium/pomerium/authorize/evaluator" "github.com/pomerium/pomerium/authorize/internal/store" @@ -20,7 +19,6 @@ import ( "github.com/pomerium/pomerium/internal/telemetry/metrics" "github.com/pomerium/pomerium/internal/telemetry/trace" "github.com/pomerium/pomerium/pkg/cryptutil" - "github.com/pomerium/pomerium/pkg/grpc" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/storage" ) @@ -64,16 +62,8 @@ func (a *Authorize) GetDataBrokerServiceClient() databroker.DataBrokerServiceCli // Run runs the authorize service. func (a *Authorize) Run(ctx context.Context) error { - eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { - a.accessTracker.Run(ctx) - return nil - }) - eg.Go(func() error { - _ = grpc.WaitForReady(ctx, a.state.Load().dataBrokerClientConnection, time.Second*10) - return nil - }) - return eg.Wait() + a.accessTracker.Run(ctx) + return nil } func validateOptions(o *config.Options) error { diff --git a/internal/controlplane/events.go b/internal/controlplane/events.go index 71c866376..90bc086d2 100644 --- a/internal/controlplane/events.go +++ b/internal/controlplane/events.go @@ -80,7 +80,6 @@ func (srv *Server) getDataBrokerClient(ctx context.Context) (databrokerpb.DataBr if err != nil { return nil, fmt.Errorf("controlplane: error creating databroker connection: %w", err) } - _ = grpc.WaitForReady(ctx, cc, time.Second*10) client := databrokerpb.NewDataBrokerServiceClient(cc) return client, nil } diff --git a/internal/databroker/config_source.go b/internal/databroker/config_source.go index 7a379777b..b76986ab3 100644 --- a/internal/databroker/config_source.go +++ b/internal/databroker/config_source.go @@ -276,7 +276,6 @@ func (src *ConfigSource) runUpdater(ctx context.Context, cfg *config.Config) { log.Ctx(ctx).Debug(). Str("outbound_port", cfg.OutboundPort). Msg("config: starting databroker config source syncer") - _ = grpc.WaitForReady(ctx, cc, time.Second*10) _ = syncer.Run(ctx) }() } diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go index a8e7aad71..0d145769f 100644 --- a/pkg/grpc/client.go +++ b/pkg/grpc/client.go @@ -4,11 +4,10 @@ import ( "context" "net" "sync" - "time" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/telemetry" @@ -52,23 +51,12 @@ func NewGRPCClientConn(ctx context.Context, opts *Options, other ...grpc.DialOpt grpc.WithChainStreamInterceptor(streamClientInterceptors...), grpc.WithStatsHandler(clientStatsHandler.Handler), grpc.WithDisableServiceConfig(), - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.WaitForReady(true)), } dialOptions = append(dialOptions, other...) log.Ctx(ctx).Debug().Str("address", opts.Address).Msg("grpc: dialing") - return grpc.DialContext(ctx, opts.Address, dialOptions...) -} - -// grpcTimeoutInterceptor enforces per-RPC request timeouts -func grpcTimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor { - return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - if timeout <= 0 { - return invoker(ctx, method, req, reply, cc, opts...) - } - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - return invoker(ctx, method, req, reply, cc, opts...) - } + return grpc.NewClient(opts.Address, dialOptions...) } // OutboundOptions are the options for the outbound gRPC client. @@ -125,28 +113,3 @@ func (cache *CachedOutboundGRPClientConn) Get(ctx context.Context, opts *Outboun cache.opts = opts return cache.current, nil } - -// WaitForReady waits for the connection to be ready. -func WaitForReady(ctx context.Context, cc *grpc.ClientConn, timeout time.Duration) error { - if cc.GetState() == connectivity.Ready { - return nil - } - - ctx, clearTimeout := context.WithTimeout(ctx, timeout) - defer clearTimeout() - - cc.Connect() - ticker := time.NewTicker(time.Millisecond * 50) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return context.Cause(ctx) - case <-ticker.C: - } - - if cc.GetState() == connectivity.Ready { - return nil - } - } -} diff --git a/pkg/grpc/client_test.go b/pkg/grpc/client_test.go deleted file mode 100644 index 1dffa4e46..000000000 --- a/pkg/grpc/client_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package grpc - -import ( - "context" - "testing" - "time" - - "google.golang.org/grpc" -) - -func Test_grpcTimeoutInterceptor(t *testing.T) { - mockInvoker := func(sleepTime time.Duration, wantFail bool) grpc.UnaryInvoker { - return func(ctx context.Context, _ string, _, _ any, _ *grpc.ClientConn, _ ...grpc.CallOption) error { - time.Sleep(sleepTime) - deadline, ok := ctx.Deadline() - if !ok { - t.Fatal("No deadline set") - } - - now := time.Now() - - if ok && now.After(deadline) && !wantFail { - t.Errorf("Deadline exceeded, but should not have. now=%v, deadline=%v", now, deadline) - } else if now.Before(deadline) && wantFail { - t.Errorf("Deadline not exceeded, but should have. now=%v, deadline=%v", now, deadline) - } - return nil - } - } - - timeOut := 300 * time.Millisecond - to := grpcTimeoutInterceptor(timeOut) - - to(context.Background(), "test", nil, nil, nil, mockInvoker(timeOut*2, true)) - to(context.Background(), "test", nil, nil, nil, mockInvoker(timeOut/2, false)) -}