pkg/storage/redis: fix multiple data races (#1210)

There are two data races in the current code:

 - Closing the pub/sub conn races with renewing the pub/sub conn
 - Closing the notify channel races with sending data to it

Fixing them by:

 - Moving pub/sub conn creation/renewal into doNotifyLoop
 - Adding a lock to guard closing the channel and sending data to it, and
 also adding another check for whether the context is done at the
 beginning of the notify loop.

Verifying by running:

	for _ in {1..100}; do
	  go test -race -count=1 ./pkg/storage/redis/...
	done

with no failures.
This commit is contained in:
Cuong Manh Le 2020-08-05 22:35:14 +07:00 committed by GitHub
parent 73abed0d21
commit f4cb5ea6e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
"sync"
"time" "time"
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
@ -38,6 +39,7 @@ type DB struct {
versionSet string versionSet string
deletedSet string deletedSet string
tlsConfig *tls.Config tlsConfig *tls.Config
notifyChMu sync.Mutex
} }
// New returns new DB instance. // New returns new DB instance.
@ -243,9 +245,23 @@ func (db *DB) ClearDeleted(ctx context.Context, cutoff time.Time) {
} }
// doNotifyLoop receives event from redis and send signal to the channel. // doNotifyLoop receives event from redis and send signal to the channel.
func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}, psc *redis.PubSubConn) { func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}) {
eb := backoff.NewExponentialBackOff() eb := backoff.NewExponentialBackOff()
psConn := db.pool.Get()
psc := redis.PubSubConn{Conn: psConn}
defer func(psc *redis.PubSubConn) {
psc.Conn.Close()
}(&psc)
if err := db.subscribeRedisChannel(&psc); err != nil {
log.Error().Err(err).Msg("failed to subscribe to version set channel")
return
}
for { for {
select {
case <-ctx.Done():
default:
}
switch v := psc.Receive().(type) { switch v := psc.Receive().(type) {
case redis.Message: case redis.Message:
log.Debug().Str("action", string(v.Data)).Msg("got redis message") log.Debug().Str("action", string(v.Data)).Msg("got redis message")
@ -253,12 +269,22 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}, psc *redis.Pub
if string(v.Data) != watchAction { if string(v.Data) != watchAction {
continue continue
} }
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Warn().Err(ctx.Err()).Msg("context done, stop receive from redis channel") log.Warn().Err(ctx.Err()).Msg("context done, stop receive from redis channel")
return return
default:
db.notifyChMu.Lock()
select {
case <-ctx.Done():
db.notifyChMu.Unlock()
log.Warn().Err(ctx.Err()).Msg("context done while holding notify lock, stop receive from redis channel")
return
case ch <- struct{}{}: case ch <- struct{}{}:
} }
db.notifyChMu.Unlock()
}
case error: case error:
log.Warn().Err(v).Msg("failed to receive from redis channel") log.Warn().Err(v).Msg("failed to receive from redis channel")
recordOperation(ctx, time.Now(), "sub_received", v) recordOperation(ctx, time.Now(), "sub_received", v)
@ -269,28 +295,22 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}, psc *redis.Pub
log.Warn().Msg("retry with new connection") log.Warn().Msg("retry with new connection")
_ = psc.Conn.Close() _ = psc.Conn.Close()
psc.Conn = db.pool.Get() psc.Conn = db.pool.Get()
_ = db.subscribeRedisChannel(psc) _ = db.subscribeRedisChannel(&psc)
} }
} }
} }
// watch runs the doNotifyLoop. It returns when ctx was done or doNotifyLoop exits. // watch runs the doNotifyLoop. It returns when ctx was done or doNotifyLoop exits.
func (db *DB) watch(ctx context.Context, ch chan struct{}) { func (db *DB) watch(ctx context.Context, ch chan struct{}) {
psConn := db.pool.Get() defer func() {
psc := redis.PubSubConn{Conn: psConn} db.notifyChMu.Lock()
defer func(psc *redis.PubSubConn) { close(ch)
psc.Conn.Close() db.notifyChMu.Unlock()
}(&psc) }()
if err := db.subscribeRedisChannel(&psc); err != nil {
log.Error().Err(err).Msg("failed to subscribe to version set channel")
return
}
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer close(done) defer close(done)
db.doNotifyLoop(ctx, ch, &psc) db.doNotifyLoop(ctx, ch)
}() }()
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -308,10 +328,6 @@ func (db *DB) Watch(ctx context.Context) chan struct{} {
ch := make(chan struct{}) ch := make(chan struct{})
go func() { go func() {
c := db.pool.Get() c := db.pool.Get()
defer func() {
close(ch)
}()
// Setup notifications, we only care about changes to db.version_set. // Setup notifications, we only care about changes to db.version_set.
if _, err := c.Do("CONFIG", "SET", "notify-keyspace-events", "Kz"); err != nil { if _, err := c.Do("CONFIG", "SET", "notify-keyspace-events", "Kz"); err != nil {
log.Error().Err(err).Msg("failed to setup redis notification") log.Error().Err(err).Msg("failed to setup redis notification")