mirror of
https://github.com/pomerium/pomerium.git
synced 2025-04-30 10:56:28 +02:00
redis: enforce capacity via ZREVRANGE to avoid race (#2267)
This commit is contained in:
parent
4af12c4bbb
commit
d705b219ea
1 changed files with 19 additions and 29 deletions
|
@ -312,15 +312,15 @@ func (backend *Backend) put(ctx context.Context, record *databroker.Record) erro
|
||||||
p.HDel(ctx, key, field)
|
p.HDel(ctx, key, field)
|
||||||
} else {
|
} else {
|
||||||
p.HSet(ctx, key, field, bs)
|
p.HSet(ctx, key, field, bs)
|
||||||
|
p.ZAdd(ctx, getRecordTypeChangesKey(record.GetType()), &redis.Z{
|
||||||
|
Score: float64(record.GetModifiedAt().GetSeconds()),
|
||||||
|
Member: record.GetId(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
p.ZAdd(ctx, changesSetKey, &redis.Z{
|
p.ZAdd(ctx, changesSetKey, &redis.Z{
|
||||||
Score: float64(version),
|
Score: float64(version),
|
||||||
Member: bs,
|
Member: bs,
|
||||||
})
|
})
|
||||||
p.ZAdd(ctx, getRecordTypeChangesKey(record.GetType()), &redis.Z{
|
|
||||||
Score: float64(record.GetModifiedAt().GetSeconds()),
|
|
||||||
Member: record.GetId(),
|
|
||||||
})
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -342,40 +342,30 @@ func (backend *Backend) enforceOptions(ctx context.Context, recordType string) e
|
||||||
|
|
||||||
key := getRecordTypeChangesKey(recordType)
|
key := getRecordTypeChangesKey(recordType)
|
||||||
|
|
||||||
// enforce capacity by retrieving the size of the collection and removing excess items, oldest first
|
// find oldest records that exceed the capacity
|
||||||
|
recordIDs, err := backend.client.ZRevRange(ctx, key, int64(*options.Capacity), -1).Result()
|
||||||
sz, err := backend.client.ZCard(ctx, key).Uint64()
|
|
||||||
if err == redis.Nil {
|
|
||||||
return nil
|
|
||||||
} else if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
removeCnt := sz - *options.Capacity
|
|
||||||
if removeCnt <= 0 {
|
|
||||||
// nothing to do
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove the oldest records
|
|
||||||
zs, err := backend.client.ZPopMin(ctx, key, int64(removeCnt)).Result()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, z := range zs {
|
// for each record, delete it
|
||||||
recordID := z.Member.(string)
|
for _, recordID := range recordIDs {
|
||||||
|
|
||||||
record, err := backend.Get(ctx, recordType, recordID)
|
record, err := backend.Get(ctx, recordType, recordID)
|
||||||
if errors.Is(err, storage.ErrNotFound) {
|
if err == nil {
|
||||||
continue
|
// mark the record as deleted and re-submit
|
||||||
|
record.DeletedAt = timestamppb.Now()
|
||||||
|
err = backend.put(ctx, record)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else if errors.Is(err, storage.ErrNotFound) {
|
||||||
|
// ignore
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// mark the record as deleted and re-submit
|
// remove the member from the collection
|
||||||
record.DeletedAt = timestamppb.Now()
|
_, err = backend.client.ZRem(ctx, key, recordID).Result()
|
||||||
err = backend.put(ctx, record)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue