mirror of
https://github.com/pomerium/pomerium.git
synced 2025-05-10 23:57:34 +02:00
connect: add gRPC keep-alive (#4961)
This commit is contained in:
parent
c6d1f17100
commit
2db2bd09a1
3 changed files with 18 additions and 4 deletions
|
@ -6,6 +6,9 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/keepalive"
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/internal/zero/apierror"
|
"github.com/pomerium/pomerium/internal/zero/apierror"
|
||||||
connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
|
connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
|
||||||
"github.com/pomerium/pomerium/internal/zero/grpcconn"
|
"github.com/pomerium/pomerium/internal/zero/grpcconn"
|
||||||
|
@ -34,6 +37,13 @@ const (
|
||||||
minTelemetryTokenTTL = time.Minute * 5
|
minTelemetryTokenTTL = time.Minute * 5
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// see https://github.com/pomerium/pomerium-zero/issues/1711
|
||||||
|
var connectClientKeepaliveParams = keepalive.ClientParameters{
|
||||||
|
Time: time.Minute, // send pings every minute
|
||||||
|
Timeout: time.Minute, // wait 1 minute for ping ack
|
||||||
|
PermitWithoutStream: false,
|
||||||
|
}
|
||||||
|
|
||||||
// WatchOption defines which events to watch for
|
// WatchOption defines which events to watch for
|
||||||
type WatchOption = connect_mux.WatchOption
|
type WatchOption = connect_mux.WatchOption
|
||||||
|
|
||||||
|
@ -60,7 +70,7 @@ func NewAPI(ctx context.Context, opts ...Option) (*API, error) {
|
||||||
|
|
||||||
connectGRPCConn, err := grpcconn.New(ctx, cfg.connectAPIEndpoint, func(ctx context.Context) (string, error) {
|
connectGRPCConn, err := grpcconn.New(ctx, cfg.connectAPIEndpoint, func(ctx context.Context) (string, error) {
|
||||||
return tokenCache.GetToken(ctx, minConnectTokenTTL)
|
return tokenCache.GetToken(ctx, minConnectTokenTTL)
|
||||||
})
|
}, grpc.WithKeepaliveParams(connectClientKeepaliveParams))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error creating connect grpc client: %w", err)
|
return nil, fmt.Errorf("error creating connect grpc client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,8 +25,9 @@ func New(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
endpoint string,
|
endpoint string,
|
||||||
tokenProvider TokenProviderFn,
|
tokenProvider TokenProviderFn,
|
||||||
|
dialOpts ...grpc.DialOption,
|
||||||
) (*grpc.ClientConn, error) {
|
) (*grpc.ClientConn, error) {
|
||||||
cfg, err := getConfig(endpoint)
|
cfg, err := getConfig(endpoint, dialOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,8 +25,11 @@ type config struct {
|
||||||
// NewConfig returns a new Config from an endpoint string, that has to be in a URL format.
|
// NewConfig returns a new Config from an endpoint string, that has to be in a URL format.
|
||||||
// The endpoint can be either http:// or https:// that will be used to determine whether TLS should be used.
|
// The endpoint can be either http:// or https:// that will be used to determine whether TLS should be used.
|
||||||
// if port is not specified, it will be inferred from the scheme (80 for http, 443 for https).
|
// if port is not specified, it will be inferred from the scheme (80 for http, 443 for https).
|
||||||
func getConfig(endpoint string) (*config, error) {
|
func getConfig(
|
||||||
c := new(config)
|
endpoint string,
|
||||||
|
opts ...grpc.DialOption,
|
||||||
|
) (*config, error) {
|
||||||
|
c := &config{opts: opts}
|
||||||
err := c.parseEndpoint(endpoint)
|
err := c.parseEndpoint(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("invalid endpoint: %w", err)
|
return nil, fmt.Errorf("invalid endpoint: %w", err)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue