circuit breaker: add temporary runtime flag to lift default connection/request limits

This commit is contained in:
Denis Mishin 2025-04-14 20:00:46 -04:00
parent 153a8efe9d
commit af1afabb0c
3 changed files with 121 additions and 1 deletions

View file

@ -4,6 +4,7 @@ import (
"context"
"encoding/base64"
"fmt"
"math"
"net"
"net/url"
"strings"
@ -188,6 +189,17 @@ func (b *Builder) buildPolicyCluster(ctx context.Context, cfg *config.Config, po
}
}
if cfg.Options.IsRuntimeFlagSet(config.RuntimeFlagTmpUnlimitedConnections) {
cluster.CircuitBreakers = &envoy_config_cluster_v3.CircuitBreakers{
Thresholds: []*envoy_config_cluster_v3.CircuitBreakers_Thresholds{{
Priority: envoy_config_core_v3.RoutingPriority_DEFAULT,
MaxConnections: wrapperspb.UInt32(math.MaxUint32),
MaxPendingRequests: wrapperspb.UInt32(math.MaxUint32),
MaxRequests: wrapperspb.UInt32(math.MaxUint32),
}},
}
}
cluster.AltStatName = getClusterStatsName(policy)
upstreamProtocol := getUpstreamProtocolForPolicy(ctx, policy)
@ -216,7 +228,6 @@ func (b *Builder) buildPolicyEndpoints(
) ([]Endpoint, error) {
var endpoints []Endpoint
for _, dst := range policy.To {
dst := dst
ts, err := b.buildPolicyTransportSocket(ctx, cfg, policy, dst.URL)
if err != nil {
return nil, err

View file

@ -0,0 +1,105 @@
package envoyconfig_test
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/interop"
"google.golang.org/grpc/interop/grpc_testing"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/testenv"
"github.com/pomerium/pomerium/internal/testenv/snippets"
"github.com/pomerium/pomerium/internal/testenv/upstreams"
)
func TestMaxGRPCConn(t *testing.T) {
const maxConnections = 1025
env := testenv.New(t)
env.Add(testenv.ModifierFunc(func(_ context.Context, cfg *config.Config) {
cfg.Options.RuntimeFlags[config.RuntimeFlagTmpUnlimitedConnections] = true
}))
up := upstreams.GRPC(insecure.NewCredentials())
srv := interop.NewTestServer()
grpc_testing.RegisterTestServiceServer(up, srv)
h2c := up.Route().
From(env.SubdomainURL("grpc-h2c")).
Policy(func(p *config.Policy) { p.AllowPublicUnauthenticatedAccess = true })
grpcTestRunner := func(ctx context.Context) error {
cc := up.Dial(h2c)
client := grpc_testing.NewTestServiceClient(cc)
call, err := client.FullDuplexCall(ctx)
if err != nil {
return fmt.Errorf("call: %w", err)
}
err = call.Send(&grpc_testing.StreamingOutputCallRequest{
ResponseParameters: []*grpc_testing.ResponseParameters{
{
Size: 17,
},
},
ResponseStatus: &grpc_testing.EchoStatus{
Message: "hello",
},
})
if err != nil {
return fmt.Errorf("send: %w", err)
}
resp, err := call.Recv()
if err != nil {
return fmt.Errorf("recv: %w", err)
}
if n := len(resp.Payload.Body); n != 17 {
return fmt.Errorf("got %d bytes, want 17", n)
}
if err != nil {
return fmt.Errorf("recv: %w", err)
}
return nil
}
env.AddUpstream(up)
env.Start()
snippets.WaitStartupComplete(env)
ctx, cancel := context.WithCancel(env.Context())
t.Cleanup(cancel)
ch := make(chan error)
for range maxConnections {
go func() {
if err := grpcTestRunner(ctx); err != nil {
ch <- err
return
}
ch <- nil
<-ctx.Done()
}()
}
for i := range maxConnections {
select {
case err := <-ch:
t.Logf("#%d: got response %v", i, err)
if !assert.NoError(t, err) {
cancel()
t.FailNow()
}
case <-ctx.Done():
t.Fatal("timeout")
}
}
cancel()
}

View file

@ -28,6 +28,10 @@ var (
// RuntimeFlagAddExtraMetricsLabels enables adding extra labels to metrics (host and installation id)
RuntimeFlagAddExtraMetricsLabels = runtimeFlag("add_extra_metrics_labels", true)
// RuntimeFlagTmpUnlimitedConnections enables unlimited connections to the upstream clusters
// this is temporary measure until circuit breaker options are added to the config
RuntimeFlagTmpUnlimitedConnections = runtimeFlag("tmp_unlimited_connections", false)
)
// RuntimeFlag is a runtime flag that can flip on/off certain features