diff --git a/authenticate/state.go b/authenticate/state.go index d88f0ebe6..026fbc361 100644 --- a/authenticate/state.go +++ b/authenticate/state.go @@ -25,6 +25,8 @@ import ( "github.com/pomerium/pomerium/pkg/grpc/directory" ) +var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn) + type authenticateState struct { redirectURL *url.URL // sharedEncoder is the encoder to use to serialize data to be consumed @@ -146,7 +148,7 @@ func newAuthenticateStateFromConfig(cfg *config.Config) (*authenticateState, err return nil, err } - dataBrokerConn, err := grpc.GetOutboundGRPCClientConn(context.Background(), &grpc.OutboundOptions{ + dataBrokerConn, err := outboundGRPCConnection.Get(context.Background(), &grpc.OutboundOptions{ OutboundPort: cfg.OutboundPort, InstallationID: cfg.Options.InstallationID, ServiceName: cfg.Options.Services, diff --git a/authorize/state.go b/authorize/state.go index 17de135e6..459b6fdc0 100644 --- a/authorize/state.go +++ b/authorize/state.go @@ -14,6 +14,8 @@ import ( "github.com/pomerium/pomerium/pkg/protoutil" ) +var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn) + type authorizeState struct { sharedKey []byte evaluator *evaluator.Evaluator @@ -51,7 +53,7 @@ func newAuthorizeStateFromConfig(cfg *config.Config, store *evaluator.Store) (*a return nil, err } - cc, err := grpc.GetOutboundGRPCClientConn(context.Background(), &grpc.OutboundOptions{ + cc, err := outboundGRPCConnection.Get(context.Background(), &grpc.OutboundOptions{ OutboundPort: cfg.OutboundPort, InstallationID: cfg.Options.InstallationID, ServiceName: cfg.Options.Services, diff --git a/internal/cmd/pomerium/pomerium.go b/internal/cmd/pomerium/pomerium.go index 630e8f0b2..5f6fb2bbb 100644 --- a/internal/cmd/pomerium/pomerium.go +++ b/internal/cmd/pomerium/pomerium.go @@ -205,7 +205,7 @@ func setupDataBroker(ctx context.Context, src config.Source, controlPlane *contr } func setupRegistryReporter(ctx context.Context, src config.Source) error { - reporter := new(registry.Reporter) + reporter := registry.NewReporter() src.OnConfigChange(ctx, reporter.OnConfigChange) reporter.OnConfigChange(ctx, src.GetConfig()) return nil diff --git a/internal/controlplane/events.go b/internal/controlplane/events.go index e33141e9b..387230a39 100644 --- a/internal/controlplane/events.go +++ b/internal/controlplane/events.go @@ -20,6 +20,8 @@ import ( const maxEnvoyConfigurationEvents = 50 +var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn) + func (srv *Server) handleEnvoyConfigurationEvent(evt *events.EnvoyConfigurationEvent) { select { case srv.envoyConfigurationEvents <- evt: @@ -88,7 +90,7 @@ func (srv *Server) getDataBrokerClient(ctx context.Context) (databrokerpb.DataBr return nil, err } - cc, err := grpc.GetOutboundGRPCClientConn(context.Background(), &grpc.OutboundOptions{ + cc, err := outboundGRPCConnection.Get(context.Background(), &grpc.OutboundOptions{ OutboundPort: cfg.OutboundPort, InstallationID: cfg.Options.InstallationID, ServiceName: cfg.Options.Services, diff --git a/internal/databroker/config_source.go b/internal/databroker/config_source.go index e96cb1cae..8fc910852 100644 --- a/internal/databroker/config_source.go +++ b/internal/databroker/config_source.go @@ -18,12 +18,13 @@ import ( // ConfigSource provides a new Config source that decorates an underlying config with // configuration derived from the data broker. type ConfigSource struct { - mu sync.RWMutex - computedConfig *config.Config - underlyingConfig *config.Config - dbConfigs map[string]dbConfig - updaterHash uint64 - cancel func() + mu sync.RWMutex + outboundGRPCConnection *grpc.CachedOutboundGRPClientConn + computedConfig *config.Config + underlyingConfig *config.Config + dbConfigs map[string]dbConfig + updaterHash uint64 + cancel func() config.ChangeDispatcher } @@ -36,7 +37,8 @@ type dbConfig struct { // NewConfigSource creates a new ConfigSource. func NewConfigSource(ctx context.Context, underlying config.Source, listeners ...config.ChangeListener) *ConfigSource { src := &ConfigSource{ - dbConfigs: map[string]dbConfig{}, + dbConfigs: map[string]dbConfig{}, + outboundGRPCConnection: new(grpc.CachedOutboundGRPClientConn), } for _, li := range listeners { src.OnConfigChange(ctx, li) @@ -182,7 +184,7 @@ func (src *ConfigSource) runUpdater(cfg *config.Config) { ctx := context.Background() ctx, src.cancel = context.WithCancel(ctx) - cc, err := grpc.GetOutboundGRPCClientConn(ctx, connectionOptions) + cc, err := src.outboundGRPCConnection.Get(ctx, connectionOptions) if err != nil { log.Error(ctx).Err(err).Msg("databroker: failed to create gRPC connection to data broker") return diff --git a/internal/registry/reporter.go b/internal/registry/reporter.go index be699e0f5..3121f61b9 100644 --- a/internal/registry/reporter.go +++ b/internal/registry/reporter.go @@ -19,7 +19,15 @@ import ( // Reporter periodically submits a list of services available on this instance to the service registry type Reporter struct { - cancel func() + cancel func() + outboundGRPCConnection *grpc.CachedOutboundGRPClientConn +} + +// NewReporter creates a new Reporter. +func NewReporter() *Reporter { + return &Reporter{ + outboundGRPCConnection: new(grpc.CachedOutboundGRPClientConn), + } } // OnConfigChange applies configuration changes to the reporter @@ -39,7 +47,7 @@ func (r *Reporter) OnConfigChange(ctx context.Context, cfg *config.Config) { return } - registryConn, err := grpc.GetOutboundGRPCClientConn(ctx, &grpc.OutboundOptions{ + registryConn, err := r.outboundGRPCConnection.Get(ctx, &grpc.OutboundOptions{ OutboundPort: cfg.OutboundPort, InstallationID: cfg.Options.InstallationID, ServiceName: cfg.Options.Services, diff --git a/pkg/grpc/client.go b/pkg/grpc/client.go index 26de6ffbf..f969b1f41 100644 --- a/pkg/grpc/client.go +++ b/pkg/grpc/client.go @@ -70,49 +70,6 @@ func grpcTimeoutInterceptor(timeout time.Duration) grpc.UnaryClientInterceptor { } } -type grpcClientConnRecord struct { - conn *grpc.ClientConn - opts *Options -} - -var grpcClientConns = struct { - sync.Mutex - m map[string]grpcClientConnRecord -}{ - m: make(map[string]grpcClientConnRecord), -} - -// GetGRPCClientConn returns a gRPC client connection for the given name. If a connection for that name has already been -// established the existing connection will be returned. If any options change for that connection, the existing -// connection will be closed and a new one established. -func GetGRPCClientConn(ctx context.Context, name string, opts *Options) (*grpc.ClientConn, error) { - grpcClientConns.Lock() - defer grpcClientConns.Unlock() - - current, ok := grpcClientConns.m[name] - if ok { - if cmp.Equal(current.opts, opts) { - return current.conn, nil - } - - err := current.conn.Close() - if err != nil { - log.Error(context.TODO()).Err(err).Msg("grpc: failed to close existing connection") - } - } - - cc, err := NewGRPCClientConn(ctx, opts) - if err != nil { - return nil, err - } - - grpcClientConns.m[name] = grpcClientConnRecord{ - conn: cc, - opts: opts, - } - return cc, nil -} - // OutboundOptions are the options for the outbound gRPC client. type OutboundOptions struct { // OutboundPort is the port for the outbound gRPC listener. @@ -128,12 +85,42 @@ type OutboundOptions struct { SignedJWTKey []byte } -// GetOutboundGRPCClientConn gets the outbound gRPC client. -func GetOutboundGRPCClientConn(ctx context.Context, opts *OutboundOptions) (*grpc.ClientConn, error) { - return GetGRPCClientConn(ctx, "outbound", &Options{ +// newOutboundGRPCClientConn gets a new outbound gRPC client. +func newOutboundGRPCClientConn(ctx context.Context, opts *OutboundOptions) (*grpc.ClientConn, error) { + return NewGRPCClientConn(ctx, &Options{ Address: net.JoinHostPort("127.0.0.1", opts.OutboundPort), InstallationID: opts.InstallationID, ServiceName: opts.ServiceName, SignedJWTKey: opts.SignedJWTKey, }) } + +// CachedOutboundGRPClientConn keeps a cached outbound gRPC client connection open based on options. +type CachedOutboundGRPClientConn struct { + mu sync.Mutex + opts *OutboundOptions + current *grpc.ClientConn +} + +// Get gets the cached outbound gRPC client, or creates a new one if the options have changed. +func (cache *CachedOutboundGRPClientConn) Get(ctx context.Context, opts *OutboundOptions) (*grpc.ClientConn, error) { + cache.mu.Lock() + defer cache.mu.Unlock() + + if cache.current != nil && cmp.Equal(cache.opts, opts) { + return cache.current, nil + } + + if cache.current != nil { + _ = cache.current.Close() + cache.current = nil + } + + var err error + cache.current, err = newOutboundGRPCClientConn(ctx, opts) + if err != nil { + return nil, err + } + cache.opts = opts + return cache.current, nil +}