databroker: add options for maximum capacity (#2095)

* databroker: add options

* implement redis

* add trace for enforce options
This commit is contained in:
Caleb Doxsey 2021-04-26 17:14:54 -06:00 committed by GitHub
parent b3216ae854
commit 636b3d6846
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 1085 additions and 419 deletions

View file

@ -19,11 +19,6 @@ import (
"github.com/pomerium/pomerium/pkg/storage"
)
type recordKey struct {
Type string
ID string
}
type recordChange struct {
record *databroker.Record
}
@ -46,9 +41,10 @@ type Backend struct {
closeOnce sync.Once
closed chan struct{}
mu sync.RWMutex
lookup map[recordKey]*databroker.Record
changes *btree.BTree
mu sync.RWMutex
lookup map[string]*RecordCollection
capacity map[string]*uint64
changes *btree.BTree
}
// New creates a new in-memory backend storage.
@ -58,7 +54,8 @@ func New(options ...Option) *Backend {
cfg: cfg,
onChange: signal.New(),
closed: make(chan struct{}),
lookup: make(map[recordKey]*databroker.Record),
lookup: make(map[string]*RecordCollection),
capacity: map[string]*uint64{},
changes: btree.New(cfg.degree),
}
if cfg.expiry != 0 {
@ -110,7 +107,8 @@ func (backend *Backend) Close() error {
backend.mu.Lock()
defer backend.mu.Unlock()
backend.lookup = map[recordKey]*databroker.Record{}
backend.lookup = map[string]*RecordCollection{}
backend.capacity = map[string]*uint64{}
backend.changes = btree.New(backend.cfg.degree)
})
return nil
@ -121,9 +119,13 @@ func (backend *Backend) Get(_ context.Context, recordType, id string) (*databrok
backend.mu.RLock()
defer backend.mu.RUnlock()
key := recordKey{Type: recordType, ID: id}
record, ok := backend.lookup[key]
if !ok {
records := backend.lookup[recordType]
if records == nil {
return nil, storage.ErrNotFound
}
record := records.Get(id)
if record == nil {
return nil, storage.ErrNotFound
}
@ -135,11 +137,26 @@ func (backend *Backend) GetAll(_ context.Context) ([]*databroker.Record, uint64,
backend.mu.RLock()
defer backend.mu.RUnlock()
var records []*databroker.Record
for _, record := range backend.lookup {
records = append(records, dup(record))
var all []*databroker.Record
for _, rs := range backend.lookup {
for _, r := range rs.List() {
all = append(all, dup(r))
}
}
return records, backend.lastVersion, nil
return all, backend.lastVersion, nil
}
// GetOptions returns the options for a type in the in-memory store.
func (backend *Backend) GetOptions(_ context.Context, recordType string) (*databroker.Options, error) {
backend.mu.RLock()
defer backend.mu.RUnlock()
options := new(databroker.Options)
if capacity := backend.capacity[recordType]; capacity != nil {
options.Capacity = proto.Uint64(*capacity)
}
return options, nil
}
// Put puts a record into the in-memory store.
@ -158,15 +175,35 @@ func (backend *Backend) Put(ctx context.Context, record *databroker.Record) erro
defer backend.mu.Unlock()
defer backend.onChange.Broadcast(ctx)
record.ModifiedAt = timestamppb.Now()
record.Version = backend.nextVersion()
backend.changes.ReplaceOrInsert(recordChange{record: dup(record)})
backend.recordChange(record)
c, ok := backend.lookup[record.GetType()]
if !ok {
c = NewRecordCollection()
backend.lookup[record.GetType()] = c
}
key := recordKey{Type: record.GetType(), ID: record.GetId()}
if record.GetDeletedAt() != nil {
delete(backend.lookup, key)
c.Delete(record.GetId())
} else {
backend.lookup[key] = dup(record)
c.Put(dup(record))
}
backend.enforceCapacity(record.GetType())
return nil
}
// SetOptions sets the options for a type in the in-memory store.
func (backend *Backend) SetOptions(_ context.Context, recordType string, options *databroker.Options) error {
backend.mu.Lock()
defer backend.mu.Unlock()
if options.Capacity == nil {
delete(backend.capacity, recordType)
} else {
backend.capacity[recordType] = proto.Uint64(options.GetCapacity())
backend.enforceCapacity(recordType)
}
return nil
@ -177,6 +214,41 @@ func (backend *Backend) Sync(ctx context.Context, version uint64) (storage.Recor
return newRecordStream(ctx, backend, version), nil
}
func (backend *Backend) recordChange(record *databroker.Record) {
record.ModifiedAt = timestamppb.Now()
record.Version = backend.nextVersion()
backend.changes.ReplaceOrInsert(recordChange{record: dup(record)})
}
func (backend *Backend) enforceCapacity(recordType string) {
collection, ok := backend.lookup[recordType]
if !ok {
return
}
ptr := backend.capacity[recordType]
if ptr == nil {
return
}
capacity := *ptr
if collection.Len() <= int(capacity) {
return
}
records := collection.List()
for len(records) > int(capacity) {
// delete the record
record := dup(records[0])
record.DeletedAt = timestamppb.Now()
backend.recordChange(record)
collection.Delete(record.GetId())
// move forward
records = records[1:]
}
}
func (backend *Backend) getSince(version uint64) []*databroker.Record {
backend.mu.RLock()
defer backend.mu.RUnlock()