redis: use pubsub instead of keyspace events (#1450)

This commit is contained in:
Caleb Doxsey 2020-09-23 14:40:05 -06:00 committed by GitHub
parent 852c96f22f
commit f4c61a0cdc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 39 deletions

View file

@ -465,7 +465,6 @@ func (srv *Server) newDB(recordType string) (db storage.Backend, err error) {
db, err = redis.New(
srv.cfg.storageConnectionString,
recordType,
int64(srv.cfg.deletePermanentlyAfter.Seconds()),
redis.WithTLSConfig(tlsConfig),
)
if err != nil {

View file

@ -26,34 +26,33 @@ import (
// Name is the storage type name for redis backend.
const Name = config.StorageRedisName
const watchAction = "zadd"
var _ storage.Backend = (*DB)(nil)
// DB wraps redis conn to interact with redis server.
type DB struct {
pool *redis.Pool
deletePermanentlyAfter int64
recordType string
lastVersionKey string
versionSet string
deletedSet string
tlsConfig *tls.Config
notifyChMu sync.Mutex
pool *redis.Pool
recordType string
lastVersionKey string
lastVersionChannelKey string
versionSet string
deletedSet string
tlsConfig *tls.Config
notifyChMu sync.Mutex
closeOnce sync.Once
closed chan struct{}
}
// New returns new DB instance.
func New(rawURL, recordType string, deletePermanentAfter int64, opts ...Option) (*DB, error) {
func New(rawURL, recordType string, opts ...Option) (*DB, error) {
db := &DB{
deletePermanentlyAfter: deletePermanentAfter,
recordType: recordType,
versionSet: recordType + "_version_set",
deletedSet: recordType + "_deleted_set",
lastVersionKey: recordType + "_last_version",
closed: make(chan struct{}),
recordType: recordType,
versionSet: recordType + "_version_set",
deletedSet: recordType + "_deleted_set",
lastVersionKey: recordType + "_last_version",
lastVersionChannelKey: recordType + "_last_version_ch",
closed: make(chan struct{}),
}
for _, o := range opts {
@ -122,6 +121,7 @@ func (db *DB) Put(ctx context.Context, id string, data *anypb.Any) (err error) {
{"MULTI": nil},
{"HSET": {db.recordType, id, string(b)}},
{"ZADD": {db.versionSet, lastVersion, id}},
{"PUBLISH": {db.lastVersionChannelKey, lastVersion}},
}
if err := db.tx(c, cmds); err != nil {
return err
@ -217,6 +217,7 @@ func (db *DB) Delete(ctx context.Context, id string) (err error) {
{"HSET": {db.recordType, id, string(b)}},
{"SADD": {db.deletedSet, id}},
{"ZADD": {db.versionSet, lastVersion, id}},
{"PUBLISH": {db.lastVersionChannelKey, lastVersion}},
}
if err := db.tx(c, cmds); err != nil {
return err
@ -265,7 +266,7 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}) {
psc.Conn.Close()
}(&psc)
if err := db.subscribeRedisChannel(&psc); err != nil {
if err := psc.Subscribe(db.lastVersionChannelKey); err != nil {
log.Error().Err(err).Msg("failed to subscribe to version set channel")
return
}
@ -281,9 +282,6 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}) {
case redis.Message:
log.Debug().Str("action", string(v.Data)).Msg("got redis message")
recordOperation(ctx, time.Now(), "sub_received", nil)
if string(v.Data) != watchAction {
continue
}
select {
case <-db.closed:
@ -315,7 +313,7 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}) {
log.Warn().Msg("retry with new connection")
_ = psc.Conn.Close()
psc.Conn = db.pool.Get()
_ = db.subscribeRedisChannel(&psc)
_ = psc.Subscribe(db.lastVersionChannelKey)
}
}
}
@ -339,26 +337,11 @@ func (db *DB) watch(ctx context.Context, ch chan struct{}) {
}
}
func (db *DB) subscribeRedisChannel(psc *redis.PubSubConn) error {
return psc.PSubscribe("__keyspace*__:" + db.versionSet)
}
// Watch returns a channel to the caller, when there is a change to the version set,
// sending message to the channel to notify the caller.
func (db *DB) Watch(ctx context.Context) <-chan struct{} {
ch := make(chan struct{})
go func() {
c := db.pool.Get()
// Setup notifications, we only care about changes to db.version_set.
if _, err := c.Do("CONFIG", "SET", "notify-keyspace-events", "Kz"); err != nil {
log.Error().Err(err).Msg("failed to setup redis notification")
c.Close()
return
}
c.Close()
db.watch(ctx, ch)
}()
go db.watch(ctx, ch)
return ch
}

View file

@ -73,7 +73,7 @@ func runWithRedisDockerImage(t *testing.T, runOpts *dockertest.RunOptions, withT
address := fmt.Sprintf(scheme+"://localhost:%s/0", resource.GetPort("6379/tcp"))
if err := pool.Retry(func() error {
var err error
db, err = New(address, "record_type", int64(time.Hour.Seconds()), WithTLSConfig(tlsConfig(address, t)))
db, err = New(address, "record_type", WithTLSConfig(tlsConfig(address, t)))
if err != nil {
return err
}