postgres: return unknown records instead of skipping them (#3876)

This commit is contained in:
Caleb Doxsey 2023-01-09 15:10:52 -07:00 committed by GitHub
parent 9677e18bbd
commit 92b50683ff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 41 deletions

View file

@ -173,10 +173,11 @@ func TestBackend(t *testing.T) {
_, err = backend.Get(ctx, "unknown", "1") _, err = backend.Get(ctx, "unknown", "1")
assert.ErrorIs(t, err, storage.ErrNotFound) assert.ErrorIs(t, err, storage.ErrNotFound)
_, _, stream, err := backend.SyncLatest(ctx, "unknown-test", nil) _, _, stream, err := backend.SyncLatest(ctx, "unknown", nil)
if assert.NoError(t, err) { if assert.NoError(t, err) {
_, err := storage.RecordStreamToList(stream) records, err := storage.RecordStreamToList(stream)
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, records, 1)
stream.Close() stream.Close()
} }
}) })

View file

@ -97,51 +97,49 @@ func getLatestRecordVersion(ctx context.Context, q querier) (recordVersion uint6
} }
func getNextChangedRecord(ctx context.Context, q querier, recordType string, afterRecordVersion uint64) (*databroker.Record, error) { func getNextChangedRecord(ctx context.Context, q querier, recordType string, afterRecordVersion uint64) (*databroker.Record, error) {
for { var recordID string
var recordID string var version uint64
var version uint64 var data []byte
var data []byte var modifiedAt pgtype.Timestamptz
var modifiedAt pgtype.Timestamptz var deletedAt pgtype.Timestamptz
var deletedAt pgtype.Timestamptz query := `
query := `
SELECT type, id, version, data, modified_at, deleted_at SELECT type, id, version, data, modified_at, deleted_at
FROM ` + schemaName + `.` + recordChangesTableName + ` FROM ` + schemaName + `.` + recordChangesTableName + `
WHERE version > $1 WHERE version > $1
` `
args := []any{afterRecordVersion} args := []any{afterRecordVersion}
if recordType != "" { if recordType != "" {
query += ` AND type = $2` query += ` AND type = $2`
args = append(args, recordType) args = append(args, recordType)
} }
query += ` query += `
ORDER BY version ASC ORDER BY version ASC
LIMIT 1 LIMIT 1
` `
err := q.QueryRow(ctx, query, args...).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt) err := q.QueryRow(ctx, query, args...).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt)
if isNotFound(err) { if isNotFound(err) {
return nil, storage.ErrNotFound return nil, storage.ErrNotFound
} else if err != nil { } else if err != nil {
return nil, fmt.Errorf("error querying next changed record: %w", err) return nil, fmt.Errorf("error querying next changed record: %w", err)
}
afterRecordVersion = version
any, err := protoutil.UnmarshalAnyJSON(data)
if isUnknownType(err) {
// ignore
continue
} else if err != nil {
return nil, fmt.Errorf("error unmarshaling changed record data: %w", err)
}
return &databroker.Record{
Version: version,
Type: recordType,
Id: recordID,
Data: any,
ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
DeletedAt: timestamppbFromTimestamptz(deletedAt),
}, nil
} }
any, err := protoutil.UnmarshalAnyJSON(data)
if isUnknownType(err) {
any = protoutil.ToAny(protoutil.ToStruct(map[string]string{
"id": recordID,
}))
} else if err != nil {
return nil, fmt.Errorf("error unmarshaling changed record data: %w", err)
}
return &databroker.Record{
Version: version,
Type: recordType,
Id: recordID,
Data: any,
ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
DeletedAt: timestamppbFromTimestamptz(deletedAt),
}, nil
} }
func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) { func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) {
@ -229,8 +227,9 @@ func listRecords(ctx context.Context, q querier, expr storage.FilterExpression,
any, err := protoutil.UnmarshalAnyJSON(data) any, err := protoutil.UnmarshalAnyJSON(data)
if isUnknownType(err) { if isUnknownType(err) {
// ignore records with an unknown type any = protoutil.ToAny(protoutil.ToStruct(map[string]string{
continue "id": id,
}))
} else if err != nil { } else if err != nil {
return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err) return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
} }