pkg/storage/redis: handling connection to redis backend failure (#1174)

In case of connection to backend failure, we try reconnecting with
backoff, to re-subscribe to the notification channel.

Fixes #1167
This commit is contained in:
Cuong Manh Le 2020-07-31 22:17:11 +07:00 committed by GitHub
parent f7ebf54305
commit fabf773e8b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -7,9 +7,9 @@ import (
"fmt"
"net"
"strconv"
"strings"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/gomodule/redigo/redis"
@ -245,6 +245,7 @@ func (db *DB) ClearDeleted(ctx context.Context, cutoff time.Time) {
// doNotifyLoop receives event from redis and send signal to the channel.
func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}, psc *redis.PubSubConn) {
eb := backoff.NewExponentialBackOff()
for {
switch v := psc.Receive().(type) {
case redis.Message:
@ -259,13 +260,15 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}, psc *redis.Pub
case ch <- struct{}{}:
}
case error:
log.Error().Err(v).Msg("failed to receive from redis channel")
log.Warn().Err(v).Msg("failed to receive from redis channel")
if _, ok := v.(net.Error); ok {
return
}
if strings.HasPrefix(v.Error(), "redigo: connection closed") {
return
}
time.Sleep(eb.NextBackOff())
log.Warn().Msg("retry with new connection")
_ = psc.Conn.Close()
psc.Conn = db.pool.Get()
_ = db.subscribeRedisChannel(psc)
}
}
}
@ -273,9 +276,12 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}, psc *redis.Pub
// watch runs the doNotifyLoop. It returns when ctx was done or doNotifyLoop exits.
func (db *DB) watch(ctx context.Context, ch chan struct{}) {
psConn := db.pool.Get()
defer psConn.Close()
psc := redis.PubSubConn{Conn: psConn}
if err := psc.PSubscribe("__keyspace*__:" + db.versionSet); err != nil {
defer func(psc *redis.PubSubConn) {
psc.Conn.Close()
}(&psc)
if err := db.subscribeRedisChannel(&psc); err != nil {
log.Error().Err(err).Msg("failed to subscribe to version set channel")
return
}
@ -291,6 +297,10 @@ func (db *DB) watch(ctx context.Context, ch chan struct{}) {
}
}
func (db *DB) subscribeRedisChannel(psc *redis.PubSubConn) error {
return psc.PSubscribe("__keyspace*__:" + db.versionSet)
}
// Watch returns a channel to the caller, when there is a change to the version set,
// sending message to the channel to notify the caller.
func (db *DB) Watch(ctx context.Context) chan struct{} {