redis: use pubsub instead of keyspace events (#1451)

This commit is contained in:
Caleb Doxsey 2020-09-23 15:52:18 -06:00 committed by GitHub
parent 70671a51d6
commit 5802204013
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 38 deletions

View file

@ -355,7 +355,6 @@ func (srv *Server) newDB(recordType string) (db storage.Backend, err error) {
db, err = redis.New(
srv.cfg.storageConnectionString,
recordType,
int64(srv.cfg.deletePermanentlyAfter.Seconds()),
redis.WithTLSConfig(srv.cfg.storageTLSConfig),
)
if err != nil {

View file

@ -26,30 +26,29 @@ import (
// Name is the storage type name for redis backend.
const Name = config.StorageRedisName
const watchAction = "zadd"
var _ storage.Backend = (*DB)(nil)
// DB wraps redis conn to interact with redis server.
type DB struct {
pool *redis.Pool
deletePermanentlyAfter int64
recordType string
lastVersionKey string
versionSet string
deletedSet string
tlsConfig *tls.Config
notifyChMu sync.Mutex
pool *redis.Pool
recordType string
lastVersionKey string
lastVersionChannelKey string
versionSet string
deletedSet string
tlsConfig *tls.Config
notifyChMu sync.Mutex
}
// New returns new DB instance.
func New(rawURL, recordType string, deletePermanentAfter int64, opts ...Option) (*DB, error) {
func New(rawURL, recordType string, opts ...Option) (*DB, error) {
db := &DB{
deletePermanentlyAfter: deletePermanentAfter,
recordType: recordType,
versionSet: recordType + "_version_set",
deletedSet: recordType + "_deleted_set",
lastVersionKey: recordType + "_last_version",
recordType: recordType,
versionSet: recordType + "_version_set",
deletedSet: recordType + "_deleted_set",
lastVersionKey: recordType + "_last_version",
lastVersionChannelKey: recordType + "_last_version_ch",
}
for _, o := range opts {
@ -110,6 +109,7 @@ func (db *DB) Put(ctx context.Context, id string, data *anypb.Any) (err error) {
{"MULTI": nil},
{"HSET": {db.recordType, id, string(b)}},
{"ZADD": {db.versionSet, lastVersion, id}},
{"PUBLISH": {db.lastVersionChannelKey, lastVersion}},
}
if err := db.tx(c, cmds); err != nil {
return err
@ -205,6 +205,7 @@ func (db *DB) Delete(ctx context.Context, id string) (err error) {
{"HSET": {db.recordType, id, string(b)}},
{"SADD": {db.deletedSet, id}},
{"ZADD": {db.versionSet, lastVersion, id}},
{"PUBLISH": {db.lastVersionChannelKey, lastVersion}},
}
if err := db.tx(c, cmds); err != nil {
return err
@ -253,7 +254,7 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}) {
psc.Conn.Close()
}(&psc)
if err := db.subscribeRedisChannel(&psc); err != nil {
if err := psc.Subscribe(db.lastVersionChannelKey); err != nil {
log.Error().Err(err).Msg("failed to subscribe to version set channel")
return
}
@ -266,9 +267,6 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}) {
case redis.Message:
log.Debug().Str("action", string(v.Data)).Msg("got redis message")
recordOperation(ctx, time.Now(), "sub_received", nil)
if string(v.Data) != watchAction {
continue
}
select {
case <-ctx.Done():
@ -295,7 +293,7 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}) {
log.Warn().Msg("retry with new connection")
_ = psc.Conn.Close()
psc.Conn = db.pool.Get()
_ = db.subscribeRedisChannel(&psc)
_ = psc.Subscribe(db.lastVersionChannelKey)
}
}
}
@ -318,26 +316,11 @@ func (db *DB) watch(ctx context.Context, ch chan struct{}) {
}
}
func (db *DB) subscribeRedisChannel(psc *redis.PubSubConn) error {
return psc.PSubscribe("__keyspace*__:" + db.versionSet)
}
// Watch returns a channel to the caller, when there is a change to the version set,
// sending message to the channel to notify the caller.
func (db *DB) Watch(ctx context.Context) <-chan struct{} {
ch := make(chan struct{})
go func() {
c := db.pool.Get()
// Setup notifications, we only care about changes to db.version_set.
if _, err := c.Do("CONFIG", "SET", "notify-keyspace-events", "Kz"); err != nil {
log.Error().Err(err).Msg("failed to setup redis notification")
c.Close()
return
}
c.Close()
db.watch(ctx, ch)
}()
go db.watch(ctx, ch)
return ch
}

View file

@ -71,7 +71,7 @@ func runWithRedisDockerImage(t *testing.T, runOpts *dockertest.RunOptions, withT
address := fmt.Sprintf(scheme+"://localhost:%s/0", resource.GetPort("6379/tcp"))
if err := pool.Retry(func() error {
var err error
db, err = New(address, "record_type", int64(time.Hour.Seconds()), WithTLSConfig(tlsConfig(address, t)))
db, err = New(address, "record_type", WithTLSConfig(tlsConfig(address, t)))
if err != nil {
return err
}