pkg/storage/redis: move last version to redis (#1134)

So we can support multiple databroker servers, we can't do it if we
store last version inside Server struct.
This commit is contained in:
Cuong Manh Le 2020-07-24 11:27:11 +07:00 committed by GitHub
parent aedfbc4c71
commit a8fd23a2be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 6 deletions

View file

@ -5,7 +5,6 @@ import (
"context" "context"
"fmt" "fmt"
"strconv" "strconv"
"sync/atomic"
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -27,7 +26,7 @@ type DB struct {
pool *redis.Pool pool *redis.Pool
deletePermanentlyAfter int64 deletePermanentlyAfter int64
recordType string recordType string
lastVersion uint64 lastVersionKey string
versionSet string versionSet string
deletedSet string deletedSet string
} }
@ -61,6 +60,7 @@ func New(address, recordType string, deletePermanentAfter int64) (*DB, error) {
recordType: recordType, recordType: recordType,
versionSet: "version_set", versionSet: "version_set",
deletedSet: "deleted_set", deletedSet: "deleted_set",
lastVersionKey: recordType + "_last_version",
} }
return db, nil return db, nil
} }
@ -75,11 +75,15 @@ func (db *DB) Put(ctx context.Context, id string, data *anypb.Any) error {
record.CreatedAt = ptypes.TimestampNow() record.CreatedAt = ptypes.TimestampNow()
} }
lastVersion, err := redis.Int64(c.Do("INCR", db.lastVersionKey))
if err != nil {
return err
}
record.Data = data record.Data = data
record.ModifiedAt = ptypes.TimestampNow() record.ModifiedAt = ptypes.TimestampNow()
record.Type = db.recordType record.Type = db.recordType
record.Id = id record.Id = id
record.Version = fmt.Sprintf("%012X", atomic.AddUint64(&db.lastVersion, 1)) record.Version = fmt.Sprintf("%012X", lastVersion)
b, err := proto.Marshal(record) b, err := proto.Marshal(record)
if err != nil { if err != nil {
return err return err
@ -87,7 +91,7 @@ func (db *DB) Put(ctx context.Context, id string, data *anypb.Any) error {
cmds := []map[string][]interface{}{ cmds := []map[string][]interface{}{
{"MULTI": nil}, {"MULTI": nil},
{"HSET": {db.recordType, id, string(b)}}, {"HSET": {db.recordType, id, string(b)}},
{"ZADD": {db.versionSet, db.lastVersion, id}}, {"ZADD": {db.versionSet, lastVersion, id}},
} }
if err := db.tx(c, cmds); err != nil { if err := db.tx(c, cmds); err != nil {
return err return err
@ -154,8 +158,14 @@ func (db *DB) Delete(ctx context.Context, id string) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to get record: %w", err) return fmt.Errorf("failed to get record: %w", err)
} }
lastVersion, err := redis.Int64(c.Do("INCR", db.lastVersionKey))
if err != nil {
return err
}
r.DeletedAt = ptypes.TimestampNow() r.DeletedAt = ptypes.TimestampNow()
r.Version = fmt.Sprintf("%012X", atomic.AddUint64(&db.lastVersion, 1)) r.Version = fmt.Sprintf("%012X", lastVersion)
b, err := proto.Marshal(r) b, err := proto.Marshal(r)
if err != nil { if err != nil {
return err return err
@ -164,7 +174,7 @@ func (db *DB) Delete(ctx context.Context, id string) error {
{"MULTI": nil}, {"MULTI": nil},
{"HSET": {db.recordType, id, string(b)}}, {"HSET": {db.recordType, id, string(b)}},
{"SADD": {db.deletedSet, id}}, {"SADD": {db.deletedSet, id}},
{"ZADD": {db.versionSet, db.lastVersion, id}}, {"ZADD": {db.versionSet, lastVersion, id}},
} }
if err := db.tx(c, cmds); err != nil { if err := db.tx(c, cmds); err != nil {
return err return err

View file

@ -38,6 +38,8 @@ func TestDB(t *testing.T) {
defer c.Close() defer c.Close()
cleanup(c, db, t) cleanup(c, db, t)
_, err = c.Do("DEL", db.lastVersionKey)
require.NoError(t, err)
t.Run("get missing record", func(t *testing.T) { t.Run("get missing record", func(t *testing.T) {
record, err := db.Get(ctx, id) record, err := db.Get(ctx, id)