storage: add filtering to SyncLatest (#3368)

* wip

* storage: add filtering to SyncLatest

* don't increment the record version, so intermediate changes are requested

* fix stream filter
This commit is contained in:
Caleb Doxsey 2022-05-17 22:00:23 +00:00 committed by GitHub
parent 363dd82802
commit 1669b601ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 88 additions and 49 deletions

View file

@ -145,7 +145,7 @@ func (srv *Server) Query(ctx context.Context, req *databroker.QueryRequest) (*da
return nil, err return nil, err
} }
_, stream, err := db.SyncLatest(ctx) _, _, stream, err := db.SyncLatest(ctx, req.GetType(), nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -332,17 +332,13 @@ func (srv *Server) SyncLatest(req *databroker.SyncLatestRequest, stream databrok
return err return err
} }
serverVersion, recordStream, err := backend.SyncLatest(ctx) serverVersion, recordVersion, recordStream, err := backend.SyncLatest(ctx, req.GetType(), nil)
if err != nil { if err != nil {
return err return err
} }
recordVersion := uint64(0)
for recordStream.Next(false) { for recordStream.Next(false) {
record := recordStream.Record() record := recordStream.Record()
if record.GetVersion() > recordVersion {
recordVersion = record.GetVersion()
}
if req.GetType() == "" || req.GetType() == record.GetType() { if req.GetType() == "" || req.GetType() == record.GetType() {
err = stream.Send(&databroker.SyncLatestResponse{ err = stream.Send(&databroker.SyncLatestResponse{
Response: &databroker.SyncLatestResponse_Record{ Response: &databroker.SyncLatestResponse_Record{

View file

@ -130,12 +130,16 @@ func (e *encryptedBackend) Sync(ctx context.Context, serverVersion, recordVersio
}, nil }, nil
} }
func (e *encryptedBackend) SyncLatest(ctx context.Context) (serverVersion uint64, stream RecordStream, err error) { func (e *encryptedBackend) SyncLatest(
serverVersion, stream, err = e.underlying.SyncLatest(ctx) ctx context.Context,
recordType string,
filter FilterExpression,
) (serverVersion, recordVersion uint64, stream RecordStream, err error) {
serverVersion, recordVersion, stream, err = e.underlying.SyncLatest(ctx, recordType, filter)
if err != nil { if err != nil {
return serverVersion, nil, err return serverVersion, recordVersion, nil, err
} }
return serverVersion, &encryptedRecordStream{ return serverVersion, recordVersion, &encryptedRecordStream{
underlying: stream, underlying: stream,
backend: e, backend: e,
}, nil }, nil

View file

@ -255,12 +255,18 @@ func (backend *Backend) Sync(ctx context.Context, serverVersion, recordVersion u
} }
// SyncLatest returns a record stream for all the records. // SyncLatest returns a record stream for all the records.
func (backend *Backend) SyncLatest(ctx context.Context) (serverVersion uint64, stream storage.RecordStream, err error) { func (backend *Backend) SyncLatest(
ctx context.Context,
recordType string,
expr storage.FilterExpression,
) (serverVersion, recordVersion uint64, stream storage.RecordStream, err error) {
backend.mu.RLock() backend.mu.RLock()
currentServerVersion := backend.serverVersion serverVersion = backend.serverVersion
recordVersion = backend.lastVersion
backend.mu.RUnlock() backend.mu.RUnlock()
return currentServerVersion, newSyncLatestRecordStream(ctx, backend), nil stream, err = newSyncLatestRecordStream(ctx, backend, recordType, expr)
return serverVersion, recordVersion, stream, err
} }
func (backend *Backend) recordChange(record *databroker.Record) { func (backend *Backend) recordChange(record *databroker.Record) {

View file

@ -210,7 +210,7 @@ func TestCapacity(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
_, stream, err := backend.SyncLatest(ctx) _, _, stream, err := backend.SyncLatest(ctx, "EXAMPLE", nil)
require.NoError(t, err) require.NoError(t, err)
records, err := storage.RecordStreamToList(stream) records, err := storage.RecordStreamToList(stream)

View file

@ -10,17 +10,35 @@ import (
func newSyncLatestRecordStream( func newSyncLatestRecordStream(
ctx context.Context, ctx context.Context,
backend *Backend, backend *Backend,
) storage.RecordStream { recordType string,
expr storage.FilterExpression,
) (storage.RecordStream, error) {
filter, err := storage.RecordStreamFilterFromFilterExpression(expr)
if err != nil {
return nil, err
}
if recordType != "" {
filter = filter.And(func(record *databroker.Record) (keep bool) {
return record.GetType() == recordType
})
}
var ready []*databroker.Record var ready []*databroker.Record
return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{ generator := func(ctx context.Context, block bool) (*databroker.Record, error) {
func(ctx context.Context, block bool) (*databroker.Record, error) { backend.mu.RLock()
backend.mu.RLock() for _, co := range backend.lookup {
for _, co := range backend.lookup { for _, record := range co.List() {
ready = append(ready, co.List()...) if filter(record) {
ready = append(ready, record)
}
} }
backend.mu.RUnlock() }
return nil, storage.ErrStreamDone backend.mu.RUnlock()
}, return nil, storage.ErrStreamDone
}
return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{
generator,
func(ctx context.Context, block bool) (*databroker.Record, error) { func(ctx context.Context, block bool) (*databroker.Record, error) {
if len(ready) == 0 { if len(ready) == 0 {
return nil, storage.ErrStreamDone return nil, storage.ErrStreamDone
@ -30,7 +48,7 @@ func newSyncLatestRecordStream(
ready = ready[1:] ready = ready[1:]
return dup(record), nil return dup(record), nil
}, },
}, nil) }, nil), nil
} }
func newSyncRecordStream( func newSyncRecordStream(

View file

@ -254,12 +254,26 @@ func (backend *Backend) Sync(ctx context.Context, serverVersion, recordVersion u
// SyncLatest returns a record stream of all the records. Some records may be returned twice if the are updated while the // SyncLatest returns a record stream of all the records. Some records may be returned twice if the are updated while the
// stream is streaming. // stream is streaming.
func (backend *Backend) SyncLatest(ctx context.Context) (serverVersion uint64, stream storage.RecordStream, err error) { func (backend *Backend) SyncLatest(
ctx context.Context,
recordType string,
expr storage.FilterExpression,
) (serverVersion, recordVersion uint64, stream storage.RecordStream, err error) {
serverVersion, err = backend.getOrCreateServerVersion(ctx) serverVersion, err = backend.getOrCreateServerVersion(ctx)
if err != nil { if err != nil {
return 0, nil, err return serverVersion, recordVersion, nil, err
} }
return serverVersion, newSyncLatestRecordStream(ctx, backend), nil
recordVersion, err = backend.client.Get(ctx, lastVersionKey).Uint64()
if errors.Is(err, redis.Nil) {
// this happens if there are no records
err = nil
} else if err != nil {
return serverVersion, recordVersion, nil, err
}
stream, err = newSyncLatestRecordStream(ctx, backend, recordType, expr)
return serverVersion, recordVersion, stream, err
} }
func (backend *Backend) put(ctx context.Context, records []*databroker.Record) error { func (backend *Backend) put(ctx context.Context, records []*databroker.Record) error {

View file

@ -240,7 +240,7 @@ func TestCapacity(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
_, stream, err := backend.SyncLatest(ctx) _, _, stream, err := backend.SyncLatest(ctx, "EXAMPLE", nil)
require.NoError(t, err) require.NoError(t, err)
defer stream.Close() defer stream.Close()

View file

@ -63,23 +63,23 @@ func newSyncRecordStream(
func newSyncLatestRecordStream( func newSyncLatestRecordStream(
ctx context.Context, ctx context.Context,
backend *Backend, backend *Backend,
) storage.RecordStream { recordType string,
var recordVersion, cursor uint64 expr storage.FilterExpression,
) (storage.RecordStream, error) {
filter, err := storage.RecordStreamFilterFromFilterExpression(expr)
if err != nil {
return nil, err
}
if recordType != "" {
filter = filter.And(func(record *databroker.Record) (keep bool) {
return record.GetType() == recordType
})
}
var cursor uint64
scannedOnce := false scannedOnce := false
var scannedRecords []*databroker.Record var scannedRecords []*databroker.Record
return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{ generator := storage.FilteredRecordStreamGenerator(
// 1. get the current record version
func(ctx context.Context, block bool) (*databroker.Record, error) {
var err error
recordVersion, err = backend.client.Get(ctx, lastVersionKey).Uint64()
if errors.Is(err, redis.Nil) {
// this happens if there are no records
} else if err != nil {
return nil, err
}
return nil, storage.ErrStreamDone
},
// 2. stream all the records
func(ctx context.Context, block bool) (*databroker.Record, error) { func(ctx context.Context, block bool) (*databroker.Record, error) {
for { for {
if len(scannedRecords) > 0 { if len(scannedRecords) > 0 {
@ -102,11 +102,12 @@ func newSyncLatestRecordStream(
scannedOnce = true scannedOnce = true
} }
}, },
// 3. stream any records which have been updated in the interim filter,
func(ctx context.Context, block bool) (*databroker.Record, error) { )
return nextChangedRecord(ctx, backend, &recordVersion)
}, return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{
}, nil) generator,
}, nil), nil
} }
func nextScannedRecords(ctx context.Context, backend *Backend, cursor *uint64) ([]*databroker.Record, error) { func nextScannedRecords(ctx context.Context, backend *Backend, cursor *uint64) ([]*databroker.Record, error) {

View file

@ -40,7 +40,7 @@ type Backend interface {
// Sync syncs record changes after the specified version. // Sync syncs record changes after the specified version.
Sync(ctx context.Context, serverVersion, recordVersion uint64) (RecordStream, error) Sync(ctx context.Context, serverVersion, recordVersion uint64) (RecordStream, error)
// SyncLatest syncs all the records. // SyncLatest syncs all the records.
SyncLatest(ctx context.Context) (serverVersion uint64, stream RecordStream, err error) SyncLatest(ctx context.Context, recordType string, filter FilterExpression) (serverVersion, recordVersion uint64, stream RecordStream, err error)
} }
// MatchAny searches any data with a query. // MatchAny searches any data with a query.