mirror of
https://github.com/pomerium/pomerium.git
synced 2025-04-30 10:56:28 +02:00
94 lines
2.1 KiB
Go
94 lines
2.1 KiB
Go
package inmemory
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
|
"github.com/pomerium/pomerium/pkg/storage"
|
|
)
|
|
|
|
func newSyncLatestRecordStream(
|
|
ctx context.Context,
|
|
backend *Backend,
|
|
recordType string,
|
|
expr storage.FilterExpression,
|
|
) (storage.RecordStream, error) {
|
|
filter, err := storage.RecordStreamFilterFromFilterExpression(expr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if recordType != "" {
|
|
filter = filter.And(func(record *databroker.Record) (keep bool) {
|
|
return record.GetType() == recordType
|
|
})
|
|
}
|
|
|
|
var ready []*databroker.Record
|
|
generator := func(_ context.Context, _ bool) (*databroker.Record, error) {
|
|
backend.mu.RLock()
|
|
for _, co := range backend.lookup {
|
|
for _, record := range co.List() {
|
|
if filter(record) {
|
|
ready = append(ready, record)
|
|
}
|
|
}
|
|
}
|
|
backend.mu.RUnlock()
|
|
return nil, storage.ErrStreamDone
|
|
}
|
|
|
|
return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{
|
|
generator,
|
|
func(_ context.Context, _ bool) (*databroker.Record, error) {
|
|
if len(ready) == 0 {
|
|
return nil, storage.ErrStreamDone
|
|
}
|
|
|
|
record := ready[0]
|
|
ready = ready[1:]
|
|
return dup(record), nil
|
|
},
|
|
}, nil), nil
|
|
}
|
|
|
|
func newSyncRecordStream(
|
|
ctx context.Context,
|
|
backend *Backend,
|
|
recordType string,
|
|
recordVersion uint64,
|
|
) storage.RecordStream {
|
|
changed := backend.onChange.Bind()
|
|
var ready []*databroker.Record
|
|
return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{
|
|
func(ctx context.Context, block bool) (*databroker.Record, error) {
|
|
if len(ready) > 0 {
|
|
record := ready[0]
|
|
ready = ready[1:]
|
|
return record, nil
|
|
}
|
|
|
|
for {
|
|
ready = backend.getSince(recordType, recordVersion)
|
|
|
|
if len(ready) > 0 {
|
|
// records are sorted by version,
|
|
// so update the local version to the last record
|
|
recordVersion = ready[len(ready)-1].GetVersion()
|
|
record := ready[0]
|
|
ready = ready[1:]
|
|
return record, nil
|
|
} else if !block {
|
|
return nil, storage.ErrStreamDone
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case <-changed:
|
|
}
|
|
}
|
|
},
|
|
}, func() {
|
|
backend.onChange.Unbind(changed)
|
|
})
|
|
}
|