pomerium/internal/controlplane/xdsmgr/xdsmgr_test.go
Caleb Doxsey 1a5b8b606f
core/lint: upgrade golangci-lint, replace interface{} with any (#5099)
* core/lint: upgrade golangci-lint, replace interface{} with any

* regen proto
2024-05-02 14:33:52 -06:00

119 lines
3 KiB
Go

package xdsmgr
import (
"context"
"net"
"testing"
"time"
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/test/bufconn"
"github.com/pomerium/pomerium/internal/signal"
)
// bufSize is the size in bytes (1 MiB) passed to bufconn.Listen for the
// in-memory listener used by the tests in this file.
const bufSize = 1 << 20
// TestManager exercises the Manager through a real gRPC client/server pair
// connected over an in-memory bufconn listener. It checks that:
//
//   - the state-of-the-world stream (StreamAggregatedResources) is rejected
//     with codes.Unimplemented, and
//   - the delta stream (DeltaAggregatedResources) delivers the initial
//     resources, a subsequent version bump, and finally a removal once the
//     resource list for the type URL is set to nil.
func TestManager(t *testing.T) {
	ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10)
	defer clearTimeout()

	typeURL := "example.com/example"

	// Swap the package-level onHandleDeltaRequest hook for one that broadcasts
	// on a signal, and restore the original when the test ends. The test uses
	// that signal to wait after each request it sends (presumably the hook
	// fires once the manager has processed a delta request — see its caller).
	stateChanged := signal.New()
	origOnHandleDeltaRequest := onHandleDeltaRequest
	defer func() { onHandleDeltaRequest = origOnHandleDeltaRequest }()
	onHandleDeltaRequest = func(_ *streamState) {
		stateChanged.Broadcast(ctx)
	}

	srv := grpc.NewServer()
	mgr := NewManager(map[string][]*envoy_service_discovery_v3.Resource{
		typeURL: {
			{Name: "r1", Version: "1"},
		},
	})
	envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, mgr)

	li := bufconn.Listen(bufSize)
	go func() { _ = srv.Serve(li) }()
	// Stop the server when the test finishes so the Serve goroutine and the
	// listener do not leak into other tests.
	defer srv.Stop()

	cc, err := grpc.Dial("test",
		grpc.WithInsecure(),
		grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
			return li.Dial()
		}))
	require.NoError(t, err)
	defer func() { _ = cc.Close() }()

	client := envoy_service_discovery_v3.NewAggregatedDiscoveryServiceClient(cc)

	t.Run("stream is disabled", func(t *testing.T) {
		stream, err := client.StreamAggregatedResources(ctx)
		require.NoError(t, err)

		_, err = stream.Recv()
		assert.Error(t, err, "only delta should be implemented")
		assert.Equal(t, codes.Unimplemented, grpc.Code(err))
	})

	t.Run("updates", func(t *testing.T) {
		stream, err := client.DeltaAggregatedResources(ctx)
		require.NoError(t, err)

		ch := stateChanged.Bind()
		defer stateChanged.Unbind(ch)

		// ack sends a delta request carrying the given response nonce and
		// then blocks until the onHandleDeltaRequest hook has fired (or the
		// test context expires). It must only be called from the test
		// goroutine because it may call FailNow.
		ack := func(nonce string) {
			t.Helper()

			sendErr := stream.Send(&envoy_service_discovery_v3.DeltaDiscoveryRequest{
				TypeUrl:       typeURL,
				ResponseNonce: nonce,
			})
			// Abort immediately: if the send failed the hook will never
			// fire and waiting would only stall until the context timeout.
			require.NoError(t, sendErr)

			select {
			case <-ctx.Done():
				t.Fatal(ctx.Err())
			case <-ch:
			}
		}

		// The initial request (empty nonce) subscribes to the type URL.
		ack("")

		msg, err := stream.Recv()
		require.NoError(t, err)
		assert.NotEmpty(t, msg.GetNonce(), "nonce should not be empty")
		assert.Equal(t, []*envoy_service_discovery_v3.Resource{
			{Name: "r1", Version: "1"},
		}, msg.GetResources())
		ack(msg.GetNonce())

		mgr.Update(ctx, map[string][]*envoy_service_discovery_v3.Resource{
			typeURL: {{Name: "r1", Version: "2"}},
		})

		msg, err = stream.Recv()
		require.NoError(t, err)
		assert.Equal(t, []*envoy_service_discovery_v3.Resource{
			{Name: "r1", Version: "2"},
		}, msg.GetResources())
		ack(msg.GetNonce())

		mgr.Update(ctx, map[string][]*envoy_service_discovery_v3.Resource{
			typeURL: nil,
		})

		// Keep receiving (and acknowledging) responses until one reports r1
		// as removed. This runs on the test goroutine, rather than inside
		// assert.Eventually, so that require/t.Fatal are legal here. Recv is
		// bounded by ctx, and the deadline bounds the case where responses
		// keep arriving but never contain the removal.
		deadline := time.Now().Add(time.Second * 5)
		for {
			msg, err = stream.Recv()
			require.NoError(t, err)
			ack(msg.GetNonce())
			if assert.ObjectsAreEqual([]string{"r1"}, msg.GetRemovedResources()) {
				break
			}
			require.True(t, time.Now().Before(deadline), "timed out waiting for r1 to be removed")
		}
	})
}