databroker: add support for putting multiple records (#3291)

* databroker: add support for putting multiple records

* add OptimumPutRequestsFromRecords function

* replace GetAll with SyncLatest

* fix stream when there are no records
This commit is contained in:
Caleb Doxsey 2022-04-26 22:41:38 +00:00 committed by GitHub
parent 343fa43ed4
commit f73c5c615f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 790 additions and 660 deletions

View file

@ -2,117 +2,74 @@ package inmemory
import (
"context"
"sync"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/storage"
)
type recordStream struct {
ctx context.Context
backend *Backend
changed chan context.Context
ready []*databroker.Record
recordVersion uint64
closeOnce sync.Once
closed chan struct{}
}
func newRecordStream(ctx context.Context, backend *Backend, recordVersion uint64) *recordStream {
stream := &recordStream{
ctx: ctx,
backend: backend,
changed: backend.onChange.Bind(),
recordVersion: recordVersion,
closed: make(chan struct{}),
}
// if the backend is closed, close the stream
go func() {
select {
case <-stream.closed:
case <-backend.closed:
_ = stream.Close()
}
}()
return stream
}
func (stream *recordStream) fill() {
stream.ready = stream.backend.getSince(stream.recordVersion)
if len(stream.ready) > 0 {
// records are sorted by version,
// so update the local version to the last record
stream.recordVersion = stream.ready[len(stream.ready)-1].GetVersion()
}
}
func (stream *recordStream) Close() error {
stream.closeOnce.Do(func() {
stream.backend.onChange.Unbind(stream.changed)
close(stream.closed)
})
return nil
}
func (stream *recordStream) Next(wait bool) bool {
if len(stream.ready) > 0 {
stream.ready = stream.ready[1:]
}
if len(stream.ready) > 0 {
return true
}
for {
stream.fill()
if len(stream.ready) > 0 {
return true
}
if wait {
select {
case <-stream.ctx.Done():
return false
case <-stream.closed:
return false
case <-stream.changed:
// query for records again
func newSyncLatestRecordStream(
ctx context.Context,
backend *Backend,
) storage.RecordStream {
var ready []*databroker.Record
return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{
func(ctx context.Context, block bool) (*databroker.Record, error) {
backend.mu.RLock()
for _, co := range backend.lookup {
ready = append(ready, co.List()...)
}
} else {
return false
}
}
backend.mu.RUnlock()
return nil, storage.ErrStreamDone
},
func(ctx context.Context, block bool) (*databroker.Record, error) {
if len(ready) == 0 {
return nil, storage.ErrStreamDone
}
record := ready[0]
ready = ready[1:]
return dup(record), nil
},
}, nil)
}
func (stream *recordStream) Record() *databroker.Record {
var r *databroker.Record
if len(stream.ready) > 0 {
r = stream.ready[0]
}
return r
}
func (stream *recordStream) Err() error {
select {
case <-stream.ctx.Done():
return stream.ctx.Err()
default:
}
select {
case <-stream.backend.closed:
return storage.ErrStreamClosed
default:
}
select {
case <-stream.closed:
return storage.ErrStreamClosed
default:
}
return nil
func newSyncRecordStream(
ctx context.Context,
backend *Backend,
recordVersion uint64,
) storage.RecordStream {
changed := backend.onChange.Bind()
var ready []*databroker.Record
return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{
func(ctx context.Context, block bool) (*databroker.Record, error) {
if len(ready) > 0 {
record := ready[0]
ready = ready[1:]
return record, nil
}
for {
ready = backend.getSince(recordVersion)
if len(ready) > 0 {
// records are sorted by version,
// so update the local version to the last record
recordVersion = ready[len(ready)-1].GetVersion()
record := ready[0]
ready = ready[1:]
return record, nil
} else if !block {
return nil, storage.ErrStreamDone
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-changed:
}
}
},
}, func() {
backend.onChange.Unbind(changed)
})
}