diff --git a/pkg/storage/redis/redis.go b/pkg/storage/redis/redis.go index 561966e33..e266a58c7 100644 --- a/pkg/storage/redis/redis.go +++ b/pkg/storage/redis/redis.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "strconv" - "sync/atomic" "time" "github.com/golang/protobuf/proto" @@ -27,7 +26,7 @@ type DB struct { pool *redis.Pool deletePermanentlyAfter int64 recordType string - lastVersion uint64 + lastVersionKey string versionSet string deletedSet string } @@ -61,6 +60,7 @@ func New(address, recordType string, deletePermanentAfter int64) (*DB, error) { recordType: recordType, versionSet: "version_set", deletedSet: "deleted_set", + lastVersionKey: recordType + "_last_version", } return db, nil } @@ -75,11 +75,15 @@ func (db *DB) Put(ctx context.Context, id string, data *anypb.Any) error { record.CreatedAt = ptypes.TimestampNow() } + lastVersion, err := redis.Int64(c.Do("INCR", db.lastVersionKey)) + if err != nil { + return err + } record.Data = data record.ModifiedAt = ptypes.TimestampNow() record.Type = db.recordType record.Id = id - record.Version = fmt.Sprintf("%012X", atomic.AddUint64(&db.lastVersion, 1)) + record.Version = fmt.Sprintf("%012X", lastVersion) b, err := proto.Marshal(record) if err != nil { return err @@ -87,7 +91,7 @@ func (db *DB) Put(ctx context.Context, id string, data *anypb.Any) error { cmds := []map[string][]interface{}{ {"MULTI": nil}, {"HSET": {db.recordType, id, string(b)}}, - {"ZADD": {db.versionSet, db.lastVersion, id}}, + {"ZADD": {db.versionSet, lastVersion, id}}, } if err := db.tx(c, cmds); err != nil { return err @@ -154,8 +158,14 @@ func (db *DB) Delete(ctx context.Context, id string) error { if err != nil { return fmt.Errorf("failed to get record: %w", err) } + + lastVersion, err := redis.Int64(c.Do("INCR", db.lastVersionKey)) + if err != nil { + return err + } + r.DeletedAt = ptypes.TimestampNow() - r.Version = fmt.Sprintf("%012X", atomic.AddUint64(&db.lastVersion, 1)) + r.Version = fmt.Sprintf("%012X", lastVersion) b, err := proto.Marshal(r) if err != nil { return err @@ -164,7 +174,7 @@ func (db *DB) Delete(ctx context.Context, id string) error { {"MULTI": nil}, {"HSET": {db.recordType, id, string(b)}}, {"SADD": {db.deletedSet, id}}, - {"ZADD": {db.versionSet, db.lastVersion, id}}, + {"ZADD": {db.versionSet, lastVersion, id}}, } if err := db.tx(c, cmds); err != nil { return err diff --git a/pkg/storage/redis/redis_test.go b/pkg/storage/redis/redis_test.go index 05173a3c5..c70ce8bad 100644 --- a/pkg/storage/redis/redis_test.go +++ b/pkg/storage/redis/redis_test.go @@ -38,6 +38,8 @@ func TestDB(t *testing.T) { defer c.Close() cleanup(c, db, t) + _, err = c.Do("DEL", db.lastVersionKey) + require.NoError(t, err) t.Run("get missing record", func(t *testing.T) { record, err := db.Get(ctx, id)