diff --git a/pkg/storage/postgres/backend_test.go b/pkg/storage/postgres/backend_test.go index 0448fd817..e32dc7d70 100644 --- a/pkg/storage/postgres/backend_test.go +++ b/pkg/storage/postgres/backend_test.go @@ -163,6 +163,24 @@ func TestBackend(t *testing.T) { assert.NoError(t, stream.Err()) }) + t.Run("unknown type", func(t *testing.T) { + _, err := backend.pool.Exec(ctx, ` + INSERT INTO `+schemaName+"."+recordsTableName+` (type, id, version, data) + VALUES ('unknown', '1', 1000, '{"@type":"UNKNOWN","value":{}}') + `) + assert.NoError(t, err) + + _, err = backend.Get(ctx, "unknown", "1") + assert.ErrorIs(t, err, storage.ErrNotFound) + + _, _, stream, err := backend.SyncLatest(ctx, "unknown-test", nil) + if assert.NoError(t, err) { + _, err := storage.RecordStreamToList(stream) + assert.NoError(t, err) + stream.Close() + } + }) + return nil })) } diff --git a/pkg/storage/postgres/postgres.go b/pkg/storage/postgres/postgres.go index e3e2fe60d..797bbd0f7 100644 --- a/pkg/storage/postgres/postgres.go +++ b/pkg/storage/postgres/postgres.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "strings" "time" "github.com/jackc/pgconn" @@ -12,6 +13,7 @@ import ( "github.com/jackc/pgx/v4" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoregistry" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -94,46 +96,52 @@ func getLatestRecordVersion(ctx context.Context, q querier) (recordVersion uint6 } func getNextChangedRecord(ctx context.Context, q querier, recordType string, afterRecordVersion uint64) (*databroker.Record, error) { - var recordID string - var version uint64 - var data pgtype.JSONB - var modifiedAt pgtype.Timestamptz - var deletedAt pgtype.Timestamptz - query := ` - SELECT type, id, version, data, modified_at, deleted_at - FROM ` + schemaName + `.` + recordChangesTableName + ` - WHERE version > $1 - ` - args := []any{afterRecordVersion} - if recordType != "" { - query += ` AND type = $2` - args = append(args, recordType) - } - query += ` - ORDER BY version ASC - LIMIT 1 - ` - err := q.QueryRow(ctx, query, args...).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt) - if isNotFound(err) { - return nil, storage.ErrNotFound - } else if err != nil { - return nil, fmt.Errorf("error querying next changed record: %w", err) - } + for { + var recordID string + var version uint64 + var data pgtype.JSONB + var modifiedAt pgtype.Timestamptz + var deletedAt pgtype.Timestamptz + query := ` + SELECT type, id, version, data, modified_at, deleted_at + FROM ` + schemaName + `.` + recordChangesTableName + ` + WHERE version > $1 + ` + args := []any{afterRecordVersion} + if recordType != "" { + query += ` AND type = $2` + args = append(args, recordType) + } + query += ` + ORDER BY version ASC + LIMIT 1 + ` + err := q.QueryRow(ctx, query, args...).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt) + if isNotFound(err) { + return nil, storage.ErrNotFound + } else if err != nil { + return nil, fmt.Errorf("error querying next changed record: %w", err) + } + afterRecordVersion = version - var any anypb.Any - err = protojson.Unmarshal(data.Bytes, &any) - if err != nil { - return nil, fmt.Errorf("error unmarshaling changed record data: %w", err) - } + var any anypb.Any + err = protojson.Unmarshal(data.Bytes, &any) + if isUnknownType(err) { + // ignore + continue + } else if err != nil { + return nil, fmt.Errorf("error unmarshaling changed record data: %w", err) + } - return &databroker.Record{ - Version: version, - Type: recordType, - Id: recordID, - Data: &any, - ModifiedAt: timestamppbFromTimestamptz(modifiedAt), - DeletedAt: timestamppbFromTimestamptz(deletedAt), - }, nil + return &databroker.Record{ + Version: version, + Type: recordType, + Id: recordID, + Data: &any, + ModifiedAt: timestamppbFromTimestamptz(modifiedAt), + DeletedAt: timestamppbFromTimestamptz(deletedAt), + }, nil + } } func getOptions(ctx context.Context, q querier, recordType string) (*databroker.Options, error) { @@ -165,13 +173,15 @@ func getRecord(ctx context.Context, q querier, recordType, recordID string) (*da if isNotFound(err) { return nil, storage.ErrNotFound } else if err != nil { - return nil, err + return nil, fmt.Errorf("postgres: failed to execute query: %w", err) } var any anypb.Any err = protojson.Unmarshal(data.Bytes, &any) - if err != nil { - return nil, err + if isUnknownType(err) { + return nil, storage.ErrNotFound + } else if err != nil { + return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err) } return &databroker.Record{ @@ -193,7 +203,7 @@ func listRecords(ctx context.Context, q querier, expr storage.FilterExpression, query += "WHERE " err := addFilterExpressionToQuery(&query, &args, expr) if err != nil { - return nil, err + return nil, fmt.Errorf("postgres: failed to add filter to query: %w", err) } } query += ` @@ -203,7 +213,7 @@ func listRecords(ctx context.Context, q querier, expr storage.FilterExpression, ` rows, err := q.Query(ctx, query, args...) if err != nil { - return nil, err + return nil, fmt.Errorf("postgres: failed to execute query: %w", err) } defer rows.Close() @@ -215,13 +225,16 @@ func listRecords(ctx context.Context, q querier, expr storage.FilterExpression, var modifiedAt pgtype.Timestamptz err = rows.Scan(&recordType, &id, &version, &data, &modifiedAt) if err != nil { - return nil, err + return nil, fmt.Errorf("postgres: failed to scan row: %w", err) } var any anypb.Any err = protojson.Unmarshal(data.Bytes, &any) - if err != nil { - return nil, err + if isUnknownType(err) { + // ignore records with an unknown type + continue + } else if err != nil { + return nil, fmt.Errorf("postgres: failed to unmarshal data: %w", err) } records = append(records, &databroker.Record{ @@ -232,7 +245,12 @@ func listRecords(ctx context.Context, q querier, expr storage.FilterExpression, ModifiedAt: timestamppbFromTimestamptz(modifiedAt), }) } - return records, rows.Err() + err = rows.Err() + if err != nil { + return nil, fmt.Errorf("postgres: error iterating over rows: %w", err) + } + + return records, nil } func listServices(ctx context.Context, q querier) ([]*registry.Service, error) { @@ -245,7 +263,7 @@ func listServices(ctx context.Context, q querier) ([]*registry.Service, error) { ` rows, err := q.Query(ctx, query) if err != nil { - return nil, err + return nil, fmt.Errorf("postgres: failed to execute query: %w", err) } defer rows.Close() @@ -253,7 +271,7 @@ func listServices(ctx context.Context, q querier) ([]*registry.Service, error) { var kind, endpoint string err = rows.Scan(&kind, &endpoint) if err != nil { - return nil, err + return nil, fmt.Errorf("postgres: failed to scan row: %w", err) } services = append(services, ®istry.Service{ @@ -263,7 +281,7 @@ func listServices(ctx context.Context, q querier) ([]*registry.Service, error) { } err = rows.Err() if err != nil { - return nil, err + return nil, fmt.Errorf("postgres: error iterating over rows: %w", err) } return services, nil @@ -287,7 +305,7 @@ func maybeAcquireLease(ctx context.Context, q querier, leaseName, leaseID string func putRecordAndChange(ctx context.Context, q querier, record *databroker.Record) error { data, err := jsonbFromAny(record.GetData()) if err != nil { - return err + return fmt.Errorf("postgres: failed to convert any to json: %w", err) } modifiedAt := timestamptzFromTimestamppb(record.GetModifiedAt()) @@ -325,7 +343,7 @@ func putRecordAndChange(ctx context.Context, q querier, record *databroker.Recor } err = q.QueryRow(ctx, query, args...).Scan(&record.Version) if err != nil && !isNotFound(err) { - return err + return fmt.Errorf("postgres: failed to execute query: %w", err) } return nil @@ -398,3 +416,12 @@ func timestamptzFromTimestamppb(ts *timestamppb.Timestamp) pgtype.Timestamptz { func isNotFound(err error) bool { return errors.Is(err, pgx.ErrNoRows) || errors.Is(err, storage.ErrNotFound) } + +func isUnknownType(err error) bool { + if err == nil { + return false + } + + return errors.Is(err, protoregistry.NotFound) || + strings.Contains(err.Error(), "unable to resolve") // protojson doesn't wrap errors so check for the string +}