diff --git a/authorize/access_tracker_test.go b/authorize/access_tracker_test.go index e4500a901..a83ffd146 100644 --- a/authorize/access_tracker_test.go +++ b/authorize/access_tracker_test.go @@ -92,30 +92,30 @@ func TestAccessTracker(t *testing.T) { mu.Lock() defer mu.Unlock() - switch in.GetRecord().GetType() { - case "type.googleapis.com/session.Session": - data, _ := in.GetRecord().GetData().UnmarshalNew() - sessions[in.Record.GetId()] = data.(*session.Session) - return &databroker.PutResponse{ - Record: &databroker.Record{ - Type: in.GetRecord().GetType(), - Id: in.GetRecord().GetId(), + res := new(databroker.PutResponse) + for _, record := range in.GetRecords() { + switch record.GetType() { + case "type.googleapis.com/session.Session": + data, _ := record.GetData().UnmarshalNew() + sessions[record.GetId()] = data.(*session.Session) + res.Records = append(res.Records, &databroker.Record{ + Type: record.GetType(), + Id: record.GetId(), Data: protoutil.NewAny(data), - }, - }, nil - case "type.googleapis.com/user.ServiceAccount": - data, _ := in.GetRecord().GetData().UnmarshalNew() - serviceAccounts[in.Record.GetId()] = data.(*user.ServiceAccount) - return &databroker.PutResponse{ - Record: &databroker.Record{ - Type: in.GetRecord().GetType(), - Id: in.GetRecord().GetId(), + }) + case "type.googleapis.com/user.ServiceAccount": + data, _ := record.GetData().UnmarshalNew() + serviceAccounts[record.GetId()] = data.(*user.ServiceAccount) + res.Records = append(res.Records, &databroker.Record{ + Type: record.GetType(), + Id: record.GetId(), Data: protoutil.NewAny(data), - }, - }, nil - default: - return nil, status.Errorf(codes.InvalidArgument, "unknown type: %s", in.GetRecord().GetType()) + }) + default: + return nil, status.Errorf(codes.InvalidArgument, "unknown type: %s", record.GetType()) + } } + return res, nil }, }, }, 200, time.Second) diff --git a/databroker/databroker_test.go b/databroker/databroker_test.go index 196afa394..ee6b75821 100644 --- a/databroker/databroker_test.go +++ b/databroker/databroker_test.go @@ -56,11 +56,11 @@ func TestServerSync(t *testing.T) { for i := 0; i < numRecords; i++ { res, err := c.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.TypeUrl, Id: strconv.Itoa(i), Data: any, - }, + }}, }) require.NoError(t, err) serverVersion = res.GetServerVersion() @@ -106,11 +106,11 @@ func BenchmarkSync(b *testing.B) { for i := 0; i < numRecords; i++ { _, _ = c.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.TypeUrl, Id: strconv.Itoa(i), Data: any, - }, + }}, }) } diff --git a/databroker/directory.go b/databroker/directory.go index 12dfba109..9cbec0a1d 100644 --- a/databroker/directory.go +++ b/databroker/directory.go @@ -45,11 +45,11 @@ func (c *DataBroker) RefreshUser(ctx context.Context, req *directory.RefreshUser any := protoutil.NewAny(u) _, err = c.dataBrokerServer.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: u.GetId(), Data: any, - }, + }}, }) if err != nil { return nil, err diff --git a/internal/controlplane/events.go b/internal/controlplane/events.go index 8e469bd97..07667bf8e 100644 --- a/internal/controlplane/events.go +++ b/internal/controlplane/events.go @@ -50,11 +50,11 @@ func (srv *Server) storeEvent(ctx context.Context, evt proto.Message) error { } _, err = client.Put(ctx, &databrokerpb.PutRequest{ - Record: &databrokerpb.Record{ + Records: []*databrokerpb.Record{{ Type: any.GetTypeUrl(), Id: id, Data: any, - }, + }}, }) if err != nil { return err diff --git a/internal/databroker/config_source_test.go b/internal/databroker/config_source_test.go index 6d26c7a2e..a83d93528 100644 --- a/internal/databroker/config_source_test.go +++ b/internal/databroker/config_source_test.go @@ -64,11 +64,11 @@ func TestConfigSource(t *testing.T) { }, }) _, _ = dataBrokerServer.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: data.TypeUrl, Id: "1", Data: data, - }, + }}, }) select { diff --git a/internal/databroker/server.go b/internal/databroker/server.go index 5c4294a8f..76c8c9c4b 100644 --- a/internal/databroker/server.go +++ b/internal/databroker/server.go @@ -145,21 +145,29 @@ func (srv *Server) Query(ctx context.Context, req *databroker.QueryRequest) (*da return nil, err } - all, _, err := db.GetAll(ctx) + _, stream, err := db.SyncLatest(ctx) if err != nil { return nil, err } + defer stream.Close() var filtered []*databroker.Record - for _, record := range all { + for stream.Next(false) { + record := stream.Record() + if record.GetType() != req.GetType() { continue } + if query != "" && !storage.MatchAny(record.GetData(), query) { continue } + filtered = append(filtered, record) } + if stream.Err() != nil { + return nil, stream.Err() + } records, totalCount := databroker.ApplyOffsetAndLimit(filtered, int(req.GetOffset()), int(req.GetLimit())) return &databroker.QueryResponse{ @@ -172,11 +180,15 @@ func (srv *Server) Query(ctx context.Context, req *databroker.QueryRequest) (*da func (srv *Server) Put(ctx context.Context, req *databroker.PutRequest) (*databroker.PutResponse, error) { _, span := trace.StartSpan(ctx, "databroker.grpc.Put") defer span.End() - record := req.GetRecord() + records := req.GetRecords() + var recordType string + for _, record := range records { + recordType = record.GetType() + } log.Info(ctx). - Str("type", record.GetType()). - Str("id", record.GetId()). + Int("record-count", len(records)). + Str("record-type", recordType). Msg("put") db, err := srv.getBackend() @@ -184,14 +196,16 @@ func (srv *Server) Put(ctx context.Context, req *databroker.PutRequest) (*databr return nil, err } - serverVersion, err := db.Put(ctx, record) + serverVersion, err := db.Put(ctx, records) if err != nil { return nil, err } - return &databroker.PutResponse{ + res := &databroker.PutResponse{ ServerVersion: serverVersion, - Record: record, - }, nil + Records: records, + } + + return res, nil } // ReleaseLease releases a lease. @@ -318,12 +332,17 @@ func (srv *Server) SyncLatest(req *databroker.SyncLatestRequest, stream databrok return err } - records, versions, err := backend.GetAll(ctx) + serverVersion, recordStream, err := backend.SyncLatest(ctx) if err != nil { return err } + recordVersion := uint64(0) - for _, record := range records { + for recordStream.Next(false) { + record := recordStream.Record() + if record.GetVersion() > recordVersion { + recordVersion = record.GetVersion() + } if req.GetType() == "" || req.GetType() == record.GetType() { err = stream.Send(&databroker.SyncLatestResponse{ Response: &databroker.SyncLatestResponse_Record{ @@ -335,11 +354,17 @@ func (srv *Server) SyncLatest(req *databroker.SyncLatestRequest, stream databrok } } } + if recordStream.Err() != nil { + return recordStream.Err() + } // always send the server version last in case there are no records return stream.Send(&databroker.SyncLatestResponse{ Response: &databroker.SyncLatestResponse_Versions{ - Versions: versions, + Versions: &databroker.Versions{ + ServerVersion: serverVersion, + LatestRecordVersion: recordVersion, + }, }, }) } diff --git a/internal/databroker/server_test.go b/internal/databroker/server_test.go index c00c4a0d4..54bd1397e 100644 --- a/internal/databroker/server_test.go +++ b/internal/databroker/server_test.go @@ -58,19 +58,19 @@ func TestServer_Get(t *testing.T) { s.Id = "1" any := protoutil.NewAny(s) _, err := srv.Put(context.Background(), &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.TypeUrl, Id: s.Id, Data: any, - }, + }}, }) assert.NoError(t, err) _, err = srv.Put(context.Background(), &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.TypeUrl, Id: s.Id, DeletedAt: timestamppb.Now(), - }, + }}, }) assert.NoError(t, err) _, err = srv.Get(context.Background(), &databroker.GetRequest{ @@ -90,11 +90,11 @@ func TestServer_Options(t *testing.T) { s.Id = "1" any := protoutil.NewAny(s) _, err := srv.Put(context.Background(), &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.TypeUrl, Id: s.Id, Data: any, - }, + }}, }) assert.NoError(t, err) _, err = srv.SetOptions(context.Background(), &databroker.SetOptionsRequest{ @@ -139,11 +139,11 @@ func TestServer_Query(t *testing.T) { s.Id = "1" any := protoutil.NewAny(s) _, err := srv.Put(context.Background(), &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.TypeUrl, Id: s.Id, Data: any, - }, + }}, }) assert.NoError(t, err) _, err = srv.Query(context.Background(), &databroker.QueryRequest{ @@ -160,11 +160,11 @@ func TestServer_Sync(t *testing.T) { s.Id = "1" any := protoutil.NewAny(s) _, err := srv.Put(context.Background(), &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.TypeUrl, Id: s.Id, Data: any, - }, + }}, }) assert.NoError(t, err) @@ -216,11 +216,11 @@ func TestServer_Sync(t *testing.T) { } _, err = srv.Put(context.Background(), &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.TypeUrl, Id: s.Id, Data: any, - }, + }}, }) assert.NoError(t, err) @@ -244,11 +244,11 @@ func TestServerInvalidStorage(t *testing.T) { s.Id = "1" any := protoutil.NewAny(s) _, err := srv.Put(context.Background(), &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.TypeUrl, Id: s.Id, Data: any, - }, + }}, }) _ = assert.Error(t, err) && assert.Contains(t, err.Error(), "unsupported storage type") } @@ -265,11 +265,11 @@ func TestServerRedis(t *testing.T) { s.Id = "1" any := protoutil.NewAny(s) _, err := srv.Put(context.Background(), &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.TypeUrl, Id: s.Id, Data: any, - }, + }}, }) assert.NoError(t, err) diff --git a/internal/envoy/envoy.go b/internal/envoy/envoy.go index cd9f28293..c2f6914dd 100644 --- a/internal/envoy/envoy.go +++ b/internal/envoy/envoy.go @@ -22,7 +22,6 @@ import ( "github.com/cenkalti/backoff/v4" envoy_config_bootstrap_v3 "github.com/envoyproxy/go-control-plane/envoy/config/bootstrap/v3" envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - "github.com/golang/protobuf/proto" "github.com/google/go-cmp/cmp" "github.com/natefinch/atomic" "github.com/rs/zerolog" @@ -283,7 +282,7 @@ func (srv *Server) buildBootstrapConfig(cfg *config.Config) ([]byte, error) { LayeredRuntime: layeredRuntimeCfg, } - jsonBytes, err := protojson.Marshal(proto.MessageV2(bootstrapCfg)) + jsonBytes, err := protojson.Marshal(bootstrapCfg) if err != nil { return nil, err } diff --git a/internal/identity/manager/manager.go b/internal/identity/manager/manager.go index 54291e252..40877b69d 100644 --- a/internal/identity/manager/manager.go +++ b/internal/identity/manager/manager.go @@ -259,11 +259,11 @@ func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*director defer mgr.dataBrokerSemaphore.Release(1) _, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: id, Data: any, - }, + }}, }) if err != nil { return fmt.Errorf("failed to update directory group: %s", id) @@ -285,11 +285,11 @@ func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*director defer mgr.dataBrokerSemaphore.Release(1) _, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: id, DeletedAt: timestamppb.Now(), - }, + }}, }) if err != nil { return fmt.Errorf("failed to delete directory group: %s", id) @@ -325,11 +325,11 @@ func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory. client := mgr.cfg.Load().dataBrokerClient if _, err := client.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: id, Data: any, - }, + }}, }); err != nil { return fmt.Errorf("failed to update directory user: %s", id) } @@ -351,12 +351,12 @@ func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory. client := mgr.cfg.Load().dataBrokerClient if _, err := client.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: id, Data: any, DeletedAt: timestamppb.Now(), - }, + }}, }); err != nil { return fmt.Errorf("failed to delete directory user (%s): %w", id, err) } @@ -499,7 +499,7 @@ func (mgr *Manager) refreshUser(ctx context.Context, userID string) { continue } - mgr.onUpdateUser(ctx, res.GetRecord(), u.User) + mgr.onUpdateUser(ctx, res.GetRecords()[0], u.User) } } diff --git a/internal/tests/xdserr/cmd/main.go b/internal/tests/xdserr/cmd/main.go index a622bd1d7..52a18fd40 100644 --- a/internal/tests/xdserr/cmd/main.go +++ b/internal/tests/xdserr/cmd/main.go @@ -181,14 +181,15 @@ func waitHealthy(ctx context.Context, client *http.Client, routes []*config.Rout return nil } + func saveConfig(ctx context.Context, client databroker.DataBrokerServiceClient, cfg *config.Config) error { any := protoutil.NewAny(cfg) r, err := client.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: "test_config", Data: any, - }}) + }}}) if err != nil { return err } diff --git a/pkg/grpc/databroker/databroker.go b/pkg/grpc/databroker/databroker.go index a22159e27..3e7bf4840 100644 --- a/pkg/grpc/databroker/databroker.go +++ b/pkg/grpc/databroker/databroker.go @@ -43,8 +43,12 @@ func Get(ctx context.Context, client DataBrokerServiceClient, object recordObjec } // Put puts a record into the databroker. -func Put(ctx context.Context, client DataBrokerServiceClient, object recordObject) (*PutResponse, error) { - return client.Put(ctx, &PutRequest{Record: NewRecord(object)}) +func Put(ctx context.Context, client DataBrokerServiceClient, objects ...recordObject) (*PutResponse, error) { + records := make([]*Record, len(objects)) + for i, object := range objects { + records[i] = NewRecord(object) + } + return client.Put(ctx, &PutRequest{Records: records}) } // ApplyOffsetAndLimit applies the offset and limit to the list of records. @@ -95,3 +99,45 @@ loop: return records, recordVersion, serverVersion, nil } + +// GetRecord gets the first record, or nil if there are none. +func (x *PutRequest) GetRecord() *Record { + records := x.GetRecords() + if len(records) == 0 { + return nil + } + return records[0] +} + +// GetRecord gets the first record, or nil if there are none. +func (x *PutResponse) GetRecord() *Record { + records := x.GetRecords() + if len(records) == 0 { + return nil + } + return records[0] +} + +// default is 4MB, but we'll do 1MB +const maxMessageSize = 1024 * 1024 * 1 + +// OptimumPutRequestsFromRecords creates one or more PutRequests from a slice of records. +// If the size of the request exceeds the max message size it will be split in half +// recursively until the requests are less than or equal to the max message size. +func OptimumPutRequestsFromRecords(records []*Record) []*PutRequest { + if len(records) <= 1 { + return []*PutRequest{{Records: records}} + } + + req := &PutRequest{ + Records: records, + } + if proto.Size(req) <= maxMessageSize { + return []*PutRequest{req} + } + + return append( + OptimumPutRequestsFromRecords(records[:len(records)/2]), + OptimumPutRequestsFromRecords(records[len(records)/2:])..., + ) +} diff --git a/pkg/grpc/databroker/databroker.pb.go b/pkg/grpc/databroker/databroker.pb.go index 38a64495c..ee9e198b0 100644 --- a/pkg/grpc/databroker/databroker.pb.go +++ b/pkg/grpc/databroker/databroker.pb.go @@ -454,7 +454,7 @@ type PutRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Record *Record `protobuf:"bytes,1,opt,name=record,proto3" json:"record,omitempty"` + Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"` } func (x *PutRequest) Reset() { @@ -489,9 +489,9 @@ func (*PutRequest) Descriptor() ([]byte, []int) { return file_databroker_proto_rawDescGZIP(), []int{7} } -func (x *PutRequest) GetRecord() *Record { +func (x *PutRequest) GetRecords() []*Record { if x != nil { - return x.Record + return x.Records } return nil } @@ -501,8 +501,8 @@ type PutResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ServerVersion uint64 `protobuf:"varint,1,opt,name=server_version,json=serverVersion,proto3" json:"server_version,omitempty"` - Record *Record `protobuf:"bytes,2,opt,name=record,proto3" json:"record,omitempty"` + ServerVersion uint64 `protobuf:"varint,1,opt,name=server_version,json=serverVersion,proto3" json:"server_version,omitempty"` + Records []*Record `protobuf:"bytes,2,rep,name=records,proto3" json:"records,omitempty"` } func (x *PutResponse) Reset() { @@ -544,9 +544,9 @@ func (x *PutResponse) GetServerVersion() uint64 { return 0 } -func (x *PutResponse) GetRecord() *Record { +func (x *PutResponse) GetRecords() []*Record { if x != nil { - return x.Record + return x.Records } return nil } @@ -1164,110 +1164,110 @@ var file_databroker_proto_rawDesc = []byte{ 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x74, 0x6f, 0x74, - 0x61, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x38, 0x0a, 0x0a, 0x50, 0x75, 0x74, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, - 0x64, 0x22, 0x60, 0x0a, 0x0b, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, - 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, - 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, - 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, - 0x6f, 0x72, 0x64, 0x22, 0x56, 0x0a, 0x11, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x2d, 0x0a, 0x07, - 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, - 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x43, 0x0a, 0x12, 0x53, - 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x2d, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, - 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x22, 0x5b, 0x0a, 0x0b, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, - 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, - 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, - 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3a, 0x0a, - 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, + 0x61, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x3a, 0x0a, 0x0a, 0x50, 0x75, 0x74, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, + 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x72, 0x65, 0x63, 0x6f, + 0x72, 0x64, 0x73, 0x22, 0x62, 0x0a, 0x0b, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2c, 0x0a, 0x07, 0x72, 0x65, 0x63, + 0x6f, 0x72, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, + 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, + 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x22, 0x56, 0x0a, 0x11, 0x53, 0x65, 0x74, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, + 0x12, 0x2d, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x13, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x4f, + 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, + 0x43, 0x0a, 0x12, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, + 0x6b, 0x65, 0x72, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x5b, 0x0a, 0x0b, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, + 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x0d, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x22, 0x3a, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, + 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x22, 0x27, 0x0a, + 0x11, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x82, 0x01, 0x0a, 0x12, 0x53, 0x79, 0x6e, 0x63, 0x4c, + 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, - 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x22, 0x27, 0x0a, 0x11, 0x53, 0x79, 0x6e, - 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, - 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, - 0x70, 0x65, 0x22, 0x82, 0x01, 0x0a, 0x12, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, - 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, 0x0a, 0x06, 0x72, 0x65, 0x63, - 0x6f, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, - 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x48, 0x00, 0x52, - 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x32, 0x0a, 0x08, 0x76, 0x65, 0x72, 0x73, 0x69, - 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x64, 0x61, 0x74, 0x61, - 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x48, - 0x00, 0x52, 0x08, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x42, 0x0a, 0x0a, 0x08, 0x72, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x60, 0x0a, 0x13, 0x41, 0x63, 0x71, 0x75, 0x69, - 0x72, 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, - 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x12, 0x35, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, - 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x26, 0x0a, 0x14, 0x41, 0x63, 0x71, - 0x75, 0x69, 0x72, 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, - 0x64, 0x22, 0x39, 0x0a, 0x13, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x4c, 0x65, 0x61, 0x73, - 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x0e, 0x0a, 0x02, - 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x6e, 0x0a, 0x11, - 0x52, 0x65, 0x6e, 0x65, 0x77, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x35, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x32, 0xfb, 0x04, 0x0a, - 0x11, 0x44, 0x61, 0x74, 0x61, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x12, 0x51, 0x0a, 0x0c, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x65, 0x61, - 0x73, 0x65, 0x12, 0x1f, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, - 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, - 0x2e, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x64, - 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, - 0x03, 0x50, 0x75, 0x74, 0x12, 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x64, - 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x18, - 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x51, 0x75, 0x65, 0x72, - 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, - 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x47, 0x0a, 0x0c, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x4c, 0x65, - 0x61, 0x73, 0x65, 0x12, 0x1f, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, - 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x43, 0x0a, 0x0a, - 0x52, 0x65, 0x6e, 0x65, 0x77, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x12, 0x1d, 0x2e, 0x64, 0x61, 0x74, - 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x6e, 0x65, 0x77, 0x4c, 0x65, 0x61, - 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x12, 0x4b, 0x0a, 0x0a, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, - 0x1d, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x74, - 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, - 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x74, 0x4f, - 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, - 0x0a, 0x04, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, - 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x18, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, - 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x4d, 0x0a, 0x0a, 0x53, - 0x79, 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x2e, 0x64, 0x61, 0x74, 0x61, - 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, - 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, - 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, - 0x6d, 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, - 0x72, 0x70, 0x63, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x64, 0x48, 0x00, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x32, 0x0a, 0x08, 0x76, + 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, + 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x73, 0x48, 0x00, 0x52, 0x08, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x42, + 0x0a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x60, 0x0a, 0x13, 0x41, + 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x35, 0x0a, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x26, 0x0a, + 0x14, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x39, 0x0a, 0x13, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, + 0x4c, 0x65, 0x61, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, + 0x22, 0x6e, 0x0a, 0x11, 0x52, 0x65, 0x6e, 0x65, 0x77, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x35, 0x0a, 0x08, 0x64, 0x75, 0x72, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x32, 0xfb, 0x04, 0x0a, 0x11, 0x44, 0x61, 0x74, 0x61, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x53, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x51, 0x0a, 0x0c, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, + 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x12, 0x1f, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, + 0x6b, 0x65, 0x72, 0x2e, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x41, 0x63, 0x71, 0x75, 0x69, 0x72, 0x65, 0x4c, 0x65, 0x61, 0x73, + 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x03, 0x47, 0x65, 0x74, + 0x12, 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, + 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, + 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x36, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, + 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x50, 0x75, + 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, 0x05, 0x51, 0x75, 0x65, + 0x72, 0x79, 0x12, 0x18, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, + 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x64, + 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x47, 0x0a, 0x0c, 0x52, 0x65, 0x6c, 0x65, 0x61, + 0x73, 0x65, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x12, 0x1f, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x4c, 0x65, 0x61, 0x73, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, + 0x12, 0x43, 0x0a, 0x0a, 0x52, 0x65, 0x6e, 0x65, 0x77, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x12, 0x1d, + 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x6e, 0x65, + 0x77, 0x4c, 0x65, 0x61, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x4b, 0x0a, 0x0a, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x12, 0x1d, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, + 0x2e, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, + 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x04, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x17, 0x2e, 0x64, 0x61, 0x74, + 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, + 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, + 0x4d, 0x0a, 0x0a, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x2e, + 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x4c, + 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x64, + 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, + 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x32, + 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, + 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1314,8 +1314,8 @@ var file_databroker_proto_depIdxs = []int32{ 20, // 2: databroker.Record.deleted_at:type_name -> google.protobuf.Timestamp 0, // 3: databroker.GetResponse.record:type_name -> databroker.Record 0, // 4: databroker.QueryResponse.records:type_name -> databroker.Record - 0, // 5: databroker.PutRequest.record:type_name -> databroker.Record - 0, // 6: databroker.PutResponse.record:type_name -> databroker.Record + 0, // 5: databroker.PutRequest.records:type_name -> databroker.Record + 0, // 6: databroker.PutResponse.records:type_name -> databroker.Record 2, // 7: databroker.SetOptionsRequest.options:type_name -> databroker.Options 2, // 8: databroker.SetOptionsResponse.options:type_name -> databroker.Options 0, // 9: databroker.SyncResponse.record:type_name -> databroker.Record diff --git a/pkg/grpc/databroker/databroker.proto b/pkg/grpc/databroker/databroker.proto index 69f2bdf29..5f8c68406 100644 --- a/pkg/grpc/databroker/databroker.proto +++ b/pkg/grpc/databroker/databroker.proto @@ -46,10 +46,10 @@ message QueryResponse { int64 total_count = 2; } -message PutRequest { Record record = 1; } +message PutRequest { repeated Record records = 1; } message PutResponse { uint64 server_version = 1; - Record record = 2; + repeated Record records = 2; } message SetOptionsRequest { diff --git a/pkg/grpc/databroker/databroker_test.go b/pkg/grpc/databroker/databroker_test.go index 6ae7bc83d..0b5003a3d 100644 --- a/pkg/grpc/databroker/databroker_test.go +++ b/pkg/grpc/databroker/databroker_test.go @@ -2,12 +2,18 @@ package databroker import ( "context" + "fmt" "net" + "strings" "testing" "time" "github.com/stretchr/testify/assert" "google.golang.org/grpc" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" + + "github.com/pomerium/pomerium/pkg/protoutil" ) func TestApplyOffsetAndLimit(t *testing.T) { @@ -105,6 +111,26 @@ func TestInitialSync(t *testing.T) { assert.Equal(t, []*Record{r1, r2}, records) } +func TestOptimumPutRequestsFromRecords(t *testing.T) { + var records []*Record + for i := 0; i < 10_000; i++ { + s := structpb.NewStructValue(&structpb.Struct{ + Fields: map[string]*structpb.Value{ + "long_string": structpb.NewStringValue(strings.Repeat("x", 987)), + }, + }) + records = append(records, &Record{ + Id: fmt.Sprintf("%d", i), + Data: protoutil.NewAny(s), + }) + } + requests := OptimumPutRequestsFromRecords(records) + for _, request := range requests { + assert.LessOrEqual(t, proto.Size(request), maxMessageSize) + assert.GreaterOrEqual(t, proto.Size(request), maxMessageSize/2) + } +} + type mockServer struct { DataBrokerServiceServer diff --git a/pkg/grpc/device/device.go b/pkg/grpc/device/device.go index 3db1c4b54..1f6e2c2d9 100644 --- a/pkg/grpc/device/device.go +++ b/pkg/grpc/device/device.go @@ -29,12 +29,12 @@ func DeleteCredential( any := protoutil.NewAny(credential) _, err = client.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: credentialID, Data: any, DeletedAt: timestamppb.Now(), - }, + }}, }) return credential, err } @@ -54,12 +54,12 @@ func DeleteEnrollment( any := protoutil.NewAny(enrollment) _, err = client.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: enrollmentID, Data: any, DeletedAt: timestamppb.Now(), - }, + }}, }) return enrollment, err } @@ -174,11 +174,11 @@ func PutCredential( any := protoutil.NewAny(credential) _, err := client.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: credential.GetId(), Data: any, - }, + }}, }) return err } @@ -191,11 +191,11 @@ func PutEnrollment( ) error { any := protoutil.NewAny(enrollment) _, err := client.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: enrollment.GetId(), Data: any, - }, + }}, }) return err } @@ -208,11 +208,11 @@ func PutOwnerCredentialRecord( ) error { any := protoutil.NewAny(ownerCredentialRecord) _, err := client.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: base58.Encode(ownerCredentialRecord.GetId()), Data: any, - }, + }}, }) return err } diff --git a/pkg/grpc/session/session.go b/pkg/grpc/session/session.go index 2d713cd6c..009278d22 100644 --- a/pkg/grpc/session/session.go +++ b/pkg/grpc/session/session.go @@ -18,12 +18,12 @@ import ( func Delete(ctx context.Context, client databroker.DataBrokerServiceClient, sessionID string) error { any := protoutil.NewAny(new(Session)) _, err := client.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: sessionID, Data: any, DeletedAt: timestamppb.Now(), - }, + }}, }) return err } @@ -52,11 +52,11 @@ func Put(ctx context.Context, client databroker.DataBrokerServiceClient, s *Sess s = proto.Clone(s).(*Session) any := protoutil.NewAny(s) res, err := client.Put(ctx, &databroker.PutRequest{ - Record: &databroker.Record{ + Records: []*databroker.Record{{ Type: any.GetTypeUrl(), Id: s.Id, Data: any, - }, + }}, }) return res, err } diff --git a/pkg/storage/encrypted.go b/pkg/storage/encrypted.go index 904c8e49f..cbca8f1cd 100644 --- a/pkg/storage/encrypted.go +++ b/pkg/storage/encrypted.go @@ -81,20 +81,6 @@ func (e *encryptedBackend) Get(ctx context.Context, recordType, id string) (*dat return record, nil } -func (e *encryptedBackend) GetAll(ctx context.Context) ([]*databroker.Record, *databroker.Versions, error) { - records, versions, err := e.underlying.GetAll(ctx) - if err != nil { - return nil, versions, err - } - for i := range records { - records[i], err = e.decryptRecord(records[i]) - if err != nil { - return nil, versions, err - } - } - return records, versions, nil -} - func (e *encryptedBackend) GetOptions(ctx context.Context, recordType string) (*databroker.Options, error) { return e.underlying.GetOptions(ctx, recordType) } @@ -103,23 +89,29 @@ func (e *encryptedBackend) Lease(ctx context.Context, leaseName, leaseID string, return e.underlying.Lease(ctx, leaseName, leaseID, ttl) } -func (e *encryptedBackend) Put(ctx context.Context, record *databroker.Record) (uint64, error) { - encrypted, err := e.encrypt(record.GetData()) +func (e *encryptedBackend) Put(ctx context.Context, records []*databroker.Record) (uint64, error) { + encryptedRecords := make([]*databroker.Record, len(records)) + for i, record := range records { + encrypted, err := e.encrypt(record.GetData()) + if err != nil { + return 0, err + } + + newRecord := proto.Clone(record).(*databroker.Record) + newRecord.Data = encrypted + encryptedRecords[i] = newRecord + } + + serverVersion, err := e.underlying.Put(ctx, encryptedRecords) if err != nil { return 0, err } - newRecord := proto.Clone(record).(*databroker.Record) - newRecord.Data = encrypted - - serverVersion, err := e.underlying.Put(ctx, newRecord) - if err != nil { - return 0, err + for i, record := range records { + record.ModifiedAt = encryptedRecords[i].ModifiedAt + record.Version = encryptedRecords[i].Version } - record.ModifiedAt = newRecord.ModifiedAt - record.Version = newRecord.Version - return serverVersion, nil } @@ -138,6 +130,17 @@ func (e *encryptedBackend) Sync(ctx context.Context, serverVersion, recordVersio }, nil } +func (e *encryptedBackend) SyncLatest(ctx context.Context) (serverVersion uint64, stream RecordStream, err error) { + serverVersion, stream, err = e.underlying.SyncLatest(ctx) + if err != nil { + return serverVersion, nil, err + } + return serverVersion, &encryptedRecordStream{ + underlying: stream, + backend: e, + }, nil +} + func (e *encryptedBackend) decryptRecord(in *databroker.Record) (out *databroker.Record, err error) { data, err := e.decrypt(in.Data) if err != nil { diff --git a/pkg/storage/encrypted_test.go b/pkg/storage/encrypted_test.go index c1a83ddfb..fe6a7a579 100644 --- a/pkg/storage/encrypted_test.go +++ b/pkg/storage/encrypted_test.go @@ -20,10 +20,12 @@ func TestEncryptedBackend(t *testing.T) { m := map[string]*anypb.Any{} backend := &mockBackend{ - put: func(ctx context.Context, record *databroker.Record) (uint64, error) { - record.ModifiedAt = timestamppb.Now() - record.Version++ - m[record.GetId()] = record.GetData() + put: func(ctx context.Context, records []*databroker.Record) (uint64, error) { + for _, record := range records { + record.ModifiedAt = timestamppb.Now() + record.Version++ + m[record.GetId()] = record.GetData() + } return 0, nil }, get: func(ctx context.Context, recordType, id string) (*databroker.Record, error) { @@ -38,18 +40,6 @@ func TestEncryptedBackend(t *testing.T) { ModifiedAt: timestamppb.Now(), }, nil }, - getAll: func(ctx context.Context) ([]*databroker.Record, *databroker.Versions, error) { - var records []*databroker.Record - for id, data := range m { - records = append(records, &databroker.Record{ - Id: id, - Data: data, - Version: 1, - ModifiedAt: timestamppb.Now(), - }) - } - return records, &databroker.Versions{}, nil - }, } e, err := NewEncryptedBackend(cryptutil.NewKey(), backend) @@ -64,7 +54,7 @@ func TestEncryptedBackend(t *testing.T) { Id: "TEST-1", Data: any, } - _, err = e.Put(ctx, rec) + _, err = e.Put(ctx, []*databroker.Record{rec}) if !assert.NoError(t, err) { return } @@ -82,14 +72,4 @@ func TestEncryptedBackend(t *testing.T) { assert.Equal(t, any.TypeUrl, record.Data.TypeUrl, "type should be preserved") assert.Equal(t, any.Value, record.Data.Value, "value should be preserved") assert.NotEqual(t, any.TypeUrl, record.Type, "record type should be preserved") - - records, _, err := e.GetAll(ctx) - if !assert.NoError(t, err) { - return - } - if assert.Len(t, records, 1) { - assert.Equal(t, any.TypeUrl, records[0].Data.TypeUrl, "type should be preserved") - assert.Equal(t, any.Value, records[0].Data.Value, "value should be preserved") - assert.NotEqual(t, any.TypeUrl, records[0].Type, "record type should be preserved") - } } diff --git a/pkg/storage/inmemory/backend.go b/pkg/storage/inmemory/backend.go index c298dfc71..830f98001 100644 --- a/pkg/storage/inmemory/backend.go +++ b/pkg/storage/inmemory/backend.go @@ -142,23 +142,6 @@ func (backend *Backend) Get(_ context.Context, recordType, id string) (*databrok return dup(record), nil } -// GetAll gets all the records from the in-memory store. -func (backend *Backend) GetAll(_ context.Context) ([]*databroker.Record, *databroker.Versions, error) { - backend.mu.RLock() - defer backend.mu.RUnlock() - - var all []*databroker.Record - for _, rs := range backend.lookup { - for _, r := range rs.List() { - all = append(all, dup(r)) - } - } - return all, &databroker.Versions{ - ServerVersion: backend.serverVersion, - LatestRecordVersion: backend.lastVersion, - }, nil -} - // GetOptions returns the options for a type in the in-memory store. func (backend *Backend) GetOptions(_ context.Context, recordType string) (*databroker.Options, error) { backend.mu.RLock() @@ -204,37 +187,43 @@ func (backend *Backend) Lease(_ context.Context, leaseName, leaseID string, ttl } // Put puts a record into the in-memory store. -func (backend *Backend) Put(ctx context.Context, record *databroker.Record) (serverVersion uint64, err error) { - if record == nil { - return backend.serverVersion, fmt.Errorf("records cannot be nil") +func (backend *Backend) Put(ctx context.Context, records []*databroker.Record) (serverVersion uint64, err error) { + recordTypes := map[string]struct{}{} + for _, record := range records { + if record == nil { + return backend.serverVersion, fmt.Errorf("records cannot be nil") + } + + ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { + return c.Str("db_op", "put"). + Str("db_id", record.Id). + Str("db_type", record.Type) + }) + + backend.mu.Lock() + defer backend.mu.Unlock() + defer backend.onChange.Broadcast(ctx) + + backend.recordChange(record) + + c, ok := backend.lookup[record.GetType()] + if !ok { + c = NewRecordCollection() + backend.lookup[record.GetType()] = c + } + + if record.GetDeletedAt() != nil { + c.Delete(record.GetId()) + } else { + c.Put(dup(record)) + } + + recordTypes[record.GetType()] = struct{}{} } - - ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { - return c.Str("db_op", "put"). - Str("db_id", record.Id). - Str("db_type", record.Type) - }) - - backend.mu.Lock() - defer backend.mu.Unlock() - defer backend.onChange.Broadcast(ctx) - - backend.recordChange(record) - - c, ok := backend.lookup[record.GetType()] - if !ok { - c = NewRecordCollection() - backend.lookup[record.GetType()] = c + for recordType := range recordTypes { + backend.enforceCapacity(recordType) } - if record.GetDeletedAt() != nil { - c.Delete(record.GetId()) - } else { - c.Put(dup(record)) - } - - backend.enforceCapacity(record.GetType()) - return backend.serverVersion, nil } @@ -255,10 +244,23 @@ func (backend *Backend) SetOptions(_ context.Context, recordType string, options // Sync returns a record stream for any changes after recordVersion. func (backend *Backend) Sync(ctx context.Context, serverVersion, recordVersion uint64) (storage.RecordStream, error) { - if serverVersion != backend.serverVersion { + backend.mu.RLock() + currentServerVersion := backend.serverVersion + backend.mu.RUnlock() + + if serverVersion != currentServerVersion { return nil, storage.ErrInvalidServerVersion } - return newRecordStream(ctx, backend, recordVersion), nil + return newSyncRecordStream(ctx, backend, recordVersion), nil +} + +// SyncLatest returns a record stream for all the records. +func (backend *Backend) SyncLatest(ctx context.Context) (serverVersion uint64, stream storage.RecordStream, err error) { + backend.mu.RLock() + currentServerVersion := backend.serverVersion + backend.mu.RUnlock() + + return currentServerVersion, newSyncLatestRecordStream(ctx, backend), nil } func (backend *Backend) recordChange(record *databroker.Record) { diff --git a/pkg/storage/inmemory/backend_test.go b/pkg/storage/inmemory/backend_test.go index f4cfeb7b4..7de160b5e 100644 --- a/pkg/storage/inmemory/backend_test.go +++ b/pkg/storage/inmemory/backend_test.go @@ -28,11 +28,11 @@ func TestBackend(t *testing.T) { }) t.Run("get record", func(t *testing.T) { data := new(anypb.Any) - sv, err := backend.Put(ctx, &databroker.Record{ + sv, err := backend.Put(ctx, []*databroker.Record{{ Type: "TYPE", Id: "abcd", Data: data, - }) + }}) assert.NoError(t, err) assert.Equal(t, backend.serverVersion, sv) record, err := backend.Get(ctx, "TYPE", "abcd") @@ -47,31 +47,17 @@ func TestBackend(t *testing.T) { } }) t.Run("delete record", func(t *testing.T) { - sv, err := backend.Put(ctx, &databroker.Record{ + sv, err := backend.Put(ctx, []*databroker.Record{{ Type: "TYPE", Id: "abcd", DeletedAt: timestamppb.Now(), - }) + }}) assert.NoError(t, err) assert.Equal(t, backend.serverVersion, sv) record, err := backend.Get(ctx, "TYPE", "abcd") assert.Error(t, err) assert.Nil(t, record) }) - t.Run("get all records", func(t *testing.T) { - for i := 0; i < 1000; i++ { - sv, err := backend.Put(ctx, &databroker.Record{ - Type: "TYPE", - Id: fmt.Sprint(i), - }) - assert.NoError(t, err) - assert.Equal(t, backend.serverVersion, sv) - } - records, versions, err := backend.GetAll(ctx) - assert.NoError(t, err) - assert.Len(t, records, 1000) - assert.Equal(t, uint64(1002), versions.LatestRecordVersion) - }) } func TestExpiry(t *testing.T) { @@ -80,10 +66,10 @@ func TestExpiry(t *testing.T) { defer func() { _ = backend.Close() }() for i := 0; i < 1000; i++ { - sv, err := backend.Put(ctx, &databroker.Record{ + sv, err := backend.Put(ctx, []*databroker.Record{{ Type: "TYPE", Id: fmt.Sprint(i), - }) + }}) assert.NoError(t, err) assert.Equal(t, backend.serverVersion, sv) } @@ -115,15 +101,15 @@ func TestConcurrency(t *testing.T) { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { for i := 0; i < 1000; i++ { - _, _, _ = backend.GetAll(ctx) + _, _ = backend.Get(ctx, "", fmt.Sprint(i)) } return nil }) eg.Go(func() error { for i := 0; i < 1000; i++ { - _, _ = backend.Put(ctx, &databroker.Record{ + _, _ = backend.Put(ctx, []*databroker.Record{{ Id: fmt.Sprint(i), - }) + }}) } return nil }) @@ -152,10 +138,10 @@ func TestStream(t *testing.T) { }) eg.Go(func() error { for i := 0; i < 10000; i++ { - _, err := backend.Put(ctx, &databroker.Record{ + _, err := backend.Put(ctx, []*databroker.Record{{ Type: "TYPE", Id: fmt.Sprint(i), - }) + }}) assert.NoError(t, err) } return nil @@ -171,7 +157,7 @@ func TestStreamClose(t *testing.T) { require.NoError(t, err) require.NoError(t, backend.Close()) assert.False(t, stream.Next(true)) - assert.Equal(t, storage.ErrStreamClosed, stream.Err()) + assert.Error(t, stream.Err()) }) t.Run("by stream", func(t *testing.T) { backend := New() @@ -179,7 +165,7 @@ func TestStreamClose(t *testing.T) { require.NoError(t, err) require.NoError(t, stream.Close()) assert.False(t, stream.Next(true)) - assert.Equal(t, storage.ErrStreamClosed, stream.Err()) + assert.Error(t, stream.Err()) }) t.Run("by context", func(t *testing.T) { ctx, cancel := context.WithCancel(ctx) @@ -188,7 +174,7 @@ func TestStreamClose(t *testing.T) { require.NoError(t, err) cancel() assert.False(t, stream.Next(true)) - assert.Equal(t, context.Canceled, stream.Err()) + assert.Error(t, stream.Err()) }) } @@ -203,14 +189,17 @@ func TestCapacity(t *testing.T) { require.NoError(t, err) for i := 0; i < 10; i++ { - _, err = backend.Put(ctx, &databroker.Record{ + _, err = backend.Put(ctx, []*databroker.Record{{ Type: "EXAMPLE", Id: fmt.Sprint(i), - }) + }}) require.NoError(t, err) } - records, _, err := backend.GetAll(ctx) + _, stream, err := backend.SyncLatest(ctx) + require.NoError(t, err) + + records, err := storage.RecordStreamToList(stream) require.NoError(t, err) assert.Len(t, records, 3) diff --git a/pkg/storage/inmemory/stream.go b/pkg/storage/inmemory/stream.go index b48cf1961..8bef957b2 100644 --- a/pkg/storage/inmemory/stream.go +++ b/pkg/storage/inmemory/stream.go @@ -2,117 +2,74 @@ package inmemory import ( "context" - "sync" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/storage" ) -type recordStream struct { - ctx context.Context - backend *Backend - - changed chan context.Context - ready []*databroker.Record - recordVersion uint64 - - closeOnce sync.Once - closed chan struct{} -} - -func newRecordStream(ctx context.Context, backend *Backend, recordVersion uint64) *recordStream { - stream := &recordStream{ - ctx: ctx, - backend: backend, - - changed: backend.onChange.Bind(), - recordVersion: recordVersion, - - closed: make(chan struct{}), - } - // if the backend is closed, close the stream - go func() { - select { - case <-stream.closed: - case <-backend.closed: - _ = stream.Close() - } - }() - return stream -} - -func (stream *recordStream) fill() { - stream.ready = stream.backend.getSince(stream.recordVersion) - if len(stream.ready) > 0 { - // records are sorted by version, - // so update the local version to the last record - stream.recordVersion = stream.ready[len(stream.ready)-1].GetVersion() - } -} - -func (stream *recordStream) Close() error { - stream.closeOnce.Do(func() { - stream.backend.onChange.Unbind(stream.changed) - close(stream.closed) - }) - return nil -} - -func (stream *recordStream) Next(wait bool) bool { - if len(stream.ready) > 0 { - stream.ready = stream.ready[1:] - } - if len(stream.ready) > 0 { - return true - } - - for { - stream.fill() - if len(stream.ready) > 0 { - return true - } - - if wait { - select { - case <-stream.ctx.Done(): - return false - case <-stream.closed: - return false - case <-stream.changed: - // query for records again +func newSyncLatestRecordStream( + ctx context.Context, + backend *Backend, +) storage.RecordStream { + var ready []*databroker.Record + return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{ + func(ctx context.Context, block bool) (*databroker.Record, error) { + backend.mu.RLock() + for _, co := range backend.lookup { + ready = append(ready, co.List()...) } - } else { - return false - } - } + backend.mu.RUnlock() + return nil, storage.ErrStreamDone + }, + func(ctx context.Context, block bool) (*databroker.Record, error) { + if len(ready) == 0 { + return nil, storage.ErrStreamDone + } + + record := ready[0] + ready = ready[1:] + return dup(record), nil + }, + }, nil) } -func (stream *recordStream) Record() *databroker.Record { - var r *databroker.Record - if len(stream.ready) > 0 { - r = stream.ready[0] - } - return r -} - -func (stream *recordStream) Err() error { - select { - case <-stream.ctx.Done(): - return stream.ctx.Err() - default: - } - - select { - case <-stream.backend.closed: - return storage.ErrStreamClosed - default: - } - - select { - case <-stream.closed: - return storage.ErrStreamClosed - default: - } - - return nil +func newSyncRecordStream( + ctx context.Context, + backend *Backend, + recordVersion uint64, +) storage.RecordStream { + changed := backend.onChange.Bind() + var ready []*databroker.Record + return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{ + func(ctx context.Context, block bool) (*databroker.Record, error) { + if len(ready) > 0 { + record := ready[0] + ready = ready[1:] + return record, nil + } + + for { + ready = backend.getSince(recordVersion) + + if len(ready) > 0 { + // records are sorted by version, + // so update the local version to the last record + recordVersion = ready[len(ready)-1].GetVersion() + record := ready[0] + ready = ready[1:] + return record, nil + } else if !block { + return nil, storage.ErrStreamDone + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-changed: + } + } + }, + }, func() { + backend.onChange.Unbind(changed) + }) } diff --git a/pkg/storage/redis/redis.go b/pkg/storage/redis/redis.go index 0bd139e7a..425163c86 100644 --- a/pkg/storage/redis/redis.go +++ b/pkg/storage/redis/redis.go @@ -10,7 +10,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/go-redis/redis/v8" - "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" "github.com/pomerium/pomerium/internal/log" @@ -136,54 +136,6 @@ func (backend *Backend) Get(ctx context.Context, recordType, id string) (_ *data return &record, nil } -// GetAll gets all the records from redis. -func (backend *Backend) GetAll(ctx context.Context) (records []*databroker.Record, versions *databroker.Versions, err error) { - ctx, span := trace.StartSpan(ctx, "databroker.redis.GetAll") - defer span.End() - defer func(start time.Time) { recordOperation(ctx, start, "getall", err) }(time.Now()) - - versions = new(databroker.Versions) - - versions.ServerVersion, err = backend.getOrCreateServerVersion(ctx) - if err != nil { - return nil, nil, err - } - - p := backend.client.Pipeline() - lastVersionCmd := p.Get(ctx, lastVersionKey) - resultsCmd := p.HVals(ctx, recordHashKey) - _, err = p.Exec(ctx) - if errors.Is(err, redis.Nil) { - // nil is returned when there are no records - return nil, versions, nil - } else if err != nil { - return nil, nil, fmt.Errorf("redis: error beginning GetAll pipeline: %w", err) - } - - versions.LatestRecordVersion, err = lastVersionCmd.Uint64() - if errors.Is(err, redis.Nil) { - } else if err != nil { - return nil, nil, fmt.Errorf("redis: error retrieving GetAll latest record version: %w", err) - } - - var results []string - results, err = resultsCmd.Result() - if err != nil { - return nil, nil, fmt.Errorf("redis: error retrieving GetAll records: %w", err) - } - - for _, result := range results { - var record databroker.Record - err := proto.Unmarshal([]byte(result), &record) - if err != nil { - log.Warn(ctx).Err(err).Msg("redis: invalid record detected") - continue - } - records = append(records, &record) - } - return records, versions, nil -} - // GetOptions gets the options for the given record type. func (backend *Backend) GetOptions(ctx context.Context, recordType string) (*databroker.Options, error) { raw, err := backend.client.HGet(ctx, optionsKey, recordType).Result() @@ -241,7 +193,7 @@ func (backend *Backend) Lease(ctx context.Context, leaseName, leaseID string, tt } // Put puts a record into redis. -func (backend *Backend) Put(ctx context.Context, record *databroker.Record) (serverVersion uint64, err error) { +func (backend *Backend) Put(ctx context.Context, records []*databroker.Record) (serverVersion uint64, err error) { ctx, span := trace.StartSpan(ctx, "databroker.redis.Put") defer span.End() defer func(start time.Time) { recordOperation(ctx, start, "put", err) }(time.Now()) @@ -251,14 +203,20 @@ func (backend *Backend) Put(ctx context.Context, record *databroker.Record) (ser return serverVersion, err } - err = backend.put(ctx, record) + err = backend.put(ctx, records) if err != nil { return serverVersion, err } - err = backend.enforceOptions(ctx, record.GetType()) - if err != nil { - return serverVersion, err + recordTypes := map[string]struct{}{} + for _, record := range records { + recordTypes[record.GetType()] = struct{}{} + } + for recordType := range recordTypes { + err = backend.enforceOptions(ctx, recordType) + if err != nil { + return serverVersion, err + } } return serverVersion, nil @@ -291,36 +249,50 @@ func (backend *Backend) SetOptions(ctx context.Context, recordType string, optio // Sync returns a record stream of any records changed after the specified recordVersion. func (backend *Backend) Sync(ctx context.Context, serverVersion, recordVersion uint64) (storage.RecordStream, error) { - return newRecordStream(ctx, backend, serverVersion, recordVersion), nil + return newSyncRecordStream(ctx, backend, serverVersion, recordVersion), nil } -func (backend *Backend) put(ctx context.Context, record *databroker.Record) error { +// SyncLatest returns a record stream of all the records. Some records may be returned twice if the are updated while the +// stream is streaming. +func (backend *Backend) SyncLatest(ctx context.Context) (serverVersion uint64, stream storage.RecordStream, err error) { + serverVersion, err = backend.getOrCreateServerVersion(ctx) + if err != nil { + return 0, nil, err + } + return serverVersion, newSyncLatestRecordStream(ctx, backend), nil +} + +func (backend *Backend) put(ctx context.Context, records []*databroker.Record) error { return backend.incrementVersion(ctx, func(tx *redis.Tx, version uint64) error { - record.ModifiedAt = timestamppb.Now() - record.Version = version + for i, record := range records { + record.ModifiedAt = timestamppb.Now() + record.Version = version + uint64(i) + } return nil }, func(p redis.Pipeliner, version uint64) error { - bs, err := proto.Marshal(record) - if err != nil { - return err - } + for i, record := range records { + bs, err := proto.Marshal(record) + if err != nil { + return err + } - key, field := getHashKey(record.GetType(), record.GetId()) - if record.DeletedAt != nil { - p.HDel(ctx, key, field) - } else { - p.HSet(ctx, key, field, bs) - p.ZAdd(ctx, getRecordTypeChangesKey(record.GetType()), &redis.Z{ - Score: float64(record.GetModifiedAt().GetSeconds()), - Member: record.GetId(), + key, field := getHashKey(record.GetType(), record.GetId()) + if record.DeletedAt != nil { + p.HDel(ctx, key, field) + } else { + p.HSet(ctx, key, field, bs) + p.ZAdd(ctx, getRecordTypeChangesKey(record.GetType()), &redis.Z{ + Score: float64(record.GetModifiedAt().GetSeconds()) + float64(i)/float64(len(records)), + Member: record.GetId(), + }) + } + p.ZAdd(ctx, changesSetKey, &redis.Z{ + Score: float64(version) + float64(i), + Member: bs, }) } - p.ZAdd(ctx, changesSetKey, &redis.Z{ - Score: float64(version), - Member: bs, - }) return nil }) } @@ -354,7 +326,7 @@ func (backend *Backend) enforceOptions(ctx context.Context, recordType string) e if err == nil { // mark the record as deleted and re-submit record.DeletedAt = timestamppb.Now() - err = backend.put(ctx, record) + err = backend.put(ctx, []*databroker.Record{record}) if err != nil { return err } diff --git a/pkg/storage/redis/redis_test.go b/pkg/storage/redis/redis_test.go index 9d0114804..f4f9acbd4 100644 --- a/pkg/storage/redis/redis_test.go +++ b/pkg/storage/redis/redis_test.go @@ -17,6 +17,7 @@ import ( "github.com/pomerium/pomerium/internal/testutil" "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/storage" ) func TestBackend(t *testing.T) { @@ -44,11 +45,11 @@ func TestBackend(t *testing.T) { }) t.Run("get record", func(t *testing.T) { data := new(anypb.Any) - sv, err := backend.Put(ctx, &databroker.Record{ + sv, err := backend.Put(ctx, []*databroker.Record{{ Type: "TYPE", Id: "abcd", Data: data, - }) + }}) assert.NoError(t, err) assert.Equal(t, serverVersion, sv) record, err := backend.Get(ctx, "TYPE", "abcd") @@ -63,31 +64,17 @@ func TestBackend(t *testing.T) { } }) t.Run("delete record", func(t *testing.T) { - sv, err := backend.Put(ctx, &databroker.Record{ + sv, err := backend.Put(ctx, []*databroker.Record{{ Type: "TYPE", Id: "abcd", DeletedAt: timestamppb.Now(), - }) + }}) assert.NoError(t, err) assert.Equal(t, serverVersion, sv) record, err := backend.Get(ctx, "TYPE", "abcd") assert.Error(t, err) assert.Nil(t, record) }) - t.Run("get all records", func(t *testing.T) { - for i := 0; i < 1000; i++ { - sv, err := backend.Put(ctx, &databroker.Record{ - Type: "TYPE", - Id: fmt.Sprint(i), - }) - assert.NoError(t, err) - assert.Equal(t, serverVersion, sv) - } - records, versions, err := backend.GetAll(ctx) - assert.NoError(t, err) - assert.Len(t, records, 1000) - assert.Equal(t, uint64(1002), versions.LatestRecordVersion) - }) return nil } @@ -160,10 +147,10 @@ func TestChangeSignal(t *testing.T) { // put a new value to trigger a change for { - _, err = backend.Put(ctx, &databroker.Record{ + _, err = backend.Put(ctx, []*databroker.Record{{ Type: "TYPE", Id: "ID", - }) + }}) if err != nil { return err } @@ -197,10 +184,10 @@ func TestExpiry(t *testing.T) { require.NoError(t, err) for i := 0; i < 1000; i++ { - _, err := backend.Put(ctx, &databroker.Record{ + _, err := backend.Put(ctx, []*databroker.Record{{ Type: "TYPE", Id: fmt.Sprint(i), - }) + }}) assert.NoError(t, err) } stream, err := backend.Sync(ctx, serverVersion, 0) @@ -232,7 +219,9 @@ func TestCapacity(t *testing.T) { t.Skip("Github action can not run docker on MacOS") } - ctx := context.Background() + ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10) + defer clearTimeout() + require.NoError(t, testutil.WithTestRedis(false, func(rawURL string) error { backend, err := New(rawURL, WithExpiry(0)) require.NoError(t, err) @@ -244,14 +233,18 @@ func TestCapacity(t *testing.T) { require.NoError(t, err) for i := 0; i < 10; i++ { - _, err = backend.Put(ctx, &databroker.Record{ + _, err = backend.Put(ctx, []*databroker.Record{{ Type: "EXAMPLE", Id: fmt.Sprint(i), - }) + }}) require.NoError(t, err) } - records, _, err := backend.GetAll(ctx) + _, stream, err := backend.SyncLatest(ctx) + require.NoError(t, err) + defer stream.Close() + + records, err := storage.RecordStreamToList(stream) require.NoError(t, err) assert.Len(t, records, 3) diff --git a/pkg/storage/redis/stream.go b/pkg/storage/redis/stream.go index 3f766af2a..69ca9429e 100644 --- a/pkg/storage/redis/stream.go +++ b/pkg/storage/redis/stream.go @@ -2,127 +2,162 @@ package redis import ( "context" + "errors" "fmt" - "sync" "time" "github.com/go-redis/redis/v8" - "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/proto" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/storage" ) -type recordStream struct { - ctx context.Context - backend *Backend +func newSyncRecordStream( + ctx context.Context, + backend *Backend, + serverVersion uint64, + recordVersion uint64, +) storage.RecordStream { + changed := backend.onChange.Bind() + return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{ + // 1. stream all record changes + func(ctx context.Context, block bool) (*databroker.Record, error) { + ticker := time.NewTicker(watchPollInterval) + defer ticker.Stop() - changed chan context.Context - serverVersion uint64 - recordVersion uint64 - record *databroker.Record - err error + for { + currentServerVersion, err := backend.getOrCreateServerVersion(ctx) + if err != nil { + return nil, err + } + if serverVersion != currentServerVersion { + return nil, storage.ErrInvalidServerVersion + } - closeOnce sync.Once - closed chan struct{} -} + record, err := nextChangedRecord(ctx, backend, &recordVersion) + if err == nil { + return record, nil + } else if !errors.Is(err, storage.ErrStreamDone) { + return nil, err + } -func newRecordStream(ctx context.Context, backend *Backend, serverVersion, recordVersion uint64) *recordStream { - stream := &recordStream{ - ctx: ctx, - backend: backend, + if !block { + return nil, storage.ErrStreamDone + } - changed: backend.onChange.Bind(), - serverVersion: serverVersion, - recordVersion: recordVersion, - - closed: make(chan struct{}), - } - // if the backend is closed, close the stream - go func() { - select { - case <-stream.closed: - case <-backend.closed: - _ = stream.Close() - } - }() - return stream -} - -func (stream *recordStream) Close() error { - stream.closeOnce.Do(func() { - stream.backend.onChange.Unbind(stream.changed) - close(stream.closed) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + case <-changed: + } + } + }, + }, func() { + backend.onChange.Unbind(changed) }) - return nil } -func (stream *recordStream) Next(block bool) bool { - if stream.err != nil { - return false +func newSyncLatestRecordStream( + ctx context.Context, + backend *Backend, +) storage.RecordStream { + var recordVersion, cursor uint64 + scannedOnce := false + var scannedRecords []*databroker.Record + return storage.NewRecordStream(ctx, backend.closed, []storage.RecordStreamGenerator{ + // 1. get the current record version + func(ctx context.Context, block bool) (*databroker.Record, error) { + var err error + recordVersion, err = backend.client.Get(ctx, lastVersionKey).Uint64() + if errors.Is(err, redis.Nil) { + // this happens if there are no records + } else if err != nil { + return nil, err + } + return nil, storage.ErrStreamDone + }, + // 2. stream all the records + func(ctx context.Context, block bool) (*databroker.Record, error) { + for { + if len(scannedRecords) > 0 { + record := scannedRecords[0] + scannedRecords = scannedRecords[1:] + return record, nil + } + + // the cursor is reset to 0 after iteration is complete + if scannedOnce && cursor == 0 { + return nil, storage.ErrStreamDone + } + + var err error + scannedRecords, err = nextScannedRecords(ctx, backend, &cursor) + if err != nil { + return nil, err + } + + scannedOnce = true + } + }, + // 3. stream any records which have been updated in the interim + func(ctx context.Context, block bool) (*databroker.Record, error) { + return nextChangedRecord(ctx, backend, &recordVersion) + }, + }, nil) +} + +func nextScannedRecords(ctx context.Context, backend *Backend, cursor *uint64) ([]*databroker.Record, error) { + var values []string + var err error + values, *cursor, err = backend.client.HScan(ctx, recordHashKey, *cursor, "", 0).Result() + if errors.Is(err, redis.Nil) { + return nil, storage.ErrStreamDone + } else if err != nil { + return nil, err + } else if len(values) == 0 { + return nil, storage.ErrStreamDone } - ticker := time.NewTicker(watchPollInterval) - defer ticker.Stop() - - changeCtx := context.Background() - for { - serverVersion, err := stream.backend.getOrCreateServerVersion(stream.ctx) + var records []*databroker.Record + for i := 1; i < len(values); i += 2 { + var record databroker.Record + err := proto.Unmarshal([]byte(values[i]), &record) if err != nil { - stream.err = err - return false - } - if stream.serverVersion != serverVersion { - stream.err = storage.ErrInvalidServerVersion - return false + log.Warn(ctx).Err(err).Msg("redis: invalid record detected") + continue } + records = append(records, &record) + } + return records, nil +} - cmd := stream.backend.client.ZRangeByScore(stream.ctx, changesSetKey, &redis.ZRangeBy{ - Min: fmt.Sprintf("(%d", stream.recordVersion), +func nextChangedRecord(ctx context.Context, backend *Backend, recordVersion *uint64) (*databroker.Record, error) { + for { + cmd := backend.client.ZRangeByScore(ctx, changesSetKey, &redis.ZRangeBy{ + Min: fmt.Sprintf("(%d", *recordVersion), Max: "+inf", Offset: 0, Count: 1, }) results, err := cmd.Result() - if err != nil { - stream.err = err - return false + if errors.Is(err, redis.Nil) { + return nil, storage.ErrStreamDone + } else if err != nil { + return nil, err + } else if len(results) == 0 { + return nil, storage.ErrStreamDone } - if len(results) > 0 { - result := results[0] - var record databroker.Record - err = proto.Unmarshal([]byte(result), &record) - if err != nil { - log.Warn(changeCtx).Err(err).Msg("redis: invalid record detected") - } else { - stream.record = &record - } - stream.recordVersion++ - return true - } - - if block { - select { - case <-stream.ctx.Done(): - stream.err = stream.ctx.Err() - return false - case <-stream.closed: - return false - case <-ticker.C: // check again - case changeCtx = <-stream.changed: // check again - } - } else { - return false + result := results[0] + var record databroker.Record + err = proto.Unmarshal([]byte(result), &record) + *recordVersion++ + if err == nil { + return &record, nil } + log.Warn(ctx).Err(err).Msg("redis: invalid record detected") } } - -func (stream *recordStream) Record() *databroker.Record { - return stream.record -} - -func (stream *recordStream) Err() error { - return stream.err -} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index dbd1d7e0c..2ed3e353c 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -19,43 +19,28 @@ import ( // Errors var ( ErrNotFound = errors.New("record not found") - ErrStreamClosed = errors.New("record stream closed") + ErrStreamDone = errors.New("record stream done") ErrInvalidServerVersion = status.Error(codes.Aborted, "invalid server version") ) -// A RecordStream is a stream of records. -type RecordStream interface { - // Close closes the record stream and releases any underlying resources. - Close() error - // Next is called to retrieve the next record. If one is available it will - // be returned immediately. If none is available and block is true, the method - // will block until one is available or an error occurs. The error should be - // checked with a call to `.Err()`. - Next(block bool) bool - // Record returns the current record. - Record() *databroker.Record - // Err returns any error that occurred while streaming. - Err() error -} - // Backend is the interface required for a storage backend. type Backend interface { // Close closes the backend. Close() error // Get is used to retrieve a record. Get(ctx context.Context, recordType, id string) (*databroker.Record, error) - // GetAll gets all the records. - GetAll(ctx context.Context) (records []*databroker.Record, version *databroker.Versions, err error) // GetOptions gets the options for a type. GetOptions(ctx context.Context, recordType string) (*databroker.Options, error) // Lease acquires a lease, or renews an existing one. If the lease is acquired true is returned. Lease(ctx context.Context, leaseName, leaseID string, ttl time.Duration) (bool, error) - // Put is used to insert or update a record. - Put(ctx context.Context, record *databroker.Record) (serverVersion uint64, err error) + // Put is used to insert or update records. + Put(ctx context.Context, records []*databroker.Record) (serverVersion uint64, err error) // SetOptions sets the options for a type. SetOptions(ctx context.Context, recordType string, options *databroker.Options) error // Sync syncs record changes after the specified version. Sync(ctx context.Context, serverVersion, recordVersion uint64) (RecordStream, error) + // SyncLatest syncs all the records. + SyncLatest(ctx context.Context) (serverVersion uint64, stream RecordStream, err error) } // MatchAny searches any data with a query. diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 025d1d056..832bbd017 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -13,27 +13,22 @@ import ( type mockBackend struct { Backend - put func(ctx context.Context, record *databroker.Record) (uint64, error) - get func(ctx context.Context, recordType, id string) (*databroker.Record, error) - getAll func(ctx context.Context) ([]*databroker.Record, *databroker.Versions, error) + put func(ctx context.Context, records []*databroker.Record) (uint64, error) + get func(ctx context.Context, recordType, id string) (*databroker.Record, error) } func (m *mockBackend) Close() error { return nil } -func (m *mockBackend) Put(ctx context.Context, record *databroker.Record) (uint64, error) { - return m.put(ctx, record) +func (m *mockBackend) Put(ctx context.Context, records []*databroker.Record) (uint64, error) { + return m.put(ctx, records) } func (m *mockBackend) Get(ctx context.Context, recordType, id string) (*databroker.Record, error) { return m.get(ctx, recordType, id) } -func (m *mockBackend) GetAll(ctx context.Context) ([]*databroker.Record, *databroker.Versions, error) { - return m.getAll(ctx) -} - func TestMatchAny(t *testing.T) { u := &user.User{Id: "id", Name: "name", Email: "email"} data := protoutil.NewAny(u) diff --git a/pkg/storage/stream.go b/pkg/storage/stream.go new file mode 100644 index 000000000..732cf451d --- /dev/null +++ b/pkg/storage/stream.go @@ -0,0 +1,120 @@ +package storage + +import ( + "context" + "errors" + + "github.com/pomerium/pomerium/pkg/grpc/databroker" +) + +// A RecordStream is a stream of records. +type RecordStream interface { + // Close closes the record stream and releases any underlying resources. + Close() error + // Next is called to retrieve the next record. If one is available it will + // be returned immediately. If none is available and block is true, the method + // will block until one is available or an error occurs. The error should be + // checked with a call to `.Err()`. + Next(block bool) bool + // Record returns the current record. + Record() *databroker.Record + // Err returns any error that occurred while streaming. + Err() error +} + +// A RecordStreamGenerator generates records for a record stream. +type RecordStreamGenerator = func(ctx context.Context, block bool) (*databroker.Record, error) + +type recordStream struct { + generators []RecordStreamGenerator + + record *databroker.Record + err error + + closeCtx context.Context + close context.CancelFunc + onClose func() +} + +// NewRecordStream creates a new RecordStream from a list of generators and an onClose function. +func NewRecordStream( + ctx context.Context, + backendClosed chan struct{}, + generators []RecordStreamGenerator, + onClose func(), +) RecordStream { + stream := &recordStream{ + generators: generators, + onClose: onClose, + } + stream.closeCtx, stream.close = context.WithCancel(ctx) + if backendClosed != nil { + go func() { + defer stream.close() + select { + case <-backendClosed: + case <-stream.closeCtx.Done(): + } + }() + } + + return stream +} + +func (stream *recordStream) Close() error { + stream.close() + if stream.onClose != nil { + stream.onClose() + } + return nil +} + +func (stream *recordStream) Next(block bool) bool { + for { + if len(stream.generators) == 0 || stream.err != nil { + return false + } + + stream.record, stream.err = stream.generators[0](stream.closeCtx, block) + if errors.Is(stream.err, ErrStreamDone) { + stream.err = nil + stream.generators = stream.generators[1:] + continue + } + break + } + + return stream.err == nil +} + +func (stream *recordStream) Record() *databroker.Record { + return stream.record +} + +func (stream *recordStream) Err() error { + return stream.err +} + +// RecordStreamToList converts a record stream to a list. +func RecordStreamToList(recordStream RecordStream) ([]*databroker.Record, error) { + var all []*databroker.Record + for recordStream.Next(false) { + all = append(all, recordStream.Record()) + } + return all, recordStream.Err() +} + +// RecordListToStream converts a record list to a stream. +func RecordListToStream(ctx context.Context, records []*databroker.Record) RecordStream { + return NewRecordStream(ctx, nil, []RecordStreamGenerator{ + func(ctx context.Context, block bool) (*databroker.Record, error) { + if len(records) == 0 { + return nil, ErrStreamDone + } + + record := records[0] + records = records[1:] + return record, nil + }, + }, nil) +} diff --git a/pkg/webauthnutil/credential_storage_test.go b/pkg/webauthnutil/credential_storage_test.go index 2d3c49df8..426b68036 100644 --- a/pkg/webauthnutil/credential_storage_test.go +++ b/pkg/webauthnutil/credential_storage_test.go @@ -41,9 +41,11 @@ func TestCredentialStorage(t *testing.T) { }, nil }, put: func(ctx context.Context, in *databroker.PutRequest, opts ...grpc.CallOption) (*databroker.PutResponse, error) { - m[in.GetRecord().GetType()+"/"+in.GetRecord().GetId()] = in.GetRecord() + for _, record := range in.GetRecords() { + m[record.GetType()+"/"+record.GetId()] = record + } return &databroker.PutResponse{ - Record: in.GetRecord(), + Records: in.GetRecords(), }, nil }, }