core/storage: fix nil data unmarshal (#4734)

This commit is contained in:
Caleb Doxsey 2023-11-10 13:16:22 -07:00 committed by GitHub
parent 15ca641b9c
commit d7ed62c350
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 17 deletions

View file

@ -51,18 +51,20 @@ func TestBackend(t *testing.T) {
}) })
t.Run("delete", func(t *testing.T) { t.Run("delete", func(t *testing.T) {
serverVersion, err := backend.Put(ctx, []*databroker.Record{ serverVersion, err := backend.Put(ctx, []*databroker.Record{{
{ Type: "test-1",
Type: "test-1", Id: "r3",
Id: "r3", DeletedAt: timestamppb.Now(),
Data: protoutil.NewAny(protoutil.NewStructMap(map[string]*structpb.Value{ }})
"k1": protoutil.NewStructString("v1"),
})),
DeletedAt: timestamppb.Now(),
},
})
assert.NotEqual(t, 0, serverVersion) assert.NotEqual(t, 0, serverVersion)
assert.NoError(t, err) assert.NoError(t, err)
stream, err := backend.Sync(ctx, "test-1", serverVersion, 0)
require.NoError(t, err)
t.Cleanup(func() { _ = stream.Close() })
records, err := storage.RecordStreamToList(stream)
require.NoError(t, err)
assert.NotEmpty(t, records)
}) })
t.Run("capacity", func(t *testing.T) { t.Run("capacity", func(t *testing.T) {

View file

@ -126,13 +126,17 @@ func getNextChangedRecord(ctx context.Context, q querier, recordType string, aft
return nil, fmt.Errorf("error querying next changed record: %w", err) return nil, fmt.Errorf("error querying next changed record: %w", err)
} }
a, err := protoutil.UnmarshalAnyJSON(data) // data may be nil if a record is deleted
if isUnknownType(err) { var a *anypb.Any
a = protoutil.ToAny(protoutil.ToStruct(map[string]string{ if len(data) != 0 {
"id": recordID, a, err = protoutil.UnmarshalAnyJSON(data)
})) if isUnknownType(err) {
} else if err != nil { a = protoutil.ToAny(protoutil.ToStruct(map[string]string{
return nil, fmt.Errorf("error unmarshaling changed record data: %w", err) "id": recordID,
}))
} else if err != nil {
return nil, fmt.Errorf("error unmarshaling changed record data: %w", err)
}
} }
return &databroker.Record{ return &databroker.Record{