diff --git a/pkg/storage/querier.go b/pkg/storage/querier.go index c5e88ea4a..108e60f47 100644 --- a/pkg/storage/querier.go +++ b/pkg/storage/querier.go @@ -45,11 +45,6 @@ func WithQuerier(ctx context.Context, querier Querier) context.Context { return context.WithValue(ctx, querierKey{}, querier) } -// Query queries for records. -func (q *staticQuerier) Query(_ context.Context, req *databroker.QueryRequest, _ ...grpc.CallOption) (*databroker.QueryResponse, error) { - return QueryRecordCollections(q.records, req) -} - // MarshalQueryRequest marshales the query request. func MarshalQueryRequest(req *databroker.QueryRequest) ([]byte, error) { return (&proto.MarshalOptions{ diff --git a/pkg/storage/querier_caching_test.go b/pkg/storage/querier_caching_test.go index 186257055..48be9831c 100644 --- a/pkg/storage/querier_caching_test.go +++ b/pkg/storage/querier_caching_test.go @@ -36,8 +36,9 @@ func TestCachingQuerier(t *testing.T) { }) assert.NoError(t, err) assert.Empty(t, cmp.Diff(&databrokerpb.QueryResponse{ - Records: []*databrokerpb.Record{{Version: 1, Type: "t1", Id: "r1"}}, - TotalCount: 1, + Records: []*databrokerpb.Record{{Version: 1, Type: "t1", Id: "r1"}}, + TotalCount: 1, + RecordVersion: 1, }, res, protocmp.Transform())) res, err = storage.NewCachingQuerier(q2, cache).Query(ctx, &databrokerpb.QueryRequest{ @@ -46,8 +47,9 @@ func TestCachingQuerier(t *testing.T) { }) assert.NoError(t, err) assert.Empty(t, cmp.Diff(&databrokerpb.QueryResponse{ - Records: []*databrokerpb.Record{{Version: 1, Type: "t1", Id: "r1"}}, - TotalCount: 1, + Records: []*databrokerpb.Record{{Version: 1, Type: "t1", Id: "r1"}}, + TotalCount: 1, + RecordVersion: 1, }, res, protocmp.Transform()), "should use the cached version") res, err = storage.NewCachingQuerier(q2, cache).Query(ctx, &databrokerpb.QueryRequest{ @@ -57,8 +59,9 @@ func TestCachingQuerier(t *testing.T) { }) assert.NoError(t, err) assert.Empty(t, cmp.Diff(&databrokerpb.QueryResponse{ - Records: []*databrokerpb.Record{{Version: 1, Type: "t1", Id: "r1"}}, - TotalCount: 1, + Records: []*databrokerpb.Record{{Version: 1, Type: "t1", Id: "r1"}}, + TotalCount: 1, + RecordVersion: 1, }, res, protocmp.Transform()), "should use the cached version") res, err = storage.NewCachingQuerier(q2, cache).Query(ctx, &databrokerpb.QueryRequest{ @@ -68,7 +71,8 @@ func TestCachingQuerier(t *testing.T) { }) assert.NoError(t, err) assert.Empty(t, cmp.Diff(&databrokerpb.QueryResponse{ - Records: []*databrokerpb.Record{{Version: 2, Type: "t1", Id: "r1"}}, - TotalCount: 1, + Records: []*databrokerpb.Record{{Version: 2, Type: "t1", Id: "r1"}}, + TotalCount: 1, + RecordVersion: 2, }, res, protocmp.Transform()), "should query the new version") } diff --git a/pkg/storage/querier_static.go b/pkg/storage/querier_static.go index b42131899..6ce958b82 100644 --- a/pkg/storage/querier_static.go +++ b/pkg/storage/querier_static.go @@ -6,6 +6,7 @@ import ( "strconv" "github.com/google/uuid" + grpc "google.golang.org/grpc" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" timestamppb "google.golang.org/protobuf/types/known/timestamppb" @@ -75,3 +76,8 @@ func NewStaticRecord(typeURL string, msg proto.Message) *databroker.Record { } func (q *staticQuerier) InvalidateCache(_ context.Context, _ *databroker.QueryRequest) {} + +// Query queries for records. +func (q *staticQuerier) Query(_ context.Context, req *databroker.QueryRequest, _ ...grpc.CallOption) (*databroker.QueryResponse, error) { + return QueryRecordCollections(q.records, req) +} diff --git a/pkg/storage/record_collection.go b/pkg/storage/record_collection.go index a63fd2a99..c9257144d 100644 --- a/pkg/storage/record_collection.go +++ b/pkg/storage/record_collection.go @@ -328,6 +328,10 @@ func QueryRecordCollections( res := new(databroker.QueryResponse) for _, c := range cs { + if record, ok := c.Newest(); ok { + res.RecordVersion = max(res.RecordVersion, record.Version) + } + records, err := c.List(filter) if err != nil { return nil, err