mirror of
https://github.com/pomerium/pomerium.git
synced 2025-05-25 15:07:33 +02:00
authorize: remove wait for ready (#5376)
This commit is contained in:
parent
5d69b925be
commit
3a8bdde211
5 changed files with 6 additions and 91 deletions
|
@ -10,7 +10,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/authorize/evaluator"
|
"github.com/pomerium/pomerium/authorize/evaluator"
|
||||||
"github.com/pomerium/pomerium/authorize/internal/store"
|
"github.com/pomerium/pomerium/authorize/internal/store"
|
||||||
|
@ -20,7 +19,6 @@ import (
|
||||||
"github.com/pomerium/pomerium/internal/telemetry/metrics"
|
"github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||||
"github.com/pomerium/pomerium/internal/telemetry/trace"
|
"github.com/pomerium/pomerium/internal/telemetry/trace"
|
||||||
"github.com/pomerium/pomerium/pkg/cryptutil"
|
"github.com/pomerium/pomerium/pkg/cryptutil"
|
||||||
"github.com/pomerium/pomerium/pkg/grpc"
|
|
||||||
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||||
"github.com/pomerium/pomerium/pkg/storage"
|
"github.com/pomerium/pomerium/pkg/storage"
|
||||||
)
|
)
|
||||||
|
@ -64,16 +62,8 @@ func (a *Authorize) GetDataBrokerServiceClient() databroker.DataBrokerServiceCli
|
||||||
|
|
||||||
// Run runs the authorize service.
|
// Run runs the authorize service.
|
||||||
func (a *Authorize) Run(ctx context.Context) error {
|
func (a *Authorize) Run(ctx context.Context) error {
|
||||||
eg, ctx := errgroup.WithContext(ctx)
|
a.accessTracker.Run(ctx)
|
||||||
eg.Go(func() error {
|
return nil
|
||||||
a.accessTracker.Run(ctx)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
eg.Go(func() error {
|
|
||||||
_ = grpc.WaitForReady(ctx, a.state.Load().dataBrokerClientConnection, time.Second*10)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
return eg.Wait()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateOptions(o *config.Options) error {
|
func validateOptions(o *config.Options) error {
|
||||||
|
|
|
@ -80,7 +80,6 @@ func (srv *Server) getDataBrokerClient(ctx context.Context) (databrokerpb.DataBr
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("controlplane: error creating databroker connection: %w", err)
|
return nil, fmt.Errorf("controlplane: error creating databroker connection: %w", err)
|
||||||
}
|
}
|
||||||
_ = grpc.WaitForReady(ctx, cc, time.Second*10)
|
|
||||||
client := databrokerpb.NewDataBrokerServiceClient(cc)
|
client := databrokerpb.NewDataBrokerServiceClient(cc)
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -276,7 +276,6 @@ func (src *ConfigSource) runUpdater(ctx context.Context, cfg *config.Config) {
|
||||||
log.Ctx(ctx).Debug().
|
log.Ctx(ctx).Debug().
|
||||||
Str("outbound_port", cfg.OutboundPort).
|
Str("outbound_port", cfg.OutboundPort).
|
||||||
Msg("config: starting databroker config source syncer")
|
Msg("config: starting databroker config source syncer")
|
||||||
_ = grpc.WaitForReady(ctx, cc, time.Second*10)
|
|
||||||
_ = syncer.Run(ctx)
|
_ = syncer.Run(ctx)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,11 +4,10 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/connectivity"
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/internal/log"
|
"github.com/pomerium/pomerium/internal/log"
|
||||||
"github.com/pomerium/pomerium/internal/telemetry"
|
"github.com/pomerium/pomerium/internal/telemetry"
|
||||||
|
@ -52,23 +51,12 @@ func NewGRPCClientConn(ctx context.Context, opts *Options, other ...grpc.DialOpt
|
||||||
grpc.WithChainStreamInterceptor(streamClientInterceptors...),
|
grpc.WithChainStreamInterceptor(streamClientInterceptors...),
|
||||||
grpc.WithStatsHandler(clientStatsHandler.Handler),
|
grpc.WithStatsHandler(clientStatsHandler.Handler),
|
||||||
grpc.WithDisableServiceConfig(),
|
grpc.WithDisableServiceConfig(),
|
||||||
grpc.WithInsecure(),
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||||
|
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
|
||||||
}
|
}
|
||||||
dialOptions = append(dialOptions, other...)
|
dialOptions = append(dialOptions, other...)
|
||||||
log.Ctx(ctx).Debug().Str("address", opts.Address).Msg("grpc: dialing")
|
log.Ctx(ctx).Debug().Str("address", opts.Address).Msg("grpc: dialing")
|
||||||
return grpc.DialContext(ctx, opts.Address, dialOptions...)
|
return grpc.NewClient(opts.Address, dialOptions...)
|
||||||
}
|
|
||||||
|
|
||||||
// grpcTimeoutInterceptor enforces per-RPC request timeouts
|
|
||||||
func grpcTimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor {
|
|
||||||
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
|
||||||
if timeout <= 0 {
|
|
||||||
return invoker(ctx, method, req, reply, cc, opts...)
|
|
||||||
}
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
|
||||||
defer cancel()
|
|
||||||
return invoker(ctx, method, req, reply, cc, opts...)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// OutboundOptions are the options for the outbound gRPC client.
|
// OutboundOptions are the options for the outbound gRPC client.
|
||||||
|
@ -125,28 +113,3 @@ func (cache *CachedOutboundGRPClientConn) Get(ctx context.Context, opts *Outboun
|
||||||
cache.opts = opts
|
cache.opts = opts
|
||||||
return cache.current, nil
|
return cache.current, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitForReady waits for the connection to be ready.
|
|
||||||
func WaitForReady(ctx context.Context, cc *grpc.ClientConn, timeout time.Duration) error {
|
|
||||||
if cc.GetState() == connectivity.Ready {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, clearTimeout := context.WithTimeout(ctx, timeout)
|
|
||||||
defer clearTimeout()
|
|
||||||
|
|
||||||
cc.Connect()
|
|
||||||
ticker := time.NewTicker(time.Millisecond * 50)
|
|
||||||
defer ticker.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return context.Cause(ctx)
|
|
||||||
case <-ticker.C:
|
|
||||||
}
|
|
||||||
|
|
||||||
if cc.GetState() == connectivity.Ready {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,36 +0,0 @@
|
||||||
package grpc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
|
||||||
|
|
||||||
func Test_grpcTimeoutInterceptor(t *testing.T) {
|
|
||||||
mockInvoker := func(sleepTime time.Duration, wantFail bool) grpc.UnaryInvoker {
|
|
||||||
return func(ctx context.Context, _ string, _, _ any, _ *grpc.ClientConn, _ ...grpc.CallOption) error {
|
|
||||||
time.Sleep(sleepTime)
|
|
||||||
deadline, ok := ctx.Deadline()
|
|
||||||
if !ok {
|
|
||||||
t.Fatal("No deadline set")
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
if ok && now.After(deadline) && !wantFail {
|
|
||||||
t.Errorf("Deadline exceeded, but should not have. now=%v, deadline=%v", now, deadline)
|
|
||||||
} else if now.Before(deadline) && wantFail {
|
|
||||||
t.Errorf("Deadline not exceeded, but should have. now=%v, deadline=%v", now, deadline)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
timeOut := 300 * time.Millisecond
|
|
||||||
to := grpcTimeoutInterceptor(timeOut)
|
|
||||||
|
|
||||||
to(context.Background(), "test", nil, nil, nil, mockInvoker(timeOut*2, true))
|
|
||||||
to(context.Background(), "test", nil, nil, nil, mockInvoker(timeOut/2, false))
|
|
||||||
}
|
|
Loading…
Add table
Add a link
Reference in a new issue