fix record version in query response

This commit is contained in:
Caleb Doxsey 2025-04-10 10:32:04 -06:00
parent dd62c32739
commit 66178c1fbe
4 changed files with 22 additions and 13 deletions

View file

@ -45,11 +45,6 @@ func WithQuerier(ctx context.Context, querier Querier) context.Context {
return context.WithValue(ctx, querierKey{}, querier) return context.WithValue(ctx, querierKey{}, querier)
} }
// Query queries for records.
func (q *staticQuerier) Query(_ context.Context, req *databroker.QueryRequest, _ ...grpc.CallOption) (*databroker.QueryResponse, error) {
return QueryRecordCollections(q.records, req)
}
// MarshalQueryRequest marshales the query request. // MarshalQueryRequest marshales the query request.
func MarshalQueryRequest(req *databroker.QueryRequest) ([]byte, error) { func MarshalQueryRequest(req *databroker.QueryRequest) ([]byte, error) {
return (&proto.MarshalOptions{ return (&proto.MarshalOptions{

View file

@ -38,6 +38,7 @@ func TestCachingQuerier(t *testing.T) {
assert.Empty(t, cmp.Diff(&databrokerpb.QueryResponse{ assert.Empty(t, cmp.Diff(&databrokerpb.QueryResponse{
Records: []*databrokerpb.Record{{Version: 1, Type: "t1", Id: "r1"}}, Records: []*databrokerpb.Record{{Version: 1, Type: "t1", Id: "r1"}},
TotalCount: 1, TotalCount: 1,
RecordVersion: 1,
}, res, protocmp.Transform())) }, res, protocmp.Transform()))
res, err = storage.NewCachingQuerier(q2, cache).Query(ctx, &databrokerpb.QueryRequest{ res, err = storage.NewCachingQuerier(q2, cache).Query(ctx, &databrokerpb.QueryRequest{
@ -48,6 +49,7 @@ func TestCachingQuerier(t *testing.T) {
assert.Empty(t, cmp.Diff(&databrokerpb.QueryResponse{ assert.Empty(t, cmp.Diff(&databrokerpb.QueryResponse{
Records: []*databrokerpb.Record{{Version: 1, Type: "t1", Id: "r1"}}, Records: []*databrokerpb.Record{{Version: 1, Type: "t1", Id: "r1"}},
TotalCount: 1, TotalCount: 1,
RecordVersion: 1,
}, res, protocmp.Transform()), "should use the cached version") }, res, protocmp.Transform()), "should use the cached version")
res, err = storage.NewCachingQuerier(q2, cache).Query(ctx, &databrokerpb.QueryRequest{ res, err = storage.NewCachingQuerier(q2, cache).Query(ctx, &databrokerpb.QueryRequest{
@ -59,6 +61,7 @@ func TestCachingQuerier(t *testing.T) {
assert.Empty(t, cmp.Diff(&databrokerpb.QueryResponse{ assert.Empty(t, cmp.Diff(&databrokerpb.QueryResponse{
Records: []*databrokerpb.Record{{Version: 1, Type: "t1", Id: "r1"}}, Records: []*databrokerpb.Record{{Version: 1, Type: "t1", Id: "r1"}},
TotalCount: 1, TotalCount: 1,
RecordVersion: 1,
}, res, protocmp.Transform()), "should use the cached version") }, res, protocmp.Transform()), "should use the cached version")
res, err = storage.NewCachingQuerier(q2, cache).Query(ctx, &databrokerpb.QueryRequest{ res, err = storage.NewCachingQuerier(q2, cache).Query(ctx, &databrokerpb.QueryRequest{
@ -70,5 +73,6 @@ func TestCachingQuerier(t *testing.T) {
assert.Empty(t, cmp.Diff(&databrokerpb.QueryResponse{ assert.Empty(t, cmp.Diff(&databrokerpb.QueryResponse{
Records: []*databrokerpb.Record{{Version: 2, Type: "t1", Id: "r1"}}, Records: []*databrokerpb.Record{{Version: 2, Type: "t1", Id: "r1"}},
TotalCount: 1, TotalCount: 1,
RecordVersion: 2,
}, res, protocmp.Transform()), "should query the new version") }, res, protocmp.Transform()), "should query the new version")
} }

View file

@ -6,6 +6,7 @@ import (
"strconv" "strconv"
"github.com/google/uuid" "github.com/google/uuid"
grpc "google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
timestamppb "google.golang.org/protobuf/types/known/timestamppb" timestamppb "google.golang.org/protobuf/types/known/timestamppb"
@ -75,3 +76,8 @@ func NewStaticRecord(typeURL string, msg proto.Message) *databroker.Record {
} }
func (q *staticQuerier) InvalidateCache(_ context.Context, _ *databroker.QueryRequest) {} func (q *staticQuerier) InvalidateCache(_ context.Context, _ *databroker.QueryRequest) {}
// Query queries for records.
func (q *staticQuerier) Query(_ context.Context, req *databroker.QueryRequest, _ ...grpc.CallOption) (*databroker.QueryResponse, error) {
return QueryRecordCollections(q.records, req)
}

View file

@ -328,6 +328,10 @@ func QueryRecordCollections(
res := new(databroker.QueryResponse) res := new(databroker.QueryResponse)
for _, c := range cs { for _, c := range cs {
if record, ok := c.Newest(); ok {
res.RecordVersion = max(res.RecordVersion, record.Version)
}
records, err := c.List(filter) records, err := c.List(filter)
if err != nil { if err != nil {
return nil, err return nil, err