grpc: disable gRPC connection re-use across services (#2515)

This commit is contained in:
Caleb Doxsey 2021-08-24 11:47:16 -06:00 committed by GitHub
parent 526f946097
commit f5a558d4a0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 63 additions and 60 deletions

View file

@ -25,6 +25,8 @@ import (
"github.com/pomerium/pomerium/pkg/grpc/directory"
)
var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn)
type authenticateState struct {
redirectURL *url.URL
// sharedEncoder is the encoder to use to serialize data to be consumed
@ -146,7 +148,7 @@ func newAuthenticateStateFromConfig(cfg *config.Config) (*authenticateState, err
return nil, err
}
dataBrokerConn, err := grpc.GetOutboundGRPCClientConn(context.Background(), &grpc.OutboundOptions{
dataBrokerConn, err := outboundGRPCConnection.Get(context.Background(), &grpc.OutboundOptions{
OutboundPort: cfg.OutboundPort,
InstallationID: cfg.Options.InstallationID,
ServiceName: cfg.Options.Services,

View file

@ -14,6 +14,8 @@ import (
"github.com/pomerium/pomerium/pkg/protoutil"
)
var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn)
type authorizeState struct {
sharedKey []byte
evaluator *evaluator.Evaluator
@ -51,7 +53,7 @@ func newAuthorizeStateFromConfig(cfg *config.Config, store *evaluator.Store) (*a
return nil, err
}
cc, err := grpc.GetOutboundGRPCClientConn(context.Background(), &grpc.OutboundOptions{
cc, err := outboundGRPCConnection.Get(context.Background(), &grpc.OutboundOptions{
OutboundPort: cfg.OutboundPort,
InstallationID: cfg.Options.InstallationID,
ServiceName: cfg.Options.Services,

View file

@ -205,7 +205,7 @@ func setupDataBroker(ctx context.Context, src config.Source, controlPlane *contr
}
func setupRegistryReporter(ctx context.Context, src config.Source) error {
reporter := new(registry.Reporter)
reporter := registry.NewReporter()
src.OnConfigChange(ctx, reporter.OnConfigChange)
reporter.OnConfigChange(ctx, src.GetConfig())
return nil

View file

@ -20,6 +20,8 @@ import (
const maxEnvoyConfigurationEvents = 50
var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn)
func (srv *Server) handleEnvoyConfigurationEvent(evt *events.EnvoyConfigurationEvent) {
select {
case srv.envoyConfigurationEvents <- evt:
@ -88,7 +90,7 @@ func (srv *Server) getDataBrokerClient(ctx context.Context) (databrokerpb.DataBr
return nil, err
}
cc, err := grpc.GetOutboundGRPCClientConn(context.Background(), &grpc.OutboundOptions{
cc, err := outboundGRPCConnection.Get(context.Background(), &grpc.OutboundOptions{
OutboundPort: cfg.OutboundPort,
InstallationID: cfg.Options.InstallationID,
ServiceName: cfg.Options.Services,

View file

@ -18,12 +18,13 @@ import (
// ConfigSource provides a new Config source that decorates an underlying config with
// configuration derived from the data broker.
type ConfigSource struct {
mu sync.RWMutex
computedConfig *config.Config
underlyingConfig *config.Config
dbConfigs map[string]dbConfig
updaterHash uint64
cancel func()
mu sync.RWMutex
outboundGRPCConnection *grpc.CachedOutboundGRPClientConn
computedConfig *config.Config
underlyingConfig *config.Config
dbConfigs map[string]dbConfig
updaterHash uint64
cancel func()
config.ChangeDispatcher
}
@ -36,7 +37,8 @@ type dbConfig struct {
// NewConfigSource creates a new ConfigSource.
func NewConfigSource(ctx context.Context, underlying config.Source, listeners ...config.ChangeListener) *ConfigSource {
src := &ConfigSource{
dbConfigs: map[string]dbConfig{},
dbConfigs: map[string]dbConfig{},
outboundGRPCConnection: new(grpc.CachedOutboundGRPClientConn),
}
for _, li := range listeners {
src.OnConfigChange(ctx, li)
@ -182,7 +184,7 @@ func (src *ConfigSource) runUpdater(cfg *config.Config) {
ctx := context.Background()
ctx, src.cancel = context.WithCancel(ctx)
cc, err := grpc.GetOutboundGRPCClientConn(ctx, connectionOptions)
cc, err := src.outboundGRPCConnection.Get(ctx, connectionOptions)
if err != nil {
log.Error(ctx).Err(err).Msg("databroker: failed to create gRPC connection to data broker")
return

View file

@ -19,7 +19,15 @@ import (
// Reporter periodically submits a list of services available on this instance to the service registry
type Reporter struct {
cancel func()
cancel func()
outboundGRPCConnection *grpc.CachedOutboundGRPClientConn
}
// NewReporter creates a new Reporter.
func NewReporter() *Reporter {
return &Reporter{
outboundGRPCConnection: new(grpc.CachedOutboundGRPClientConn),
}
}
// OnConfigChange applies configuration changes to the reporter
@ -39,7 +47,7 @@ func (r *Reporter) OnConfigChange(ctx context.Context, cfg *config.Config) {
return
}
registryConn, err := grpc.GetOutboundGRPCClientConn(ctx, &grpc.OutboundOptions{
registryConn, err := r.outboundGRPCConnection.Get(ctx, &grpc.OutboundOptions{
OutboundPort: cfg.OutboundPort,
InstallationID: cfg.Options.InstallationID,
ServiceName: cfg.Options.Services,

View file

@ -70,49 +70,6 @@ func grpcTimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor {
}
}
type grpcClientConnRecord struct {
conn *grpc.ClientConn
opts *Options
}
var grpcClientConns = struct {
sync.Mutex
m map[string]grpcClientConnRecord
}{
m: make(map[string]grpcClientConnRecord),
}
// GetGRPCClientConn returns a gRPC client connection for the given name. If a connection for that name has already been
// established the existing connection will be returned. If any options change for that connection, the existing
// connection will be closed and a new one established.
func GetGRPCClientConn(ctx context.Context, name string, opts *Options) (*grpc.ClientConn, error) {
grpcClientConns.Lock()
defer grpcClientConns.Unlock()
current, ok := grpcClientConns.m[name]
if ok {
if cmp.Equal(current.opts, opts) {
return current.conn, nil
}
err := current.conn.Close()
if err != nil {
log.Error(context.TODO()).Err(err).Msg("grpc: failed to close existing connection")
}
}
cc, err := NewGRPCClientConn(ctx, opts)
if err != nil {
return nil, err
}
grpcClientConns.m[name] = grpcClientConnRecord{
conn: cc,
opts: opts,
}
return cc, nil
}
// OutboundOptions are the options for the outbound gRPC client.
type OutboundOptions struct {
// OutboundPort is the port for the outbound gRPC listener.
@ -128,12 +85,42 @@ type OutboundOptions struct {
SignedJWTKey []byte
}
// GetOutboundGRPCClientConn gets the outbound gRPC client.
func GetOutboundGRPCClientConn(ctx context.Context, opts *OutboundOptions) (*grpc.ClientConn, error) {
return GetGRPCClientConn(ctx, "outbound", &Options{
// newOutboundGRPCClientConn gets a new outbound gRPC client.
func newOutboundGRPCClientConn(ctx context.Context, opts *OutboundOptions) (*grpc.ClientConn, error) {
return NewGRPCClientConn(ctx, &Options{
Address: net.JoinHostPort("127.0.0.1", opts.OutboundPort),
InstallationID: opts.InstallationID,
ServiceName: opts.ServiceName,
SignedJWTKey: opts.SignedJWTKey,
})
}
// CachedOutboundGRPClientConn keeps a cached outbound gRPC client connection open based on options.
type CachedOutboundGRPClientConn struct {
mu sync.Mutex
opts *OutboundOptions
current *grpc.ClientConn
}
// Get gets the cached outbound gRPC client, or creates a new one if the options have changed.
func (cache *CachedOutboundGRPClientConn) Get(ctx context.Context, opts *OutboundOptions) (*grpc.ClientConn, error) {
cache.mu.Lock()
defer cache.mu.Unlock()
if cache.current != nil && cmp.Equal(cache.opts, opts) {
return cache.current, nil
}
if cache.current != nil {
_ = cache.current.Close()
cache.current = nil
}
var err error
cache.current, err = newOutboundGRPCClientConn(ctx, opts)
if err != nil {
return nil, err
}
cache.opts = opts
return cache.current, nil
}