postgres: return unknown records instead of skipping them (#3876) (#3877)

This commit is contained in:
Caleb Doxsey 2023-01-09 19:32:35 -07:00 committed by GitHub
parent 1761a39fb2
commit 7c69e612b6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 41 deletions

View file

@ -173,10 +173,11 @@ func TestBackend(t *testing.T) {
_, err = backend.Get(ctx, "unknown", "1") _, err = backend.Get(ctx, "unknown", "1")
assert.ErrorIs(t, err, storage.ErrNotFound) assert.ErrorIs(t, err, storage.ErrNotFound)
_, _, stream, err := backend.SyncLatest(ctx, "unknown-test", nil) _, _, stream, err := backend.SyncLatest(ctx, "unknown", nil)
if assert.NoError(t, err) { if assert.NoError(t, err) {
_, err := storage.RecordStreamToList(stream) records, err := storage.RecordStreamToList(stream)
assert.NoError(t, err) assert.NoError(t, err)
assert.Len(t, records, 1)
stream.Close() stream.Close()
} }
}) })

View file

@ -97,51 +97,49 @@ func getLatestRecordVersion(ctx context.Context, q querier) (recordVersion uint6
} }
func getNextChangedRecord(ctx context.Context, q querier, recordType string, afterRecordVersion uint64) (*databroker.Record, error) { func getNextChangedRecord(ctx context.Context, q querier, recordType string, afterRecordVersion uint64) (*databroker.Record, error) {
for { var recordID string
var recordID string var version uint64
var version uint64 var data pgtype.JSONB
var data pgtype.JSONB var modifiedAt pgtype.Timestamptz
var modifiedAt pgtype.Timestamptz var deletedAt pgtype.Timestamptz
var deletedAt pgtype.Timestamptz query := `
query := `
SELECT type, id, version, data, modified_at, deleted_at SELECT type, id, version, data, modified_at, deleted_at
FROM ` + schemaName + `.` + recordChangesTableName + ` FROM ` + schemaName + `.` + recordChangesTableName + `
WHERE version > $1 WHERE version > $1
` `
args := []any{afterRecordVersion} args := []any{afterRecordVersion}
if recordType != "" { if recordType != "" {
query += ` AND type = $2` query += ` AND type = $2`
args = append(args, recordType) args = append(args, recordType)
} }
query += ` query += `
ORDER BY version ASC ORDER BY version ASC
LIMIT 1 LIMIT 1
` `
err := q.QueryRow(ctx, query, args...).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt) err := q.QueryRow(ctx, query, args...).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt)
if isNotFound(err) { if isNotFound(err) {
return nil, storage.ErrNotFound return nil, storage.ErrNotFound
} else if err != nil { } else if err != nil {
return nil, fmt.Errorf("error querying next changed record: %w", err) return nil, fmt.Errorf("error querying next changed record: %w", err)
}
afterRecordVersion = version
any, err := protoutil.UnmarshalAnyJSON(data.Bytes)
if isUnknownType(err) {
// ignore
continue
} else if err != nil {
return nil, fmt.Errorf("error unmarshaling changed record data: %w", err)
}
return &databroker.Record{
Version: version,
Type: recordType,
Id: recordID,
Data: any,
ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
DeletedAt: timestamppbFromTimestamptz(deletedAt),
}, nil
} }
any, err := protoutil.UnmarshalAnyJSON(data.Bytes)
if isUnknownType(err) {
any = protoutil.ToAny(protoutil.ToStruct(map[string]string{
"id": recordID,
}))
} else if err != nil {
return nil, fmt.Errorf("error unmarshaling changed record data: %w", err)
}
return &databroker.Record{
Version: version,
Type: recordType,
Id: recordID,
Data: any,
ModifiedAt: timestamppbFromTimestamptz(modifiedAt),
DeletedAt: timestamppbFromTimestamptz(deletedAt),
}, nil
} }
func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) { func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) {
@ -229,8 +227,9 @@ func listRecords(ctx context.Context, q querier, expr storage.FilterExpression,
any, err := protoutil.UnmarshalAnyJSON(data.Bytes) any, err := protoutil.UnmarshalAnyJSON(data.Bytes)
if isUnknownType(err) { if isUnknownType(err) {
// ignore records with an unknown type any = protoutil.ToAny(protoutil.ToStruct(map[string]string{
continue "id": id,
}))
} else if err != nil { } else if err != nil {
return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err) return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err)
} }