pomerium/pkg/storage/inmemory/stream.go
Caleb Doxsey 1a5b8b606f
core/lint: upgrade golangci-lint, replace interface{} with any (#5099)
* core/lint: upgrade golangci-lint, replace interface{} with any

* regen proto
2024-05-02 14:33:52 -06:00

94 lines
2.1 KiB
Go

package inmemory
import (
"context"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/storage"
)
func newSyncLatestRecordStream(
ctx context.Context,
backend *Backend,
recordType string,
expr storage.FilterExpression,
) (storage.RecordStream, error) {
filter, err := storage.RecordStreamFilterFromFilterExpression(expr)
if err != nil {
return nil, err
}
if recordType != "" {
filter = filter.And(func(record *databroker.Record) (keep bool) {
return record.GetType() == recordType
})
}
var ready []*databroker.Record
generator := func(_ context.Context, _ bool) (*databroker.Record, error) {
backend.mu.RLock()
for _, co := range backend.lookup {
for _, record := range co.List() {
if filter(record) {
ready = append(ready, record)
}
}
}
backend.mu.RUnlock()
return nil, storage.ErrStreamDone
}
return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{
generator,
func(_ context.Context, _ bool) (*databroker.Record, error) {
if len(ready) == 0 {
return nil, storage.ErrStreamDone
}
record := ready[0]
ready = ready[1:]
return dup(record), nil
},
}, nil), nil
}
func newSyncRecordStream(
ctx context.Context,
backend *Backend,
recordType string,
recordVersion uint64,
) storage.RecordStream {
changed := backend.onChange.Bind()
var ready []*databroker.Record
return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{
func(ctx context.Context, block bool) (*databroker.Record, error) {
if len(ready) > 0 {
record := ready[0]
ready = ready[1:]
return record, nil
}
for {
ready = backend.getSince(recordType, recordVersion)
if len(ready) > 0 {
// records are sorted by version,
// so update the local version to the last record
recordVersion = ready[len(ready)-1].GetVersion()
record := ready[0]
ready = ready[1:]
return record, nil
} else if !block {
return nil, storage.ErrStreamDone
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-changed:
}
}
},
}, func() {
backend.onChange.Unbind(changed)
})
}