mirror of
https://github.com/pomerium/pomerium.git
synced 2025-08-06 10:21:05 +02:00
redis: refactor change signal test to be more deterministic (#2335)
This commit is contained in:
parent
a7ce0494dd
commit
0bca5c9556
1 changed files with 45 additions and 30 deletions
|
@ -10,6 +10,7 @@ import (
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
"google.golang.org/protobuf/types/known/anypb"
|
"google.golang.org/protobuf/types/known/anypb"
|
||||||
"google.golang.org/protobuf/types/known/timestamppb"
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
|
@ -129,39 +130,53 @@ func TestChangeSignal(t *testing.T) {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
require.NoError(t, testutil.WithTestRedis(false, func(rawURL string) error {
|
require.NoError(t, testutil.WithTestRedis(false, func(rawURL string) error {
|
||||||
backend1, err := New(rawURL)
|
ready := make(chan struct{})
|
||||||
require.NoError(t, err)
|
var eg errgroup.Group
|
||||||
defer func() { _ = backend1.Close() }()
|
eg.Go(func() error {
|
||||||
|
backend, err := New(rawURL)
|
||||||
backend2, err := New(rawURL)
|
if err != nil {
|
||||||
require.NoError(t, err)
|
return err
|
||||||
defer func() { _ = backend2.Close() }()
|
|
||||||
|
|
||||||
ch := backend1.onChange.Bind()
|
|
||||||
defer backend1.onChange.Unbind(ch)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
ticker := time.NewTicker(time.Millisecond * 100)
|
|
||||||
defer ticker.Stop()
|
|
||||||
for {
|
|
||||||
_, _ = backend2.Put(ctx, &databroker.Record{
|
|
||||||
Type: "TYPE",
|
|
||||||
Id: "ID",
|
|
||||||
})
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}()
|
defer func() { _ = backend.Close() }()
|
||||||
|
|
||||||
select {
|
ch := backend.onChange.Bind()
|
||||||
case <-ch:
|
defer backend.onChange.Unbind(ch)
|
||||||
case <-ctx.Done():
|
|
||||||
t.Fatal("expected signal to be fired when another backend triggers a change")
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// signal the second backend that we're ready to receive a change
|
||||||
|
close(ready)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
eg.Go(func() error {
|
||||||
|
// wait for the first backend to be bound to the on change handler
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-ready:
|
||||||
|
}
|
||||||
|
|
||||||
|
backend, err := New(rawURL)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() { _ = backend.Close() }()
|
||||||
|
|
||||||
|
// put a new value to trigger a change
|
||||||
|
_, err = backend.Put(ctx, &databroker.Record{
|
||||||
|
Type: "TYPE",
|
||||||
|
Id: "ID",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
assert.NoError(t, eg.Wait(), "expected signal to be fired when another backend triggers a change")
|
||||||
return nil
|
return nil
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue