From 2dc778035d2296b0cb498fc7eff2d1aa7f389fd8 Mon Sep 17 00:00:00 2001 From: Caleb Doxsey Date: Tue, 29 Mar 2022 16:36:40 -0600 Subject: [PATCH] databroker: add support for field masks on Put (#3210) * databroker: add support for field masks on Put * return errors * clean up go.mod --- go.mod | 4 +- go.sum | 2 + internal/databroker/server.go | 2 +- pkg/grpc/databroker/databroker.pb.go | 184 +++++++++++++++------------ pkg/grpc/databroker/databroker.proto | 6 +- pkg/protoutil/mask.go | 31 +++++ pkg/protoutil/mask_test.go | 35 +++++ pkg/storage/encrypted.go | 5 +- pkg/storage/encrypted_test.go | 5 +- pkg/storage/inmemory/backend.go | 22 +++- pkg/storage/inmemory/backend_test.go | 59 ++++++++- pkg/storage/redis/redis.go | 85 +++++++++---- pkg/storage/redis/redis_test.go | 65 +++++++++- pkg/storage/storage.go | 3 +- pkg/storage/storage_test.go | 7 +- 15 files changed, 381 insertions(+), 134 deletions(-) create mode 100644 pkg/protoutil/mask.go create mode 100644 pkg/protoutil/mask_test.go diff --git a/go.mod b/go.mod index 79a2a4bc1..93dc9aebe 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( contrib.go.opencensus.io/exporter/jaeger v0.2.1 contrib.go.opencensus.io/exporter/prometheus v0.4.1 contrib.go.opencensus.io/exporter/zipkin v0.1.2 + github.com/CAFxX/httpcompression v0.0.8 github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20200406135749-5c268882acf0 github.com/caddyserver/certmagic v0.16.0 github.com/cenkalti/backoff/v4 v4.1.2 @@ -33,6 +34,7 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/golang-lru v0.5.4 github.com/martinlindhe/base36 v1.1.0 + github.com/mennanov/fmutils v0.1.1 github.com/mholt/acmez v1.0.2 github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/mitchellh/mapstructure v1.4.3 @@ -75,8 +77,6 @@ require ( sigs.k8s.io/yaml v1.3.0 ) -require github.com/CAFxX/httpcompression v0.0.8 - require ( 4d63.com/gochecknoglobals v0.1.0 // indirect cloud.google.com/go/compute v1.5.0 // indirect diff --git a/go.sum b/go.sum index 70a6956d3..180828ea8 100644 --- a/go.sum +++ b/go.sum @@ -947,6 +947,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182aff github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mbilski/exhaustivestruct v1.2.0 h1:wCBmUnSYufAHO6J4AVWY6ff+oxWxsVFrwgOdMUQePUo= github.com/mbilski/exhaustivestruct v1.2.0/go.mod h1:OeTBVxQWoEmB2J2JCHmXWPJ0aksxSUOUy+nvtVEfzXc= +github.com/mennanov/fmutils v0.1.1 h1:7GAoy/WI1ZUJDmuyB/i33DrL+E9ruj6BXv2GMqIBtj0= +github.com/mennanov/fmutils v0.1.1/go.mod h1:DE+qeI9Xy5s1GA4trgq8H26jr5DgJ4a9+0D1DPVCqyk= github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517/go.mod h1:KQ7+USdGKfpPjXk4Ga+5XxQM4Lm4e3gAogrreFAYpOg= github.com/mgechev/revive v1.1.4 h1:sZOjY6GU35Kr9jKa/wsKSHgrFz8eASIB5i3tqWZMp0A= github.com/mgechev/revive v1.1.4/go.mod h1:ZZq2bmyssGh8MSPz3VVziqRNIMYTJXzP8MUKG90vZ9A= diff --git a/internal/databroker/server.go b/internal/databroker/server.go index 5c4294a8f..4245226de 100644 --- a/internal/databroker/server.go +++ b/internal/databroker/server.go @@ -184,7 +184,7 @@ func (srv *Server) Put(ctx context.Context, req *databroker.PutRequest) (*databr return nil, err } - serverVersion, err := db.Put(ctx, record) + serverVersion, err := db.Put(ctx, record, req.Mask) if err != nil { return nil, err } diff --git a/pkg/grpc/databroker/databroker.pb.go b/pkg/grpc/databroker/databroker.pb.go index 38a64495c..4a5c38aee 100644 --- a/pkg/grpc/databroker/databroker.pb.go +++ b/pkg/grpc/databroker/databroker.pb.go @@ -16,6 +16,7 @@ import ( anypb "google.golang.org/protobuf/types/known/anypb" durationpb "google.golang.org/protobuf/types/known/durationpb" emptypb "google.golang.org/protobuf/types/known/emptypb" + fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb" timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" @@ -454,7 +455,8 @@ type PutRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Record *Record `protobuf:"bytes,1,opt,name=record,proto3" json:"record,omitempty"` + Record *Record `protobuf:"bytes,1,opt,name=record,proto3" json:"record,omitempty"` + Mask *fieldmaskpb.FieldMask `protobuf:"bytes,2,opt,name=mask,proto3,oneof" json:"mask,omitempty"` } func (x *PutRequest) Reset() { @@ -496,6 +498,13 @@ func (x *PutRequest) GetRecord() *Record { return nil } +func (x *PutRequest) GetMask() *fieldmaskpb.FieldMask { + if x != nil { + return x.Mask + } + return nil +} + type PutResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1118,57 +1127,63 @@ var file_databroker_proto_rawDesc = []byte{ 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe8, 0x01, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, - 0x72, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, - 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, - 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, - 0x12, 0x28, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x3b, 0x0a, 0x0b, 0x6d, 0x6f, - 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x6d, 0x6f, 0x64, - 0x69, 0x66, 0x69, 0x65, 0x64, 0x41, 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x64, 0x65, 0x6c, 0x65, 0x74, - 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, - 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, - 0x41, 0x74, 0x22, 0x65, 0x0a, 0x08, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x25, - 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, - 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x32, 0x0a, 0x15, 0x6c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x5f, - 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x04, 0x52, 0x13, 0x6c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x63, 0x6f, - 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x37, 0x0a, 0x07, 0x4f, 0x70, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1f, 0x0a, 0x08, 0x63, 0x61, 0x70, 0x61, 0x63, 0x69, 0x74, 0x79, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x48, 0x00, 0x52, 0x08, 0x63, 0x61, 0x70, 0x61, 0x63, 0x69, - 0x74, 0x79, 0x88, 0x01, 0x01, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x63, 0x61, 0x70, 0x61, 0x63, 0x69, - 0x74, 0x79, 0x22, 0x30, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x74, 0x79, 0x70, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x02, 0x69, 0x64, 0x22, 0x39, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, - 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x22, - 0x66, 0x0a, 0x0c, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, - 0x79, 0x70, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, - 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, - 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, - 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x22, 0x5e, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, 0x0a, 0x07, 0x72, 0x65, 0x63, 0x6f, - 0x72, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, - 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x72, - 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, - 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x74, 0x6f, 0x74, - 0x61, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x38, 0x0a, 0x0a, 0x50, 0x75, 0x74, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x20, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x5f, 0x6d, 0x61, + 0x73, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe8, 0x01, 0x0a, 0x06, 0x52, 0x65, + 0x63, 0x6f, 0x72, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x12, + 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, + 0x70, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x69, 0x64, 0x12, 0x28, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x3b, 0x0a, 0x0b, + 0x6d, 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0a, 0x6d, + 0x6f, 0x64, 0x69, 0x66, 0x69, 0x65, 0x64, 0x41, 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x64, 0x65, 0x6c, + 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x64, 0x65, 0x6c, 0x65, 0x74, + 0x65, 0x64, 0x41, 0x74, 0x22, 0x65, 0x0a, 0x08, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, + 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x32, 0x0a, 0x15, 0x6c, 0x61, 0x74, 0x65, 0x73, + 0x74, 0x5f, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x13, 0x6c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, + 0x63, 0x6f, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x37, 0x0a, 0x07, 0x4f, + 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1f, 0x0a, 0x08, 0x63, 0x61, 0x70, 0x61, 0x63, 0x69, + 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x48, 0x00, 0x52, 0x08, 0x63, 0x61, 0x70, 0x61, + 0x63, 0x69, 0x74, 0x79, 0x88, 0x01, 0x01, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x63, 0x61, 0x70, 0x61, + 0x63, 0x69, 0x74, 0x79, 0x22, 0x30, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x39, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, - 0x64, 0x22, 0x60, 0x0a, 0x0b, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x64, 0x22, 0x66, 0x0a, 0x0c, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x6f, + 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, + 0x73, 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x03, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x22, 0x5e, 0x0a, 0x0d, 0x51, 0x75, 0x65, + 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, 0x0a, 0x07, 0x72, 0x65, + 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, + 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, + 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x74, 0x6f, 0x74, 0x61, + 0x6c, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x74, + 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x76, 0x0a, 0x0a, 0x50, 0x75, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, + 0x6f, 0x72, 0x64, 0x12, 0x33, 0x0a, 0x04, 0x6d, 0x61, 0x73, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x4d, 0x61, 0x73, 0x6b, 0x48, 0x00, 0x52, + 0x04, 0x6d, 0x61, 0x73, 0x6b, 0x88, 0x01, 0x01, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x6d, 0x61, 0x73, + 0x6b, 0x22, 0x60, 0x0a, 0x0b, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, @@ -1305,8 +1320,9 @@ var file_databroker_proto_goTypes = []interface{}{ (*RenewLeaseRequest)(nil), // 18: databroker.RenewLeaseRequest (*anypb.Any)(nil), // 19: google.protobuf.Any (*timestamppb.Timestamp)(nil), // 20: google.protobuf.Timestamp - (*durationpb.Duration)(nil), // 21: google.protobuf.Duration - (*emptypb.Empty)(nil), // 22: google.protobuf.Empty + (*fieldmaskpb.FieldMask)(nil), // 21: google.protobuf.FieldMask + (*durationpb.Duration)(nil), // 22: google.protobuf.Duration + (*emptypb.Empty)(nil), // 23: google.protobuf.Empty } var file_databroker_proto_depIdxs = []int32{ 19, // 0: databroker.Record.data:type_name -> google.protobuf.Any @@ -1315,37 +1331,38 @@ var file_databroker_proto_depIdxs = []int32{ 0, // 3: databroker.GetResponse.record:type_name -> databroker.Record 0, // 4: databroker.QueryResponse.records:type_name -> databroker.Record 0, // 5: databroker.PutRequest.record:type_name -> databroker.Record - 0, // 6: databroker.PutResponse.record:type_name -> databroker.Record - 2, // 7: databroker.SetOptionsRequest.options:type_name -> databroker.Options - 2, // 8: databroker.SetOptionsResponse.options:type_name -> databroker.Options - 0, // 9: databroker.SyncResponse.record:type_name -> databroker.Record - 0, // 10: databroker.SyncLatestResponse.record:type_name -> databroker.Record - 1, // 11: databroker.SyncLatestResponse.versions:type_name -> databroker.Versions - 21, // 12: databroker.AcquireLeaseRequest.duration:type_name -> google.protobuf.Duration - 21, // 13: databroker.RenewLeaseRequest.duration:type_name -> google.protobuf.Duration - 15, // 14: databroker.DataBrokerService.AcquireLease:input_type -> databroker.AcquireLeaseRequest - 3, // 15: databroker.DataBrokerService.Get:input_type -> databroker.GetRequest - 7, // 16: databroker.DataBrokerService.Put:input_type -> databroker.PutRequest - 5, // 17: databroker.DataBrokerService.Query:input_type -> databroker.QueryRequest - 17, // 18: databroker.DataBrokerService.ReleaseLease:input_type -> databroker.ReleaseLeaseRequest - 18, // 19: databroker.DataBrokerService.RenewLease:input_type -> databroker.RenewLeaseRequest - 9, // 20: databroker.DataBrokerService.SetOptions:input_type -> databroker.SetOptionsRequest - 11, // 21: databroker.DataBrokerService.Sync:input_type -> databroker.SyncRequest - 13, // 22: databroker.DataBrokerService.SyncLatest:input_type -> databroker.SyncLatestRequest - 16, // 23: databroker.DataBrokerService.AcquireLease:output_type -> databroker.AcquireLeaseResponse - 4, // 24: databroker.DataBrokerService.Get:output_type -> databroker.GetResponse - 8, // 25: databroker.DataBrokerService.Put:output_type -> databroker.PutResponse - 6, // 26: databroker.DataBrokerService.Query:output_type -> databroker.QueryResponse - 22, // 27: databroker.DataBrokerService.ReleaseLease:output_type -> google.protobuf.Empty - 22, // 28: databroker.DataBrokerService.RenewLease:output_type -> google.protobuf.Empty - 10, // 29: databroker.DataBrokerService.SetOptions:output_type -> databroker.SetOptionsResponse - 12, // 30: databroker.DataBrokerService.Sync:output_type -> databroker.SyncResponse - 14, // 31: databroker.DataBrokerService.SyncLatest:output_type -> databroker.SyncLatestResponse - 23, // [23:32] is the sub-list for method output_type - 14, // [14:23] is the sub-list for method input_type - 14, // [14:14] is the sub-list for extension type_name - 14, // [14:14] is the sub-list for extension extendee - 0, // [0:14] is the sub-list for field type_name + 21, // 6: databroker.PutRequest.mask:type_name -> google.protobuf.FieldMask + 0, // 7: databroker.PutResponse.record:type_name -> databroker.Record + 2, // 8: databroker.SetOptionsRequest.options:type_name -> databroker.Options + 2, // 9: databroker.SetOptionsResponse.options:type_name -> databroker.Options + 0, // 10: databroker.SyncResponse.record:type_name -> databroker.Record + 0, // 11: databroker.SyncLatestResponse.record:type_name -> databroker.Record + 1, // 12: databroker.SyncLatestResponse.versions:type_name -> databroker.Versions + 22, // 13: databroker.AcquireLeaseRequest.duration:type_name -> google.protobuf.Duration + 22, // 14: databroker.RenewLeaseRequest.duration:type_name -> google.protobuf.Duration + 15, // 15: databroker.DataBrokerService.AcquireLease:input_type -> databroker.AcquireLeaseRequest + 3, // 16: databroker.DataBrokerService.Get:input_type -> databroker.GetRequest + 7, // 17: databroker.DataBrokerService.Put:input_type -> databroker.PutRequest + 5, // 18: databroker.DataBrokerService.Query:input_type -> databroker.QueryRequest + 17, // 19: databroker.DataBrokerService.ReleaseLease:input_type -> databroker.ReleaseLeaseRequest + 18, // 20: databroker.DataBrokerService.RenewLease:input_type -> databroker.RenewLeaseRequest + 9, // 21: databroker.DataBrokerService.SetOptions:input_type -> databroker.SetOptionsRequest + 11, // 22: databroker.DataBrokerService.Sync:input_type -> databroker.SyncRequest + 13, // 23: databroker.DataBrokerService.SyncLatest:input_type -> databroker.SyncLatestRequest + 16, // 24: databroker.DataBrokerService.AcquireLease:output_type -> databroker.AcquireLeaseResponse + 4, // 25: databroker.DataBrokerService.Get:output_type -> databroker.GetResponse + 8, // 26: databroker.DataBrokerService.Put:output_type -> databroker.PutResponse + 6, // 27: databroker.DataBrokerService.Query:output_type -> databroker.QueryResponse + 23, // 28: databroker.DataBrokerService.ReleaseLease:output_type -> google.protobuf.Empty + 23, // 29: databroker.DataBrokerService.RenewLease:output_type -> google.protobuf.Empty + 10, // 30: databroker.DataBrokerService.SetOptions:output_type -> databroker.SetOptionsResponse + 12, // 31: databroker.DataBrokerService.Sync:output_type -> databroker.SyncResponse + 14, // 32: databroker.DataBrokerService.SyncLatest:output_type -> databroker.SyncLatestResponse + 24, // [24:33] is the sub-list for method output_type + 15, // [15:24] is the sub-list for method input_type + 15, // [15:15] is the sub-list for extension type_name + 15, // [15:15] is the sub-list for extension extendee + 0, // [0:15] is the sub-list for field type_name } func init() { file_databroker_proto_init() } @@ -1584,6 +1601,7 @@ func file_databroker_proto_init() { } } file_databroker_proto_msgTypes[2].OneofWrappers = []interface{}{} + file_databroker_proto_msgTypes[7].OneofWrappers = []interface{}{} file_databroker_proto_msgTypes[14].OneofWrappers = []interface{}{ (*SyncLatestResponse_Record)(nil), (*SyncLatestResponse_Versions)(nil), diff --git a/pkg/grpc/databroker/databroker.proto b/pkg/grpc/databroker/databroker.proto index 69f2bdf29..1eaeafced 100644 --- a/pkg/grpc/databroker/databroker.proto +++ b/pkg/grpc/databroker/databroker.proto @@ -6,6 +6,7 @@ option go_package = "github.com/pomerium/pomerium/pkg/grpc/databroker"; import "google/protobuf/any.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/empty.proto"; +import "google/protobuf/field_mask.proto"; import "google/protobuf/timestamp.proto"; message Record { @@ -46,7 +47,10 @@ message QueryResponse { int64 total_count = 2; } -message PutRequest { Record record = 1; } +message PutRequest { + Record record = 1; + optional google.protobuf.FieldMask mask = 2; +} message PutResponse { uint64 server_version = 1; Record record = 2; diff --git a/pkg/protoutil/mask.go b/pkg/protoutil/mask.go new file mode 100644 index 000000000..2441e65ef --- /dev/null +++ b/pkg/protoutil/mask.go @@ -0,0 +1,31 @@ +package protoutil + +import ( + "github.com/mennanov/fmutils" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/fieldmaskpb" +) + +// MergeAnyWithFieldMask merges the data in src with the data in dst, +// but only the fields identified by the given mask. +func MergeAnyWithFieldMask(dst, src *anypb.Any, mask *fieldmaskpb.FieldMask) (*anypb.Any, error) { + if mask == nil { + return src, nil + } + + srcMsg, err := src.UnmarshalNew() + if err != nil { + return nil, err + } + + dstMsg, err := dst.UnmarshalNew() + if err != nil { + return nil, err + } + + fmutils.Filter(srcMsg, mask.GetPaths()) + proto.Merge(dstMsg, srcMsg) + + return anypb.New(dstMsg) +} diff --git a/pkg/protoutil/mask_test.go b/pkg/protoutil/mask_test.go new file mode 100644 index 000000000..ded0691d4 --- /dev/null +++ b/pkg/protoutil/mask_test.go @@ -0,0 +1,35 @@ +package protoutil + +import ( + "testing" + + envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/fieldmaskpb" +) + +func TestMergeWithFieldMask(t *testing.T) { + m1 := NewAny(&envoy_type_v3.SemanticVersion{ + MajorNumber: 1, + MinorNumber: 1, + Patch: 1, + }) + m2 := NewAny(&envoy_type_v3.SemanticVersion{ + MajorNumber: 2, + MinorNumber: 2, + Patch: 2, + }) + expect := NewAny(&envoy_type_v3.SemanticVersion{ + MajorNumber: 2, + MinorNumber: 1, + Patch: 2, + }) + actual, err := MergeAnyWithFieldMask(m1, m2, &fieldmaskpb.FieldMask{ + Paths: []string{"major_number", "patch"}, + }) + require.NoError(t, err) + assert.Empty(t, cmp.Diff(expect, actual, protocmp.Transform())) +} diff --git a/pkg/storage/encrypted.go b/pkg/storage/encrypted.go index 904c8e49f..05bb11120 100644 --- a/pkg/storage/encrypted.go +++ b/pkg/storage/encrypted.go @@ -7,6 +7,7 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/fieldmaskpb" "google.golang.org/protobuf/types/known/wrapperspb" "github.com/pomerium/pomerium/pkg/cryptutil" @@ -103,7 +104,7 @@ func (e *encryptedBackend) Lease(ctx context.Context, leaseName, leaseID string, return e.underlying.Lease(ctx, leaseName, leaseID, ttl) } -func (e *encryptedBackend) Put(ctx context.Context, record *databroker.Record) (uint64, error) { +func (e *encryptedBackend) Put(ctx context.Context, record *databroker.Record, mask *fieldmaskpb.FieldMask) (uint64, error) { encrypted, err := e.encrypt(record.GetData()) if err != nil { return 0, err @@ -112,7 +113,7 @@ func (e *encryptedBackend) Put(ctx context.Context, record *databroker.Record) ( newRecord := proto.Clone(record).(*databroker.Record) newRecord.Data = encrypted - serverVersion, err := e.underlying.Put(ctx, newRecord) + serverVersion, err := e.underlying.Put(ctx, newRecord, mask) if err != nil { return 0, err } diff --git a/pkg/storage/encrypted_test.go b/pkg/storage/encrypted_test.go index c1a83ddfb..26d36c94b 100644 --- a/pkg/storage/encrypted_test.go +++ b/pkg/storage/encrypted_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/fieldmaskpb" "google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/wrapperspb" @@ -20,7 +21,7 @@ func TestEncryptedBackend(t *testing.T) { m := map[string]*anypb.Any{} backend := &mockBackend{ - put: func(ctx context.Context, record *databroker.Record) (uint64, error) { + put: func(ctx context.Context, record *databroker.Record, mask *fieldmaskpb.FieldMask) (uint64, error) { record.ModifiedAt = timestamppb.Now() record.Version++ m[record.GetId()] = record.GetData() @@ -64,7 +65,7 @@ func TestEncryptedBackend(t *testing.T) { Id: "TEST-1", Data: any, } - _, err = e.Put(ctx, rec) + _, err = e.Put(ctx, rec, nil) if !assert.NoError(t, err) { return } diff --git a/pkg/storage/inmemory/backend.go b/pkg/storage/inmemory/backend.go index c298dfc71..7cc4c00ce 100644 --- a/pkg/storage/inmemory/backend.go +++ b/pkg/storage/inmemory/backend.go @@ -11,12 +11,14 @@ import ( "github.com/google/btree" "github.com/rs/zerolog" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/fieldmaskpb" "google.golang.org/protobuf/types/known/timestamppb" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/signal" "github.com/pomerium/pomerium/pkg/cryptutil" "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/protoutil" "github.com/pomerium/pomerium/pkg/storage" ) @@ -204,7 +206,11 @@ func (backend *Backend) Lease(_ context.Context, leaseName, leaseID string, ttl } // Put puts a record into the in-memory store. -func (backend *Backend) Put(ctx context.Context, record *databroker.Record) (serverVersion uint64, err error) { +func (backend *Backend) Put( + ctx context.Context, + record *databroker.Record, + mask *fieldmaskpb.FieldMask, +) (serverVersion uint64, err error) { if record == nil { return backend.serverVersion, fmt.Errorf("records cannot be nil") } @@ -219,14 +225,24 @@ func (backend *Backend) Put(ctx context.Context, record *databroker.Record) (ser defer backend.mu.Unlock() defer backend.onChange.Broadcast(ctx) - backend.recordChange(record) - c, ok := backend.lookup[record.GetType()] if !ok { c = NewRecordCollection() backend.lookup[record.GetType()] = c } + if mask != nil { + oldRecord := c.Get(record.GetId()) + if oldRecord != nil { + record.Data, err = protoutil.MergeAnyWithFieldMask(oldRecord.Data, record.Data, mask) + if err != nil { + return serverVersion, err + } + } + } + + backend.recordChange(record) + if record.GetDeletedAt() != nil { c.Delete(record.GetId()) } else { diff --git a/pkg/storage/inmemory/backend_test.go b/pkg/storage/inmemory/backend_test.go index f4cfeb7b4..698881ba2 100644 --- a/pkg/storage/inmemory/backend_test.go +++ b/pkg/storage/inmemory/backend_test.go @@ -6,14 +6,18 @@ import ( "testing" "time" + envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/fieldmaskpb" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/pomerium/pomerium/internal/testutil" "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/protoutil" "github.com/pomerium/pomerium/pkg/storage" ) @@ -32,7 +36,7 @@ func TestBackend(t *testing.T) { Type: "TYPE", Id: "abcd", Data: data, - }) + }, nil) assert.NoError(t, err) assert.Equal(t, backend.serverVersion, sv) record, err := backend.Get(ctx, "TYPE", "abcd") @@ -51,7 +55,7 @@ func TestBackend(t *testing.T) { Type: "TYPE", Id: "abcd", DeletedAt: timestamppb.Now(), - }) + }, nil) assert.NoError(t, err) assert.Equal(t, backend.serverVersion, sv) record, err := backend.Get(ctx, "TYPE", "abcd") @@ -63,7 +67,7 @@ func TestBackend(t *testing.T) { sv, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: fmt.Sprint(i), - }) + }, nil) assert.NoError(t, err) assert.Equal(t, backend.serverVersion, sv) } @@ -83,7 +87,7 @@ func TestExpiry(t *testing.T) { sv, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: fmt.Sprint(i), - }) + }, nil) assert.NoError(t, err) assert.Equal(t, backend.serverVersion, sv) } @@ -123,7 +127,7 @@ func TestConcurrency(t *testing.T) { for i := 0; i < 1000; i++ { _, _ = backend.Put(ctx, &databroker.Record{ Id: fmt.Sprint(i), - }) + }, nil) } return nil }) @@ -155,7 +159,7 @@ func TestStream(t *testing.T) { _, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: fmt.Sprint(i), - }) + }, nil) assert.NoError(t, err) } return nil @@ -206,7 +210,7 @@ func TestCapacity(t *testing.T) { _, err = backend.Put(ctx, &databroker.Record{ Type: "EXAMPLE", Id: fmt.Sprint(i), - }) + }, nil) require.NoError(t, err) } @@ -245,3 +249,44 @@ func TestLease(t *testing.T) { assert.True(t, ok, "expected b to to acquire the lease") } } + +func TestFieldMask(t *testing.T) { + ctx := context.Background() + backend := New() + + _, _ = backend.Put(ctx, &databroker.Record{ + Type: "example", + Id: "example", + Data: protoutil.NewAny(&envoy_type_v3.SemanticVersion{ + MajorNumber: 1, + MinorNumber: 1, + Patch: 1, + }), + }, nil) + + _, _ = backend.Put(ctx, &databroker.Record{ + Type: "example", + Id: "example", + Data: protoutil.NewAny(&envoy_type_v3.SemanticVersion{ + MajorNumber: 2, + MinorNumber: 2, + Patch: 2, + }), + }, &fieldmaskpb.FieldMask{ + Paths: []string{"major_number", "patch"}, + }) + + record, _ := backend.Get(ctx, "example", "example") + record.ModifiedAt = nil + testutil.AssertProtoJSONEqual(t, `{ + "data": { + "@type": "type.googleapis.com/envoy.type.v3.SemanticVersion", + "majorNumber": 2, + "minorNumber": 1, + "patch": 2 + }, + "id": "example", + "type": "example", + "version": "2" + }`, record) +} diff --git a/pkg/storage/redis/redis.go b/pkg/storage/redis/redis.go index 0bd139e7a..33bd68da2 100644 --- a/pkg/storage/redis/redis.go +++ b/pkg/storage/redis/redis.go @@ -10,7 +10,8 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/go-redis/redis/v8" - "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/fieldmaskpb" "google.golang.org/protobuf/types/known/timestamppb" "github.com/pomerium/pomerium/internal/log" @@ -20,6 +21,7 @@ import ( "github.com/pomerium/pomerium/internal/telemetry/trace" "github.com/pomerium/pomerium/pkg/cryptutil" "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/protoutil" "github.com/pomerium/pomerium/pkg/storage" ) @@ -117,23 +119,7 @@ func (backend *Backend) Get(ctx context.Context, recordType, id string) (_ *data _, span := trace.StartSpan(ctx, "databroker.redis.Get") defer span.End() defer func(start time.Time) { recordOperation(ctx, start, "get", err) }(time.Now()) - - key, field := getHashKey(recordType, id) - cmd := backend.client.HGet(ctx, key, field) - raw, err := cmd.Result() - if err == redis.Nil { - return nil, storage.ErrNotFound - } else if err != nil { - return nil, err - } - - var record databroker.Record - err = proto.Unmarshal([]byte(raw), &record) - if err != nil { - return nil, err - } - - return &record, nil + return backend.get(ctx, backend.client, recordType, id) } // GetAll gets all the records from redis. @@ -241,7 +227,11 @@ func (backend *Backend) Lease(ctx context.Context, leaseName, leaseID string, tt } // Put puts a record into redis. -func (backend *Backend) Put(ctx context.Context, record *databroker.Record) (serverVersion uint64, err error) { +func (backend *Backend) Put( + ctx context.Context, + record *databroker.Record, + mask *fieldmaskpb.FieldMask, +) (serverVersion uint64, err error) { ctx, span := trace.StartSpan(ctx, "databroker.redis.Put") defer span.End() defer func(start time.Time) { recordOperation(ctx, start, "put", err) }(time.Now()) @@ -251,7 +241,7 @@ func (backend *Backend) Put(ctx context.Context, record *databroker.Record) (ser return serverVersion, err } - err = backend.put(ctx, record) + err = backend.put(ctx, record, mask) if err != nil { return serverVersion, err } @@ -294,19 +284,68 @@ func (backend *Backend) Sync(ctx context.Context, serverVersion, recordVersion u return newRecordStream(ctx, backend, serverVersion, recordVersion), nil } -func (backend *Backend) put(ctx context.Context, record *databroker.Record) error { +func (backend *Backend) get( + ctx context.Context, + cmdable redis.Cmdable, + recordType, recordID string, +) (*databroker.Record, error) { + key, field := getHashKey(recordType, recordID) + cmd := cmdable.HGet(ctx, key, field) + raw, err := cmd.Result() + if err == redis.Nil { + return nil, storage.ErrNotFound + } else if err != nil { + return nil, err + } + + var record databroker.Record + err = proto.Unmarshal([]byte(raw), &record) + if err != nil { + return nil, err + } + + return &record, nil +} + +func (backend *Backend) put( + ctx context.Context, + record *databroker.Record, + mask *fieldmaskpb.FieldMask, +) error { + var oldRecord *databroker.Record return backend.incrementVersion(ctx, func(tx *redis.Tx, version uint64) error { + if mask != nil { + var err error + oldRecord, err = backend.get(ctx, tx, record.GetType(), record.GetId()) + if errors.Is(err, storage.ErrNotFound) { + // ignore + } else if err != nil { + return err + } + } + record.ModifiedAt = timestamppb.Now() record.Version = version return nil }, func(p redis.Pipeliner, version uint64) error { + if oldRecord != nil { + var err error + record.Data, err = protoutil.MergeAnyWithFieldMask( + oldRecord.GetData(), + record.GetData(), + mask, + ) + if err != nil { + return err + } + } + bs, err := proto.Marshal(record) if err != nil { return err } - key, field := getHashKey(record.GetType(), record.GetId()) if record.DeletedAt != nil { p.HDel(ctx, key, field) @@ -354,7 +393,7 @@ func (backend *Backend) enforceOptions(ctx context.Context, recordType string) e if err == nil { // mark the record as deleted and re-submit record.DeletedAt = timestamppb.Now() - err = backend.put(ctx, record) + err = backend.put(ctx, record, nil) if err != nil { return err } diff --git a/pkg/storage/redis/redis_test.go b/pkg/storage/redis/redis_test.go index 9d0114804..dd0996744 100644 --- a/pkg/storage/redis/redis_test.go +++ b/pkg/storage/redis/redis_test.go @@ -8,15 +8,18 @@ import ( "testing" "time" + envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/fieldmaskpb" "google.golang.org/protobuf/types/known/timestamppb" "github.com/pomerium/pomerium/internal/testutil" "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/protoutil" ) func TestBackend(t *testing.T) { @@ -48,7 +51,7 @@ func TestBackend(t *testing.T) { Type: "TYPE", Id: "abcd", Data: data, - }) + }, nil) assert.NoError(t, err) assert.Equal(t, serverVersion, sv) record, err := backend.Get(ctx, "TYPE", "abcd") @@ -67,7 +70,7 @@ func TestBackend(t *testing.T) { Type: "TYPE", Id: "abcd", DeletedAt: timestamppb.Now(), - }) + }, nil) assert.NoError(t, err) assert.Equal(t, serverVersion, sv) record, err := backend.Get(ctx, "TYPE", "abcd") @@ -79,7 +82,7 @@ func TestBackend(t *testing.T) { sv, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: fmt.Sprint(i), - }) + }, nil) assert.NoError(t, err) assert.Equal(t, serverVersion, sv) } @@ -163,7 +166,7 @@ func TestChangeSignal(t *testing.T) { _, err = backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: "ID", - }) + }, nil) if err != nil { return err } @@ -200,7 +203,7 @@ func TestExpiry(t *testing.T) { _, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: fmt.Sprint(i), - }) + }, nil) assert.NoError(t, err) } stream, err := backend.Sync(ctx, serverVersion, 0) @@ -247,7 +250,7 @@ func TestCapacity(t *testing.T) { _, err = backend.Put(ctx, &databroker.Record{ Type: "EXAMPLE", Id: fmt.Sprint(i), - }) + }, nil) require.NoError(t, err) } @@ -300,3 +303,53 @@ func TestLease(t *testing.T) { return nil })) } + +func TestFieldMask(t *testing.T) { + if os.Getenv("GITHUB_ACTION") != "" && runtime.GOOS == "darwin" { + t.Skip("Github action can not run docker on MacOS") + } + + ctx := context.Background() + require.NoError(t, testutil.WithTestRedis(false, func(rawURL string) error { + backend, err := New(rawURL) + require.NoError(t, err) + defer func() { _ = backend.Close() }() + + _, _ = backend.Put(ctx, &databroker.Record{ + Type: "example", + Id: "example", + Data: protoutil.NewAny(&envoy_type_v3.SemanticVersion{ + MajorNumber: 1, + MinorNumber: 1, + Patch: 1, + }), + }, nil) + + _, _ = backend.Put(ctx, &databroker.Record{ + Type: "example", + Id: "example", + Data: protoutil.NewAny(&envoy_type_v3.SemanticVersion{ + MajorNumber: 2, + MinorNumber: 2, + Patch: 2, + }), + }, &fieldmaskpb.FieldMask{ + Paths: []string{"major_number", "patch"}, + }) + + record, _ := backend.Get(ctx, "example", "example") + record.ModifiedAt = nil + testutil.AssertProtoJSONEqual(t, `{ + "data": { + "@type": "type.googleapis.com/envoy.type.v3.SemanticVersion", + "majorNumber": 2, + "minorNumber": 1, + "patch": 2 + }, + "id": "example", + "type": "example", + "version": "2" + }`, record) + return nil + })) +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index dbd1d7e0c..9dd8beeae 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/fieldmaskpb" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/pkg/grpc/databroker" @@ -51,7 +52,7 @@ type Backend interface { // Lease acquires a lease, or renews an existing one. If the lease is acquired true is returned. Lease(ctx context.Context, leaseName, leaseID string, ttl time.Duration) (bool, error) // Put is used to insert or update a record. - Put(ctx context.Context, record *databroker.Record) (serverVersion uint64, err error) + Put(ctx context.Context, record *databroker.Record, mask *fieldmaskpb.FieldMask) (serverVersion uint64, err error) // SetOptions sets the options for a type. SetOptions(ctx context.Context, recordType string, options *databroker.Options) error // Sync syncs record changes after the specified version. diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 025d1d056..eb2444f57 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/fieldmaskpb" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/user" @@ -13,7 +14,7 @@ import ( type mockBackend struct { Backend - put func(ctx context.Context, record *databroker.Record) (uint64, error) + put func(ctx context.Context, record *databroker.Record, mask *fieldmaskpb.FieldMask) (uint64, error) get func(ctx context.Context, recordType, id string) (*databroker.Record, error) getAll func(ctx context.Context) ([]*databroker.Record, *databroker.Versions, error) } @@ -22,8 +23,8 @@ func (m *mockBackend) Close() error { return nil } -func (m *mockBackend) Put(ctx context.Context, record *databroker.Record) (uint64, error) { - return m.put(ctx, record) +func (m *mockBackend) Put(ctx context.Context, record *databroker.Record, mask *fieldmaskpb.FieldMask) (uint64, error) { + return m.put(ctx, record, mask) } func (m *mockBackend) Get(ctx context.Context, recordType, id string) (*databroker.Record, error) {