mirror of
https://github.com/pomerium/pomerium.git
synced 2025-06-05 04:13:11 +02:00
pkg/storage/redis: do not use timeout to signal redis conn to stop (#1155)
Instead, we run the loop in goroutine, and when context was done, closing the underlying connection of PubSubConn, so the Receive will return. Fixes #1154
This commit is contained in:
parent
557aef2a33
commit
05545b3e1d
1 changed files with 35 additions and 56 deletions
|
@ -8,7 +8,6 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
|
@ -212,68 +211,48 @@ func (db *DB) ClearDeleted(_ context.Context, cutoff time.Time) {
|
|||
}
|
||||
}
|
||||
|
||||
// doNotify receives event from redis and signal the channel that something happened.
|
||||
func doNotify(ctx context.Context, psc *redis.PubSubConn, ch chan struct{}) error {
|
||||
switch v := psc.ReceiveWithTimeout(time.Second).(type) {
|
||||
case redis.Message:
|
||||
log.Debug().Str("action", string(v.Data)).Msg("got redis message")
|
||||
if string(v.Data) != watchAction {
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Warn().Err(ctx.Err()).Msg("unable to notify channel")
|
||||
return ctx.Err()
|
||||
case ch <- struct{}{}:
|
||||
}
|
||||
case error:
|
||||
log.Debug().Err(v).Msg("redis subscribe error")
|
||||
return v
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// doNotifyLoop tries to run doNotify forever.
|
||||
//
|
||||
// Because redis.PubSubConn does not support context, so it will block until it receives event, we can not use
|
||||
// context to signal it stops. We mitigate this case by using PubSubConn.ReceiveWithTimeout. In case of timeout
|
||||
// occurred, we return a nil error, so the caller of doNotifyLoop will re-create new connection to start new loop.
|
||||
func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}, psc *redis.PubSubConn, eb *backoff.ExponentialBackOff) error {
|
||||
// doNotifyLoop receives event from redis and send signal to the channel.
|
||||
func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}, psc *redis.PubSubConn) {
|
||||
for {
|
||||
err, ok := doNotify(ctx, psc, ch).(net.Error)
|
||||
if !ok && err != nil {
|
||||
log.Error().Err(ctx.Err()).Msg("failed to notify channel")
|
||||
return err
|
||||
}
|
||||
if ok && err.Timeout() {
|
||||
switch v := psc.Receive().(type) {
|
||||
case redis.Message:
|
||||
log.Debug().Str("action", string(v.Data)).Msg("got redis message")
|
||||
if string(v.Data) != watchAction {
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(eb.NextBackOff()):
|
||||
log.Warn().Err(ctx.Err()).Msg("context done, stop receive from redis channel")
|
||||
return
|
||||
case ch <- struct{}{}:
|
||||
}
|
||||
case error:
|
||||
log.Error().Err(v).Msg("failed to receive from redis channel")
|
||||
if _, ok := v.(net.Error); ok {
|
||||
return
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// watchLoop runs the doNotifyLoop forever.
|
||||
//
|
||||
// If doNotifyLoop returns a nil error, watchLoop re-create the PubSubConn and start new iteration.
|
||||
func (db *DB) watchLoop(ctx context.Context, ch chan struct{}) {
|
||||
var psConn redis.Conn
|
||||
eb := backoff.NewExponentialBackOff()
|
||||
for {
|
||||
psConn = db.pool.Get()
|
||||
psc := redis.PubSubConn{Conn: psConn}
|
||||
if err := psc.PSubscribe("__keyspace*__:" + db.versionSet); err != nil {
|
||||
log.Error().Err(err).Msg("failed to subscribe to version set channel")
|
||||
psConn.Close()
|
||||
return
|
||||
}
|
||||
if err := db.doNotifyLoop(ctx, ch, &psc, eb); err != nil {
|
||||
psConn.Close()
|
||||
return
|
||||
}
|
||||
// watch runs the doNotifyLoop. It returns when ctx was done or doNotifyLoop exits.
|
||||
func (db *DB) watch(ctx context.Context, ch chan struct{}) {
|
||||
psConn := db.pool.Get()
|
||||
defer psConn.Close()
|
||||
psc := redis.PubSubConn{Conn: psConn}
|
||||
if err := psc.PSubscribe("__keyspace*__:" + db.versionSet); err != nil {
|
||||
log.Error().Err(err).Msg("failed to subscribe to version set channel")
|
||||
return
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
db.doNotifyLoop(ctx, ch, &psc)
|
||||
}()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -294,7 +273,7 @@ func (db *DB) Watch(ctx context.Context) chan struct{} {
|
|||
return
|
||||
}
|
||||
c.Close()
|
||||
db.watchLoop(ctx, ch)
|
||||
db.watch(ctx, ch)
|
||||
}()
|
||||
|
||||
return ch
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue