diff --git a/authorize/run.go b/authorize/run.go index deb8b8de3..32fc0c07a 100644 --- a/authorize/run.go +++ b/authorize/run.go @@ -91,7 +91,7 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string) error ctx, span := trace.StartSpan(ctx, "authorize.dataBrokerClient.GetAll") backoff := backoff.NewExponentialBackOff() for { - res, err := databroker.GetAllPages(ctx, a.state.Load().dataBrokerClient, &databroker.GetAllRequest{ + res, err := databroker.InitialSync(ctx, a.state.Load().dataBrokerClient, &databroker.SyncRequest{ Type: typeURL, }) if err != nil { @@ -105,10 +105,10 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string) error } serverVersion = res.GetServerVersion() - recordVersion = res.GetRecordVersion() for _, record := range res.GetRecords() { a.updateRecord(record) + recordVersion = record.GetVersion() } break diff --git a/go.sum b/go.sum index 17029b111..4d29ee947 100644 --- a/go.sum +++ b/go.sum @@ -249,6 +249,7 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-jsonnet v0.17.0 h1:/9NIEfhK1NQRKl3sP2536b2+x5HnZMdql7x3yK/l8JY= github.com/google/go-jsonnet v0.17.0/go.mod h1:sOcuej3UW1vpPTZOr8L7RQimqai1a57bt5j22LzGZCw= @@ -984,6 +985,7 @@ gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/databroker/server.go b/internal/databroker/server.go index 6f7f2325f..7d9f9f3e3 100644 --- a/internal/databroker/server.go +++ b/internal/databroker/server.go @@ -309,9 +309,6 @@ func (srv *Server) doSync(ctx context.Context, if len(updated) == 0 { return nil } - sort.Slice(updated, func(i, j int) bool { - return updated[i].Version < updated[j].Version - }) *recordVersion = updated[len(updated)-1].Version for i := 0; i < len(updated); i += syncBatchSize { j := i + syncBatchSize @@ -346,6 +343,7 @@ func (srv *Server) Sync(req *databroker.SyncRequest, stream databroker.DataBroke recordVersion := req.GetRecordVersion() // reset record version if the server versions don't match if req.GetServerVersion() != serverVersion { + serverVersion = req.GetServerVersion() recordVersion = "" // send the new server version to the client err := stream.Send(&databroker.SyncResponse{ @@ -357,13 +355,23 @@ func (srv *Server) Sync(req *databroker.SyncRequest, stream databroker.DataBroke } ctx := stream.Context() - ch := db.Watch(ctx) + ctx, cancel := context.WithCancel(ctx) + defer cancel() - // Do first sync, so we won't missed anything. + var ch <-chan struct{} + if !req.GetNoWait() { + ch = db.Watch(ctx) + } + + // Do first sync, so we won't miss anything. if err := srv.doSync(ctx, serverVersion, &recordVersion, db, stream); err != nil { return err } + if req.GetNoWait() { + return nil + } + for range ch { if err := srv.doSync(ctx, serverVersion, &recordVersion, db, stream); err != nil { return err diff --git a/internal/encoding/base64.go b/internal/encoding/base64.go new file mode 100644 index 000000000..71a9c0711 --- /dev/null +++ b/internal/encoding/base64.go @@ -0,0 +1,23 @@ +package encoding + +import ( + "encoding/base64" + "encoding/json" + "strings" +) + +// DecodeBase64OrJSON decodes a JSON string that can optionally be base64 encoded. +func DecodeBase64OrJSON(in string, out interface{}) error { + in = strings.TrimSpace(in) + + // the data can be base64 encoded + if !json.Valid([]byte(in)) { + bs, err := base64.StdEncoding.DecodeString(in) + if err != nil { + return err + } + in = string(bs) + } + + return json.Unmarshal([]byte(in), out) +} diff --git a/internal/encoding/base64_test.go b/internal/encoding/base64_test.go new file mode 100644 index 000000000..77e2054b9 --- /dev/null +++ b/internal/encoding/base64_test.go @@ -0,0 +1,20 @@ +package encoding + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDecodeBase64OrJSON(t *testing.T) { + var obj struct { + X string `json:"x"` + } + err := DecodeBase64OrJSON(` {"x": "y"} `, &obj) + assert.NoError(t, err) + assert.Equal(t, "y", obj.X) + + err = DecodeBase64OrJSON(` eyJ4IjoieiJ9Cg== `, &obj) + assert.NoError(t, err) + assert.Equal(t, "z", obj.X) +} diff --git a/internal/identity/manager/manager.go b/internal/identity/manager/manager.go index 2c9dadf3d..7d5d3cb2a 100644 --- a/internal/identity/manager/manager.go +++ b/internal/identity/manager/manager.go @@ -13,6 +13,7 @@ import ( "github.com/rs/zerolog" "golang.org/x/oauth2" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" "github.com/pomerium/pomerium/internal/directory" "github.com/pomerium/pomerium/internal/identity/identity" @@ -23,6 +24,10 @@ import ( "github.com/pomerium/pomerium/pkg/grpc/user" ) +const ( + dataBrokerParallelism = 10 +) + // Authenticator is an identity.Provider with only the methods needed by the manager. type Authenticator interface { Refresh(context.Context, *oauth2.Token, identity.State) (*oauth2.Token, error) @@ -61,6 +66,8 @@ type Manager struct { directoryGroupsRecordVersion string directoryNextRefresh time.Time + + dataBrokerSemaphore *semaphore.Weighted } // New creates a new identity manager. @@ -79,6 +86,8 @@ func New( BTree: btree.New(8), }, userScheduler: scheduler.New(), + + dataBrokerSemaphore: semaphore.NewWeighted(dataBrokerParallelism), } mgr.UpdateConfig(options...) return mgr @@ -228,6 +237,8 @@ func (mgr *Manager) refreshDirectoryUserGroups(ctx context.Context) { } func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*directory.Group) { + eg, ctx := errgroup.WithContext(ctx) + lookup := map[string]*directory.Group{} for _, dg := range directoryGroups { lookup[dg.GetId()] = dg @@ -236,44 +247,62 @@ func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*director for groupID, newDG := range lookup { curDG, ok := mgr.directoryGroups[groupID] if !ok || !proto.Equal(newDG, curDG) { + id := newDG.GetId() any, err := ptypes.MarshalAny(newDG) if err != nil { mgr.log.Warn().Err(err).Msg("failed to marshal directory group") return } - _, err = mgr.cfg.Load().dataBrokerClient.Set(ctx, &databroker.SetRequest{ - Type: any.GetTypeUrl(), - Id: newDG.GetId(), - Data: any, + eg.Go(func() error { + if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil { + return err + } + defer mgr.dataBrokerSemaphore.Release(1) + + _, err = mgr.cfg.Load().dataBrokerClient.Set(ctx, &databroker.SetRequest{ + Type: any.GetTypeUrl(), + Id: id, + Data: any, + }) + if err != nil { + return fmt.Errorf("failed to update directory group: %s", id) + } + return nil }) - if err != nil { - mgr.log.Warn().Err(err).Msg("failed to update directory group") - return - } } } for groupID, curDG := range mgr.directoryGroups { _, ok := lookup[groupID] if !ok { + id := curDG.GetId() any, err := ptypes.MarshalAny(curDG) if err != nil { mgr.log.Warn().Err(err).Msg("failed to marshal directory group") return } - _, err = mgr.cfg.Load().dataBrokerClient.Delete(ctx, &databroker.DeleteRequest{ - Type: any.GetTypeUrl(), - Id: curDG.GetId(), + eg.Go(func() error { + if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil { + return err + } + defer mgr.dataBrokerSemaphore.Release(1) + + _, err = mgr.cfg.Load().dataBrokerClient.Delete(ctx, &databroker.DeleteRequest{ + Type: any.GetTypeUrl(), + Id: id, + }) + if err != nil { + return fmt.Errorf("failed to delete directory group: %s", id) + } + return nil }) - if err != nil { - mgr.log.Warn().Err(err).Msg("failed to delete directory group") - return - } } } } func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.User) { + eg, ctx := errgroup.WithContext(ctx) + lookup := map[string]*directory.User{} for _, du := range directoryUsers { lookup[du.GetId()] = du @@ -282,41 +311,61 @@ func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory. for userID, newDU := range lookup { curDU, ok := mgr.directoryUsers[userID] if !ok || !proto.Equal(newDU, curDU) { + id := newDU.GetId() any, err := ptypes.MarshalAny(newDU) if err != nil { mgr.log.Warn().Err(err).Msg("failed to marshal directory user") return } - _, err = mgr.cfg.Load().dataBrokerClient.Set(ctx, &databroker.SetRequest{ - Type: any.GetTypeUrl(), - Id: newDU.GetId(), - Data: any, + eg.Go(func() error { + if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil { + return err + } + defer mgr.dataBrokerSemaphore.Release(1) + + client := mgr.cfg.Load().dataBrokerClient + if _, err := client.Set(ctx, &databroker.SetRequest{ + Type: any.GetTypeUrl(), + Id: id, + Data: any, + }); err != nil { + return fmt.Errorf("failed to update directory user: %s", id) + } + return nil }) - if err != nil { - mgr.log.Warn().Err(err).Msg("failed to update directory user") - return - } } } for userID, curDU := range mgr.directoryUsers { _, ok := lookup[userID] if !ok { + id := curDU.GetId() any, err := ptypes.MarshalAny(curDU) if err != nil { mgr.log.Warn().Err(err).Msg("failed to marshal directory user") return } - _, err = mgr.cfg.Load().dataBrokerClient.Delete(ctx, &databroker.DeleteRequest{ - Type: any.GetTypeUrl(), - Id: curDU.GetId(), + eg.Go(func() error { + if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil { + return err + } + defer mgr.dataBrokerSemaphore.Release(1) + + client := mgr.cfg.Load().dataBrokerClient + if _, err := client.Delete(ctx, &databroker.DeleteRequest{ + Type: any.GetTypeUrl(), + Id: id, + }); err != nil { + return fmt.Errorf("failed to delete directory user: %s", id) + } + return nil }) - if err != nil { - mgr.log.Warn().Err(err).Msg("failed to delete directory user") - return - } } } + + if err := eg.Wait(); err != nil { + mgr.log.Warn().Err(err).Msg("manager: failed to merge users") + } } func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string) { @@ -529,7 +578,7 @@ func (mgr *Manager) initDirectoryUsers(ctx context.Context) error { return err } - res, err := databroker.GetAllPages(ctx, mgr.cfg.Load().dataBrokerClient, &databroker.GetAllRequest{ + res, err := databroker.InitialSync(ctx, mgr.cfg.Load().dataBrokerClient, &databroker.SyncRequest{ Type: any.GetTypeUrl(), }) if err != nil { @@ -545,10 +594,12 @@ func (mgr *Manager) initDirectoryUsers(ctx context.Context) error { } mgr.directoryUsers[pbDirectoryUser.GetId()] = &pbDirectoryUser + mgr.directoryUsersRecordVersion = record.GetVersion() } - mgr.directoryUsersRecordVersion = res.GetRecordVersion() mgr.directoryUsersServerVersion = res.GetServerVersion() + mgr.log.Info().Int("count", len(mgr.directoryUsers)).Msg("initialized directory users") + return nil } @@ -598,7 +649,7 @@ func (mgr *Manager) initDirectoryGroups(ctx context.Context) error { return err } - res, err := databroker.GetAllPages(ctx, mgr.cfg.Load().dataBrokerClient, &databroker.GetAllRequest{ + res, err := databroker.InitialSync(ctx, mgr.cfg.Load().dataBrokerClient, &databroker.SyncRequest{ Type: any.GetTypeUrl(), }) if err != nil { @@ -614,10 +665,12 @@ func (mgr *Manager) initDirectoryGroups(ctx context.Context) error { } mgr.directoryGroups[pbDirectoryGroup.GetId()] = &pbDirectoryGroup + mgr.directoryGroupsRecordVersion = record.GetVersion() } - mgr.directoryGroupsRecordVersion = res.GetRecordVersion() mgr.directoryGroupsServerVersion = res.GetServerVersion() + mgr.log.Info().Int("count", len(mgr.directoryGroups)).Msg("initialized directory groups") + return nil } diff --git a/pkg/grpc/databroker/databroker.go b/pkg/grpc/databroker/databroker.go index 7ff49364f..b579bc03f 100644 --- a/pkg/grpc/databroker/databroker.go +++ b/pkg/grpc/databroker/databroker.go @@ -3,7 +3,10 @@ package databroker import ( "context" + "io" "strings" + + "google.golang.org/protobuf/proto" ) // GetUserID gets the databroker user id from a provider user id. @@ -34,27 +37,32 @@ func ApplyOffsetAndLimit(all []*Record, offset, limit int) (records []*Record, t return records, len(all) } -// GetAllPages calls GetAll for all pages of data. -func GetAllPages(ctx context.Context, client DataBrokerServiceClient, in *GetAllRequest) (*GetAllResponse, error) { - var res GetAllResponse - var pageToken string +// InitialSync performs a sync with no_wait set to true and then returns all the results. +func InitialSync(ctx context.Context, client DataBrokerServiceClient, in *SyncRequest) (*SyncResponse, error) { + dup := new(SyncRequest) + proto.Merge(dup, in) + dup.NoWait = true + + stream, err := client.Sync(ctx, dup) + if err != nil { + return nil, err + } + + finalRes := &SyncResponse{} + +loop: for { - nxt, err := client.GetAll(ctx, &GetAllRequest{ - Type: in.GetType(), - PageToken: pageToken, - }) - if err != nil { + res, err := stream.Recv() + switch { + case err == io.EOF: + break loop + case err != nil: return nil, err } - res.ServerVersion = nxt.ServerVersion - res.RecordVersion = nxt.RecordVersion - res.Records = append(res.Records, nxt.Records...) - - if nxt.NextPageToken == "" { - break - } - pageToken = nxt.NextPageToken + finalRes.ServerVersion = res.GetServerVersion() + finalRes.Records = append(finalRes.Records, res.GetRecords()...) } - return &res, nil + + return finalRes, nil } diff --git a/pkg/grpc/databroker/databroker.pb.go b/pkg/grpc/databroker/databroker.pb.go index 8036beb5b..3d0fdcbc7 100644 --- a/pkg/grpc/databroker/databroker.pb.go +++ b/pkg/grpc/databroker/databroker.pb.go @@ -709,6 +709,7 @@ type SyncRequest struct { ServerVersion string `protobuf:"bytes,1,opt,name=server_version,json=serverVersion,proto3" json:"server_version,omitempty"` RecordVersion string `protobuf:"bytes,2,opt,name=record_version,json=recordVersion,proto3" json:"record_version,omitempty"` Type string `protobuf:"bytes,3,opt,name=type,proto3" json:"type,omitempty"` + NoWait bool `protobuf:"varint,4,opt,name=no_wait,json=noWait,proto3" json:"no_wait,omitempty"` } func (x *SyncRequest) Reset() { @@ -764,6 +765,13 @@ func (x *SyncRequest) GetType() string { return "" } +func (x *SyncRequest) GetNoWait() bool { + if x != nil { + return x.NoWait + } + return false +} + type SyncResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -947,59 +955,61 @@ var file_databroker_proto_rawDesc = []byte{ 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x73, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x6f, 0x0a, 0x0b, 0x53, - 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, - 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x76, 0x65, 0x72, 0x73, - 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x72, 0x65, 0x63, 0x6f, 0x72, - 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x63, 0x0a, 0x0c, - 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, - 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, - 0x69, 0x6f, 0x6e, 0x12, 0x2c, 0x0a, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x02, - 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, - 0x73, 0x22, 0x28, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x79, 0x70, 0x65, 0x73, 0x18, 0x01, - 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x74, 0x79, 0x70, 0x65, 0x73, 0x32, 0x83, 0x04, 0x0a, 0x11, - 0x44, 0x61, 0x74, 0x61, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x12, 0x3b, 0x0a, 0x06, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x19, 0x2e, 0x64, 0x61, - 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x36, - 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, - 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3f, 0x0a, 0x06, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, - 0x12, 0x19, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, - 0x74, 0x41, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x64, 0x61, - 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x72, 0x79, - 0x12, 0x18, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x51, 0x75, - 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x64, 0x61, 0x74, - 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x03, 0x53, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x64, - 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x74, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x2e, 0x53, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, - 0x04, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, - 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x40, 0x0a, 0x08, 0x47, 0x65, - 0x74, 0x54, 0x79, 0x70, 0x65, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1c, - 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x54, - 0x79, 0x70, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x43, 0x0a, 0x09, - 0x53, 0x79, 0x6e, 0x63, 0x54, 0x79, 0x70, 0x65, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x1a, 0x1c, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, - 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, - 0x01, 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, - 0x6d, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, - 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x88, 0x01, 0x0a, 0x0b, + 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x73, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x72, 0x65, 0x63, 0x6f, + 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x17, 0x0a, + 0x07, 0x6e, 0x6f, 0x5f, 0x77, 0x61, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, + 0x6e, 0x6f, 0x57, 0x61, 0x69, 0x74, 0x22, 0x63, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, + 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2c, 0x0a, + 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, + 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, + 0x72, 0x64, 0x52, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x22, 0x28, 0x0a, 0x10, 0x47, + 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x14, 0x0a, 0x05, 0x74, 0x79, 0x70, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, + 0x74, 0x79, 0x70, 0x65, 0x73, 0x32, 0x83, 0x04, 0x0a, 0x11, 0x44, 0x61, 0x74, 0x61, 0x42, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x3b, 0x0a, 0x06, 0x44, + 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x19, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x36, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, + 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x3f, 0x0a, 0x06, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x12, 0x19, 0x2e, 0x64, 0x61, 0x74, + 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x3c, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x18, 0x2e, 0x64, 0x61, 0x74, + 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, + 0x72, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x36, 0x0a, 0x03, 0x53, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, + 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, + 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x74, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x04, 0x53, 0x79, 0x6e, 0x63, 0x12, + 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, + 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, + 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x30, 0x01, 0x12, 0x40, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x73, + 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1c, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, + 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x43, 0x0a, 0x09, 0x53, 0x79, 0x6e, 0x63, 0x54, 0x79, + 0x70, 0x65, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1c, 0x2e, 0x64, 0x61, + 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x32, 0x5a, 0x30, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, + 0x75, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x70, 0x6b, 0x67, 0x2f, + 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/grpc/databroker/databroker.proto b/pkg/grpc/databroker/databroker.proto index b46f4afac..538b029bf 100644 --- a/pkg/grpc/databroker/databroker.proto +++ b/pkg/grpc/databroker/databroker.proto @@ -7,7 +7,9 @@ import "google/protobuf/any.proto"; import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; -message ServerVersion { string version = 1; } +message ServerVersion { + string version = 1; +} message Record { string version = 1; @@ -28,7 +30,9 @@ message GetRequest { string type = 1; string id = 2; } -message GetResponse { Record record = 1; } +message GetResponse { + Record record = 1; +} message GetAllRequest { string type = 1; @@ -66,13 +70,16 @@ message SyncRequest { string server_version = 1; string record_version = 2; string type = 3; + bool no_wait = 4; } message SyncResponse { string server_version = 1; repeated Record records = 2; } -message GetTypesResponse { repeated string types = 1; } +message GetTypesResponse { + repeated string types = 1; +} service DataBrokerService { rpc Delete(DeleteRequest) returns (google.protobuf.Empty); diff --git a/pkg/grpc/databroker/databroker_test.go b/pkg/grpc/databroker/databroker_test.go index a9def882f..ecacda32a 100644 --- a/pkg/grpc/databroker/databroker_test.go +++ b/pkg/grpc/databroker/databroker_test.go @@ -3,16 +3,11 @@ package databroker import ( "context" "net" - "strconv" "testing" "time" "github.com/stretchr/testify/assert" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/anypb" - "google.golang.org/protobuf/types/known/wrapperspb" ) func TestApplyOffsetAndLimit(t *testing.T) { @@ -54,103 +49,61 @@ func TestApplyOffsetAndLimit(t *testing.T) { } } +func TestInitialSync(t *testing.T) { + ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10) + defer clearTimeout() + + li, err := net.Listen("tcp", "127.0.0.1:0") + if !assert.NoError(t, err) { + return + } + defer li.Close() + + r1 := new(Record) + r2 := new(Record) + r3 := new(Record) + + m := &mockServer{ + sync: func(req *SyncRequest, stream DataBrokerService_SyncServer) error { + assert.Equal(t, true, req.GetNoWait()) + stream.Send(&SyncResponse{ + ServerVersion: "a", + Records: []*Record{r1, r2}, + }) + stream.Send(&SyncResponse{ + ServerVersion: "b", + Records: []*Record{r3}, + }) + return nil + }, + } + + srv := grpc.NewServer() + RegisterDataBrokerServiceServer(srv, m) + go srv.Serve(li) + + cc, err := grpc.Dial(li.Addr().String(), grpc.WithInsecure()) + if !assert.NoError(t, err) { + return + } + defer cc.Close() + + c := NewDataBrokerServiceClient(cc) + + res, err := InitialSync(ctx, c, &SyncRequest{ + Type: "TEST", + }) + assert.NoError(t, err) + assert.Equal(t, "b", res.GetServerVersion()) + assert.Equal(t, []*Record{r1, r2, r3}, res.GetRecords()) +} + type mockServer struct { DataBrokerServiceServer - getAll func(context.Context, *GetAllRequest) (*GetAllResponse, error) + sync func(*SyncRequest, DataBrokerService_SyncServer) error } -func (m *mockServer) GetAll(ctx context.Context, req *GetAllRequest) (*GetAllResponse, error) { - return m.getAll(ctx, req) -} - -func TestGetAllPages(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - - t.Run("resource exhausted", func(t *testing.T) { - li, err := net.Listen("tcp", "127.0.0.1:0") - if !assert.NoError(t, err) { - return - } - defer li.Close() - - m := &mockServer{ - getAll: func(ctx context.Context, req *GetAllRequest) (*GetAllResponse, error) { - any, _ := anypb.New(wrapperspb.String("TEST")) - var records []*Record - for i := 0; i < 1000000; i++ { - records = append(records, &Record{ - Type: req.GetType(), - Data: any, - }) - } - return &GetAllResponse{Records: records}, nil - }, - } - - srv := grpc.NewServer() - RegisterDataBrokerServiceServer(srv, m) - go srv.Serve(li) - - cc, err := grpc.Dial(li.Addr().String(), grpc.WithInsecure()) - if !assert.NoError(t, err) { - return - } - defer cc.Close() - - c := NewDataBrokerServiceClient(cc) - - res, err := GetAllPages(ctx, c, &GetAllRequest{ - Type: "TEST", - }) - assert.Error(t, err) - assert.Equal(t, codes.ResourceExhausted, status.Code(err)) - assert.Nil(t, res) - }) - t.Run("with paging", func(t *testing.T) { - li, err := net.Listen("tcp", "127.0.0.1:0") - if !assert.NoError(t, err) { - return - } - defer li.Close() - - m := &mockServer{ - getAll: func(ctx context.Context, req *GetAllRequest) (*GetAllResponse, error) { - pageToken, _ := strconv.Atoi(req.GetPageToken()) - - any, _ := anypb.New(wrapperspb.String("TEST")) - var records []*Record - for i := pageToken; i < pageToken+10000 && i < 1000000; i++ { - records = append(records, &Record{ - Type: req.GetType(), - Data: any, - }) - } - if len(records) == 0 { - return &GetAllResponse{}, nil - } - return &GetAllResponse{Records: records, NextPageToken: strconv.Itoa(pageToken + 10000)}, nil - }, - } - - srv := grpc.NewServer() - RegisterDataBrokerServiceServer(srv, m) - go srv.Serve(li) - - cc, err := grpc.Dial(li.Addr().String(), grpc.WithInsecure()) - if !assert.NoError(t, err) { - return - } - defer cc.Close() - - c := NewDataBrokerServiceClient(cc) - - res, err := GetAllPages(ctx, c, &GetAllRequest{ - Type: "TEST", - }) - assert.NoError(t, err) - assert.NotEqual(t, codes.ResourceExhausted, status.Code(err)) - assert.Len(t, res.GetRecords(), 1000000) - }) +func (m *mockServer) Sync(req *SyncRequest, stream DataBrokerService_SyncServer) error { + return m.sync(req, stream) }