From d705b219ea88879b9d1194159ca49fd62b82fe17 Mon Sep 17 00:00:00 2001 From: Caleb Doxsey Date: Fri, 4 Jun 2021 07:03:55 -0600 Subject: [PATCH] redis: enforce capacity via ZREVRANGE to avoid race (#2267) --- pkg/storage/redis/redis.go | 48 +++++++++++++++----------------------- 1 file changed, 19 insertions(+), 29 deletions(-) diff --git a/pkg/storage/redis/redis.go b/pkg/storage/redis/redis.go index 5e54dd9b5..0bd139e7a 100644 --- a/pkg/storage/redis/redis.go +++ b/pkg/storage/redis/redis.go @@ -312,15 +312,15 @@ func (backend *Backend) put(ctx context.Context, record *databroker.Record) erro p.HDel(ctx, key, field) } else { p.HSet(ctx, key, field, bs) + p.ZAdd(ctx, getRecordTypeChangesKey(record.GetType()), &redis.Z{ + Score: float64(record.GetModifiedAt().GetSeconds()), + Member: record.GetId(), + }) } p.ZAdd(ctx, changesSetKey, &redis.Z{ Score: float64(version), Member: bs, }) - p.ZAdd(ctx, getRecordTypeChangesKey(record.GetType()), &redis.Z{ - Score: float64(record.GetModifiedAt().GetSeconds()), - Member: record.GetId(), - }) return nil }) } @@ -342,40 +342,30 @@ func (backend *Backend) enforceOptions(ctx context.Context, recordType string) e key := getRecordTypeChangesKey(recordType) - // enforce capacity by retrieving the size of the collection and removing excess items, oldest first - - sz, err := backend.client.ZCard(ctx, key).Uint64() - if err == redis.Nil { - return nil - } else if err != nil { - return err - } - - removeCnt := sz - *options.Capacity - if removeCnt <= 0 { - // nothing to do - return nil - } - - // remove the oldest records - zs, err := backend.client.ZPopMin(ctx, key, int64(removeCnt)).Result() + // find oldest records that exceed the capacity + recordIDs, err := backend.client.ZRevRange(ctx, key, int64(*options.Capacity), -1).Result() if err != nil { return err } - for _, z := range zs { - recordID := z.Member.(string) - + // for each record, delete it + for _, recordID := range recordIDs { record, err := backend.Get(ctx, recordType, recordID) - if errors.Is(err, storage.ErrNotFound) { - continue + if err == nil { + // mark the record as deleted and re-submit + record.DeletedAt = timestamppb.Now() + err = backend.put(ctx, record) + if err != nil { + return err + } + } else if errors.Is(err, storage.ErrNotFound) { + // ignore } else if err != nil { return err } - // mark the record as deleted and re-submit - record.DeletedAt = timestamppb.Now() - err = backend.put(ctx, record) + // remove the member from the collection + _, err = backend.client.ZRem(ctx, key, recordID).Result() if err != nil { return err }