diff --git a/pkg/storage/postgres/backend_test.go b/pkg/storage/postgres/backend_test.go index 69a030731..16c6dace3 100644 --- a/pkg/storage/postgres/backend_test.go +++ b/pkg/storage/postgres/backend_test.go @@ -173,10 +173,11 @@ func TestBackend(t *testing.T) { _, err = backend.Get(ctx, "unknown", "1") assert.ErrorIs(t, err, storage.ErrNotFound) - _, _, stream, err := backend.SyncLatest(ctx, "unknown-test", nil) + _, _, stream, err := backend.SyncLatest(ctx, "unknown", nil) if assert.NoError(t, err) { - _, err := storage.RecordStreamToList(stream) + records, err := storage.RecordStreamToList(stream) assert.NoError(t, err) + assert.Len(t, records, 1) stream.Close() } }) diff --git a/pkg/storage/postgres/postgres.go b/pkg/storage/postgres/postgres.go index e0b199b78..ed561fcc6 100644 --- a/pkg/storage/postgres/postgres.go +++ b/pkg/storage/postgres/postgres.go @@ -97,51 +97,49 @@ func getLatestRecordVersion(ctx context.Context, q querier) (recordVersion uint6 } func getNextChangedRecord(ctx context.Context, q querier, recordType string, afterRecordVersion uint64) (*databroker.Record, error) { - for { - var recordID string - var version uint64 - var data []byte - var modifiedAt pgtype.Timestamptz - var deletedAt pgtype.Timestamptz - query := ` + var recordID string + var version uint64 + var data []byte + var modifiedAt pgtype.Timestamptz + var deletedAt pgtype.Timestamptz + query := ` SELECT type, id, version, data, modified_at, deleted_at FROM ` + schemaName + `.` + recordChangesTableName + ` WHERE version > $1 ` - args := []any{afterRecordVersion} - if recordType != "" { - query += ` AND type = $2` - args = append(args, recordType) - } - query += ` + args := []any{afterRecordVersion} + if recordType != "" { + query += ` AND type = $2` + args = append(args, recordType) + } + query += ` ORDER BY version ASC LIMIT 1 ` - err := q.QueryRow(ctx, query, args...).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt) - if isNotFound(err) { - return nil, storage.ErrNotFound - } else if err != nil { - return nil, fmt.Errorf("error querying next changed record: %w", err) - } - afterRecordVersion = version - - any, err := protoutil.UnmarshalAnyJSON(data) - if isUnknownType(err) { - // ignore - continue - } else if err != nil { - return nil, fmt.Errorf("error unmarshaling changed record data: %w", err) - } - - return &databroker.Record{ - Version: version, - Type: recordType, - Id: recordID, - Data: any, - ModifiedAt: timestamppbFromTimestamptz(modifiedAt), - DeletedAt: timestamppbFromTimestamptz(deletedAt), - }, nil + err := q.QueryRow(ctx, query, args...).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt) + if isNotFound(err) { + return nil, storage.ErrNotFound + } else if err != nil { + return nil, fmt.Errorf("error querying next changed record: %w", err) } + + any, err := protoutil.UnmarshalAnyJSON(data) + if isUnknownType(err) { + any = protoutil.ToAny(protoutil.ToStruct(map[string]string{ + "id": recordID, + })) + } else if err != nil { + return nil, fmt.Errorf("error unmarshaling changed record data: %w", err) + } + + return &databroker.Record{ + Version: version, + Type: recordType, + Id: recordID, + Data: any, + ModifiedAt: timestamppbFromTimestamptz(modifiedAt), + DeletedAt: timestamppbFromTimestamptz(deletedAt), + }, nil } func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) { @@ -229,8 +227,9 @@ func listRecords(ctx context.Context, q querier, expr storage.FilterExpression, any, err := protoutil.UnmarshalAnyJSON(data) if isUnknownType(err) { - // ignore records with an unknown type - continue + any = protoutil.ToAny(protoutil.ToStruct(map[string]string{ + "id": id, + })) } else if err != nil { return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err) }