diff --git a/internal/databroker/server.go b/internal/databroker/server.go index 290d873b6..dec3d742a 100644 --- a/internal/databroker/server.go +++ b/internal/databroker/server.go @@ -465,7 +465,6 @@ func (srv *Server) newDB(recordType string) (db storage.Backend, err error) { db, err = redis.New( srv.cfg.storageConnectionString, recordType, - int64(srv.cfg.deletePermanentlyAfter.Seconds()), redis.WithTLSConfig(tlsConfig), ) if err != nil { diff --git a/pkg/storage/redis/redis.go b/pkg/storage/redis/redis.go index dd61ec61a..4c2824b90 100644 --- a/pkg/storage/redis/redis.go +++ b/pkg/storage/redis/redis.go @@ -26,34 +26,33 @@ import ( // Name is the storage type name for redis backend. const Name = config.StorageRedisName -const watchAction = "zadd" var _ storage.Backend = (*DB)(nil) // DB wraps redis conn to interact with redis server. type DB struct { - pool *redis.Pool - deletePermanentlyAfter int64 - recordType string - lastVersionKey string - versionSet string - deletedSet string - tlsConfig *tls.Config - notifyChMu sync.Mutex + pool *redis.Pool + recordType string + lastVersionKey string + lastVersionChannelKey string + versionSet string + deletedSet string + tlsConfig *tls.Config + notifyChMu sync.Mutex closeOnce sync.Once closed chan struct{} } // New returns new DB instance. -func New(rawURL, recordType string, deletePermanentAfter int64, opts ...Option) (*DB, error) { +func New(rawURL, recordType string, opts ...Option) (*DB, error) { db := &DB{ - deletePermanentlyAfter: deletePermanentAfter, - recordType: recordType, - versionSet: recordType + "_version_set", - deletedSet: recordType + "_deleted_set", - lastVersionKey: recordType + "_last_version", - closed: make(chan struct{}), + recordType: recordType, + versionSet: recordType + "_version_set", + deletedSet: recordType + "_deleted_set", + lastVersionKey: recordType + "_last_version", + lastVersionChannelKey: recordType + "_last_version_ch", + closed: make(chan struct{}), } for _, o := range opts { @@ -122,6 +121,7 @@ func (db *DB) Put(ctx context.Context, id string, data *anypb.Any) (err error) { {"MULTI": nil}, {"HSET": {db.recordType, id, string(b)}}, {"ZADD": {db.versionSet, lastVersion, id}}, + {"PUBLISH": {db.lastVersionChannelKey, lastVersion}}, } if err := db.tx(c, cmds); err != nil { return err @@ -217,6 +217,7 @@ func (db *DB) Delete(ctx context.Context, id string) (err error) { {"HSET": {db.recordType, id, string(b)}}, {"SADD": {db.deletedSet, id}}, {"ZADD": {db.versionSet, lastVersion, id}}, + {"PUBLISH": {db.lastVersionChannelKey, lastVersion}}, } if err := db.tx(c, cmds); err != nil { return err @@ -265,7 +266,7 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}) { psc.Conn.Close() }(&psc) - if err := db.subscribeRedisChannel(&psc); err != nil { + if err := psc.Subscribe(db.lastVersionChannelKey); err != nil { log.Error().Err(err).Msg("failed to subscribe to version set channel") return } @@ -281,9 +282,6 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}) { case redis.Message: log.Debug().Str("action", string(v.Data)).Msg("got redis message") recordOperation(ctx, time.Now(), "sub_received", nil) - if string(v.Data) != watchAction { - continue - } select { case <-db.closed: @@ -315,7 +313,7 @@ func (db *DB) doNotifyLoop(ctx context.Context, ch chan struct{}) { log.Warn().Msg("retry with new connection") _ = psc.Conn.Close() psc.Conn = db.pool.Get() - _ = db.subscribeRedisChannel(&psc) + _ = psc.Subscribe(db.lastVersionChannelKey) } } } @@ -339,26 +337,11 @@ func (db *DB) watch(ctx context.Context, ch chan struct{}) { } } -func (db *DB) subscribeRedisChannel(psc *redis.PubSubConn) error { - return psc.PSubscribe("__keyspace*__:" + db.versionSet) -} - // Watch returns a channel to the caller, when there is a change to the version set, // sending message to the channel to notify the caller. func (db *DB) Watch(ctx context.Context) <-chan struct{} { ch := make(chan struct{}) - go func() { - c := db.pool.Get() - // Setup notifications, we only care about changes to db.version_set. - if _, err := c.Do("CONFIG", "SET", "notify-keyspace-events", "Kz"); err != nil { - log.Error().Err(err).Msg("failed to setup redis notification") - c.Close() - return - } - c.Close() - db.watch(ctx, ch) - }() - + go db.watch(ctx, ch) return ch } diff --git a/pkg/storage/redis/redis_test.go b/pkg/storage/redis/redis_test.go index ed1c603d1..1a0d19613 100644 --- a/pkg/storage/redis/redis_test.go +++ b/pkg/storage/redis/redis_test.go @@ -73,7 +73,7 @@ func runWithRedisDockerImage(t *testing.T, runOpts *dockertest.RunOptions, withT address := fmt.Sprintf(scheme+"://localhost:%s/0", resource.GetPort("6379/tcp")) if err := pool.Retry(func() error { var err error - db, err = New(address, "record_type", int64(time.Hour.Seconds()), WithTLSConfig(tlsConfig(address, t))) + db, err = New(address, "record_type", WithTLSConfig(tlsConfig(address, t))) if err != nil { return err }