mirror of
https://github.com/pomerium/pomerium.git
synced 2025-05-12 00:27:35 +02:00
grpc: wait for connect to be ready before making calls (#3253)
* grpc: wait for connect to be ready before making calls * make sure to stop the ticker
This commit is contained in:
parent
443f4a01f5
commit
761c17b8ac
5 changed files with 42 additions and 6 deletions
|
@ -6,6 +6,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/authorize/evaluator"
|
"github.com/pomerium/pomerium/authorize/evaluator"
|
||||||
"github.com/pomerium/pomerium/config"
|
"github.com/pomerium/pomerium/config"
|
||||||
|
@ -13,6 +14,7 @@ import (
|
||||||
"github.com/pomerium/pomerium/internal/telemetry/metrics"
|
"github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||||
"github.com/pomerium/pomerium/internal/telemetry/trace"
|
"github.com/pomerium/pomerium/internal/telemetry/trace"
|
||||||
"github.com/pomerium/pomerium/pkg/cryptutil"
|
"github.com/pomerium/pomerium/pkg/cryptutil"
|
||||||
|
"github.com/pomerium/pomerium/pkg/grpc"
|
||||||
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -57,6 +59,7 @@ func (a *Authorize) GetDataBrokerServiceClient() databroker.DataBrokerServiceCli
|
||||||
// Run runs the authorize service.
|
// Run runs the authorize service.
|
||||||
func (a *Authorize) Run(ctx context.Context) error {
|
func (a *Authorize) Run(ctx context.Context) error {
|
||||||
go a.accessTracker.Run(ctx)
|
go a.accessTracker.Run(ctx)
|
||||||
|
_ = grpc.WaitForReady(ctx, a.state.Load().dataBrokerClientConnection, time.Second*10)
|
||||||
return newDataBrokerSyncer(a).Run(ctx)
|
return newDataBrokerSyncer(a).Run(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
googlegrpc "google.golang.org/grpc"
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/authorize/evaluator"
|
"github.com/pomerium/pomerium/authorize/evaluator"
|
||||||
"github.com/pomerium/pomerium/config"
|
"github.com/pomerium/pomerium/config"
|
||||||
"github.com/pomerium/pomerium/internal/encoding"
|
"github.com/pomerium/pomerium/internal/encoding"
|
||||||
|
@ -17,11 +19,12 @@ import (
|
||||||
var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn)
|
var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn)
|
||||||
|
|
||||||
type authorizeState struct {
|
type authorizeState struct {
|
||||||
sharedKey []byte
|
sharedKey []byte
|
||||||
evaluator *evaluator.Evaluator
|
evaluator *evaluator.Evaluator
|
||||||
encoder encoding.MarshalUnmarshaler
|
encoder encoding.MarshalUnmarshaler
|
||||||
dataBrokerClient databroker.DataBrokerServiceClient
|
dataBrokerClientConnection *googlegrpc.ClientConn
|
||||||
auditEncryptor *protoutil.Encryptor
|
dataBrokerClient databroker.DataBrokerServiceClient
|
||||||
|
auditEncryptor *protoutil.Encryptor
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAuthorizeStateFromConfig(cfg *config.Config, store *evaluator.Store) (*authorizeState, error) {
|
func newAuthorizeStateFromConfig(cfg *config.Config, store *evaluator.Store) (*authorizeState, error) {
|
||||||
|
@ -62,6 +65,7 @@ func newAuthorizeStateFromConfig(cfg *config.Config, store *evaluator.Store) (*a
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("authorize: error creating databroker connection: %w", err)
|
return nil, fmt.Errorf("authorize: error creating databroker connection: %w", err)
|
||||||
}
|
}
|
||||||
|
state.dataBrokerClientConnection = cc
|
||||||
state.dataBrokerClient = databroker.NewDataBrokerServiceClient(cc)
|
state.dataBrokerClient = databroker.NewDataBrokerServiceClient(cc)
|
||||||
|
|
||||||
auditKey, err := cfg.Options.GetAuditKey()
|
auditKey, err := cfg.Options.GetAuditKey()
|
||||||
|
|
|
@ -80,6 +80,7 @@ func (srv *Server) getDataBrokerClient(ctx context.Context) (databrokerpb.DataBr
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("controlplane: error creating databroker connection: %w", err)
|
return nil, fmt.Errorf("controlplane: error creating databroker connection: %w", err)
|
||||||
}
|
}
|
||||||
|
_ = grpc.WaitForReady(ctx, cc, time.Second*10)
|
||||||
client := databrokerpb.NewDataBrokerServiceClient(cc)
|
client := databrokerpb.NewDataBrokerServiceClient(cc)
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package databroker
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/config"
|
"github.com/pomerium/pomerium/config"
|
||||||
"github.com/pomerium/pomerium/internal/hashutil"
|
"github.com/pomerium/pomerium/internal/hashutil"
|
||||||
|
@ -208,6 +209,7 @@ func (src *ConfigSource) runUpdater(cfg *config.Config) {
|
||||||
Str("outbound_port", cfg.OutboundPort).
|
Str("outbound_port", cfg.OutboundPort).
|
||||||
Strs("databroker_urls", databrokerURLs).
|
Strs("databroker_urls", databrokerURLs).
|
||||||
Msg("config: starting databroker config source syncer")
|
Msg("config: starting databroker config source syncer")
|
||||||
|
_ = grpc.WaitForReady(ctx, cc, time.Second*10)
|
||||||
_ = syncer.Run(ctx)
|
_ = syncer.Run(ctx)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/connectivity"
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/internal/log"
|
"github.com/pomerium/pomerium/internal/log"
|
||||||
"github.com/pomerium/pomerium/internal/telemetry"
|
"github.com/pomerium/pomerium/internal/telemetry"
|
||||||
|
@ -54,7 +55,7 @@ func NewGRPCClientConn(ctx context.Context, opts *Options, other ...grpc.DialOpt
|
||||||
grpc.WithInsecure(),
|
grpc.WithInsecure(),
|
||||||
}
|
}
|
||||||
dialOptions = append(dialOptions, other...)
|
dialOptions = append(dialOptions, other...)
|
||||||
log.Info(ctx).Str("address", opts.Address).Msg("dialing")
|
log.Info(ctx).Str("address", opts.Address).Msg("grpc: dialing")
|
||||||
return grpc.DialContext(ctx, opts.Address, dialOptions...)
|
return grpc.DialContext(ctx, opts.Address, dialOptions...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,3 +125,28 @@ func (cache *CachedOutboundGRPClientConn) Get(ctx context.Context, opts *Outboun
|
||||||
cache.opts = opts
|
cache.opts = opts
|
||||||
return cache.current, nil
|
return cache.current, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitForReady waits for the connection to be ready.
|
||||||
|
func WaitForReady(ctx context.Context, cc *grpc.ClientConn, timeout time.Duration) error {
|
||||||
|
if cc.GetState() == connectivity.Ready {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, clearTimeout := context.WithTimeout(ctx, timeout)
|
||||||
|
defer clearTimeout()
|
||||||
|
|
||||||
|
cc.Connect()
|
||||||
|
ticker := time.NewTicker(time.Millisecond * 50)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-ticker.C:
|
||||||
|
}
|
||||||
|
|
||||||
|
if cc.GetState() == connectivity.Ready {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue