diff --git a/config/envoyconfig/clusters.go b/config/envoyconfig/clusters.go index be095e3fe..95c7a7b53 100644 --- a/config/envoyconfig/clusters.go +++ b/config/envoyconfig/clusters.go @@ -4,6 +4,7 @@ import ( "context" "encoding/base64" "fmt" + "math" "net" "net/url" "strings" @@ -188,6 +189,17 @@ func (b *Builder) buildPolicyCluster(ctx context.Context, cfg *config.Config, po } } + if cfg.Options.IsRuntimeFlagSet(config.RuntimeFlagTmpUnlimitedConnections) { + cluster.CircuitBreakers = &envoy_config_cluster_v3.CircuitBreakers{ + Thresholds: []*envoy_config_cluster_v3.CircuitBreakers_Thresholds{{ + Priority: envoy_config_core_v3.RoutingPriority_DEFAULT, + MaxConnections: wrapperspb.UInt32(math.MaxUint32), + MaxPendingRequests: wrapperspb.UInt32(math.MaxUint32), + MaxRequests: wrapperspb.UInt32(math.MaxUint32), + }}, + } + } + cluster.AltStatName = getClusterStatsName(policy) upstreamProtocol := getUpstreamProtocolForPolicy(ctx, policy) @@ -216,7 +228,6 @@ func (b *Builder) buildPolicyEndpoints( ) ([]Endpoint, error) { var endpoints []Endpoint for _, dst := range policy.To { - dst := dst ts, err := b.buildPolicyTransportSocket(ctx, cfg, policy, dst.URL) if err != nil { return nil, err diff --git a/config/envoyconfig/grpc_max_connections_test.go b/config/envoyconfig/grpc_max_connections_test.go new file mode 100644 index 000000000..9d57349f9 --- /dev/null +++ b/config/envoyconfig/grpc_max_connections_test.go @@ -0,0 +1,105 @@ +package envoyconfig_test + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/interop" + "google.golang.org/grpc/interop/grpc_testing" + + "github.com/pomerium/pomerium/config" + "github.com/pomerium/pomerium/internal/testenv" + "github.com/pomerium/pomerium/internal/testenv/snippets" + "github.com/pomerium/pomerium/internal/testenv/upstreams" +) + +func TestMaxGRPCConn(t *testing.T) { + const maxConnections = 1025 + + env := testenv.New(t) + env.Add(testenv.ModifierFunc(func(_ context.Context, cfg *config.Config) { + cfg.Options.RuntimeFlags[config.RuntimeFlagTmpUnlimitedConnections] = true + })) + + up := upstreams.GRPC(insecure.NewCredentials()) + srv := interop.NewTestServer() + grpc_testing.RegisterTestServiceServer(up, srv) + + h2c := up.Route(). + From(env.SubdomainURL("grpc-h2c")). + Policy(func(p *config.Policy) { p.AllowPublicUnauthenticatedAccess = true }) + + grpcTestRunner := func(ctx context.Context) error { + cc := up.Dial(h2c) + + client := grpc_testing.NewTestServiceClient(cc) + call, err := client.FullDuplexCall(ctx) + if err != nil { + return fmt.Errorf("call: %w", err) + } + err = call.Send(&grpc_testing.StreamingOutputCallRequest{ + ResponseParameters: []*grpc_testing.ResponseParameters{ + { + Size: 17, + }, + }, + ResponseStatus: &grpc_testing.EchoStatus{ + Message: "hello", + }, + }) + if err != nil { + return fmt.Errorf("send: %w", err) + } + + resp, err := call.Recv() + if err != nil { + return fmt.Errorf("recv: %w", err) + } + if n := len(resp.Payload.Body); n != 17 { + return fmt.Errorf("got %d bytes, want 17", n) + } + if err != nil { + return fmt.Errorf("recv: %w", err) + } + + return nil + } + + env.AddUpstream(up) + env.Start() + snippets.WaitStartupComplete(env) + + ctx, cancel := context.WithCancel(env.Context()) + t.Cleanup(cancel) + ch := make(chan error) + for range maxConnections { + go func() { + if err := grpcTestRunner(ctx); err != nil { + ch <- err + return + } + + ch <- nil + <-ctx.Done() + }() + } + + for i := range maxConnections { + select { + case err := <-ch: + t.Logf("#%d: got response %v", i, err) + + if !assert.NoError(t, err) { + cancel() + t.FailNow() + } + case <-ctx.Done(): + t.Fatal("timeout") + } + } + + cancel() +} diff --git a/config/runtime_flags.go b/config/runtime_flags.go index 8ba41cc81..55f56363f 100644 --- a/config/runtime_flags.go +++ b/config/runtime_flags.go @@ -28,6 +28,10 @@ var ( // RuntimeFlagAddExtraMetricsLabels enables adding extra labels to metrics (host and installation id) RuntimeFlagAddExtraMetricsLabels = runtimeFlag("add_extra_metrics_labels", true) + + // RuntimeFlagTmpUnlimitedConnections enables unlimited connections to the upstream clusters + // this is temporary measure until circuit breaker options are added to the config + RuntimeFlagTmpUnlimitedConnections = runtimeFlag("tmp_unlimited_connections", false) ) // RuntimeFlag is a runtime flag that can flip on/off certain features