mirror of
https://github.com/pomerium/pomerium.git
synced 2025-05-13 17:17:43 +02:00
postgres: fix record deletion (#3446)
This commit is contained in:
parent
270f8b4f2c
commit
1727d178ef
2 changed files with 22 additions and 2 deletions
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
"google.golang.org/protobuf/types/known/structpb"
|
"google.golang.org/protobuf/types/known/structpb"
|
||||||
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/internal/testutil"
|
"github.com/pomerium/pomerium/internal/testutil"
|
||||||
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||||
|
@ -44,6 +45,21 @@ func TestBackend(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("delete", func(t *testing.T) {
|
||||||
|
serverVersion, err := backend.Put(ctx, []*databroker.Record{
|
||||||
|
{
|
||||||
|
Type: "test-1",
|
||||||
|
Id: "r3",
|
||||||
|
Data: protoutil.NewAny(protoutil.NewStructMap(map[string]*structpb.Value{
|
||||||
|
"k1": protoutil.NewStructString("v1"),
|
||||||
|
})),
|
||||||
|
DeletedAt: timestamppb.Now(),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
assert.NotEqual(t, 0, serverVersion)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
t.Run("capacity", func(t *testing.T) {
|
t.Run("capacity", func(t *testing.T) {
|
||||||
err := backend.SetOptions(ctx, "capacity-test", &databroker.Options{
|
err := backend.SetOptions(ctx, "capacity-test", &databroker.Options{
|
||||||
Capacity: proto.Uint64(3),
|
Capacity: proto.Uint64(3),
|
||||||
|
|
|
@ -256,6 +256,9 @@ func putRecordAndChange(ctx context.Context, q querier, record *databroker.Recor
|
||||||
RETURNING *
|
RETURNING *
|
||||||
)
|
)
|
||||||
`
|
`
|
||||||
|
args := []any{
|
||||||
|
record.GetType(), record.GetId(), data, modifiedAt, deletedAt,
|
||||||
|
}
|
||||||
if record.GetDeletedAt() == nil {
|
if record.GetDeletedAt() == nil {
|
||||||
query += `
|
query += `
|
||||||
INSERT INTO ` + schemaName + `.` + recordsTableName + ` (type, id, version, data, modified_at, index_cidr)
|
INSERT INTO ` + schemaName + `.` + recordsTableName + ` (type, id, version, data, modified_at, index_cidr)
|
||||||
|
@ -264,6 +267,7 @@ func putRecordAndChange(ctx context.Context, q querier, record *databroker.Recor
|
||||||
SET version=(SELECT version FROM t1), data=$3, modified_at=$4, index_cidr=$6
|
SET version=(SELECT version FROM t1), data=$3, modified_at=$4, index_cidr=$6
|
||||||
RETURNING ` + schemaName + `.` + recordsTableName + `.version
|
RETURNING ` + schemaName + `.` + recordsTableName + `.version
|
||||||
`
|
`
|
||||||
|
args = append(args, indexCIDR)
|
||||||
} else {
|
} else {
|
||||||
query += `
|
query += `
|
||||||
DELETE FROM ` + schemaName + `.` + recordsTableName + `
|
DELETE FROM ` + schemaName + `.` + recordsTableName + `
|
||||||
|
@ -271,8 +275,8 @@ func putRecordAndChange(ctx context.Context, q querier, record *databroker.Recor
|
||||||
RETURNING ` + schemaName + `.` + recordsTableName + `.version
|
RETURNING ` + schemaName + `.` + recordsTableName + `.version
|
||||||
`
|
`
|
||||||
}
|
}
|
||||||
err = q.QueryRow(ctx, query, record.GetType(), record.GetId(), data, modifiedAt, deletedAt, indexCIDR).Scan(&record.Version)
|
err = q.QueryRow(ctx, query, args...).Scan(&record.Version)
|
||||||
if err != nil {
|
if err != nil && !isNotFound(err) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue