mirror of
https://github.com/pomerium/pomerium.git
synced 2025-04-30 10:56:28 +02:00
replace GetAllPages with InitialSync, improve merge performance (#1624)
* replace GetAllPages with InitialSync, improve merge performance * fmt proto * add test for base64 function * add sync test * go mod tidy Co-authored-by: Bobby DeSimone <bobbydesimone@gmail.com>
This commit is contained in:
parent
ba55fec67c
commit
aad8ac2e61
10 changed files with 298 additions and 214 deletions
|
@ -91,7 +91,7 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string) error
|
||||||
ctx, span := trace.StartSpan(ctx, "authorize.dataBrokerClient.GetAll")
|
ctx, span := trace.StartSpan(ctx, "authorize.dataBrokerClient.GetAll")
|
||||||
backoff := backoff.NewExponentialBackOff()
|
backoff := backoff.NewExponentialBackOff()
|
||||||
for {
|
for {
|
||||||
res, err := databroker.GetAllPages(ctx, a.state.Load().dataBrokerClient, &databroker.GetAllRequest{
|
res, err := databroker.InitialSync(ctx, a.state.Load().dataBrokerClient, &databroker.SyncRequest{
|
||||||
Type: typeURL,
|
Type: typeURL,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -105,10 +105,10 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
serverVersion = res.GetServerVersion()
|
serverVersion = res.GetServerVersion()
|
||||||
recordVersion = res.GetRecordVersion()
|
|
||||||
|
|
||||||
for _, record := range res.GetRecords() {
|
for _, record := range res.GetRecords() {
|
||||||
a.updateRecord(record)
|
a.updateRecord(record)
|
||||||
|
recordVersion = record.GetVersion()
|
||||||
}
|
}
|
||||||
|
|
||||||
break
|
break
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -249,6 +249,7 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
|
||||||
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
|
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
|
||||||
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
|
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
|
||||||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/go-jsonnet v0.17.0 h1:/9NIEfhK1NQRKl3sP2536b2+x5HnZMdql7x3yK/l8JY=
|
github.com/google/go-jsonnet v0.17.0 h1:/9NIEfhK1NQRKl3sP2536b2+x5HnZMdql7x3yK/l8JY=
|
||||||
github.com/google/go-jsonnet v0.17.0/go.mod h1:sOcuej3UW1vpPTZOr8L7RQimqai1a57bt5j22LzGZCw=
|
github.com/google/go-jsonnet v0.17.0/go.mod h1:sOcuej3UW1vpPTZOr8L7RQimqai1a57bt5j22LzGZCw=
|
||||||
|
@ -984,6 +985,7 @@ gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
|
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
|
||||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||||
|
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
|
|
@ -309,9 +309,6 @@ func (srv *Server) doSync(ctx context.Context,
|
||||||
if len(updated) == 0 {
|
if len(updated) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
sort.Slice(updated, func(i, j int) bool {
|
|
||||||
return updated[i].Version < updated[j].Version
|
|
||||||
})
|
|
||||||
*recordVersion = updated[len(updated)-1].Version
|
*recordVersion = updated[len(updated)-1].Version
|
||||||
for i := 0; i < len(updated); i += syncBatchSize {
|
for i := 0; i < len(updated); i += syncBatchSize {
|
||||||
j := i + syncBatchSize
|
j := i + syncBatchSize
|
||||||
|
@ -346,6 +343,7 @@ func (srv *Server) Sync(req *databroker.SyncRequest, stream databroker.DataBroke
|
||||||
recordVersion := req.GetRecordVersion()
|
recordVersion := req.GetRecordVersion()
|
||||||
// reset record version if the server versions don't match
|
// reset record version if the server versions don't match
|
||||||
if req.GetServerVersion() != serverVersion {
|
if req.GetServerVersion() != serverVersion {
|
||||||
|
serverVersion = req.GetServerVersion()
|
||||||
recordVersion = ""
|
recordVersion = ""
|
||||||
// send the new server version to the client
|
// send the new server version to the client
|
||||||
err := stream.Send(&databroker.SyncResponse{
|
err := stream.Send(&databroker.SyncResponse{
|
||||||
|
@ -357,13 +355,23 @@ func (srv *Server) Sync(req *databroker.SyncRequest, stream databroker.DataBroke
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := stream.Context()
|
ctx := stream.Context()
|
||||||
ch := db.Watch(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
// Do first sync, so we won't missed anything.
|
var ch <-chan struct{}
|
||||||
|
if !req.GetNoWait() {
|
||||||
|
ch = db.Watch(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do first sync, so we won't miss anything.
|
||||||
if err := srv.doSync(ctx, serverVersion, &recordVersion, db, stream); err != nil {
|
if err := srv.doSync(ctx, serverVersion, &recordVersion, db, stream); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if req.GetNoWait() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
for range ch {
|
for range ch {
|
||||||
if err := srv.doSync(ctx, serverVersion, &recordVersion, db, stream); err != nil {
|
if err := srv.doSync(ctx, serverVersion, &recordVersion, db, stream); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
23
internal/encoding/base64.go
Normal file
23
internal/encoding/base64.go
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
package encoding
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DecodeBase64OrJSON decodes a JSON string that can optionally be base64 encoded.
|
||||||
|
func DecodeBase64OrJSON(in string, out interface{}) error {
|
||||||
|
in = strings.TrimSpace(in)
|
||||||
|
|
||||||
|
// the data can be base64 encoded
|
||||||
|
if !json.Valid([]byte(in)) {
|
||||||
|
bs, err := base64.StdEncoding.DecodeString(in)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
in = string(bs)
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Unmarshal([]byte(in), out)
|
||||||
|
}
|
20
internal/encoding/base64_test.go
Normal file
20
internal/encoding/base64_test.go
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
package encoding
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDecodeBase64OrJSON(t *testing.T) {
|
||||||
|
var obj struct {
|
||||||
|
X string `json:"x"`
|
||||||
|
}
|
||||||
|
err := DecodeBase64OrJSON(` {"x": "y"} `, &obj)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, "y", obj.X)
|
||||||
|
|
||||||
|
err = DecodeBase64OrJSON(` eyJ4IjoieiJ9Cg== `, &obj)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, "z", obj.X)
|
||||||
|
}
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
"golang.org/x/oauth2"
|
"golang.org/x/oauth2"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
"golang.org/x/sync/semaphore"
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/internal/directory"
|
"github.com/pomerium/pomerium/internal/directory"
|
||||||
"github.com/pomerium/pomerium/internal/identity/identity"
|
"github.com/pomerium/pomerium/internal/identity/identity"
|
||||||
|
@ -23,6 +24,10 @@ import (
|
||||||
"github.com/pomerium/pomerium/pkg/grpc/user"
|
"github.com/pomerium/pomerium/pkg/grpc/user"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
dataBrokerParallelism = 10
|
||||||
|
)
|
||||||
|
|
||||||
// Authenticator is an identity.Provider with only the methods needed by the manager.
|
// Authenticator is an identity.Provider with only the methods needed by the manager.
|
||||||
type Authenticator interface {
|
type Authenticator interface {
|
||||||
Refresh(context.Context, *oauth2.Token, identity.State) (*oauth2.Token, error)
|
Refresh(context.Context, *oauth2.Token, identity.State) (*oauth2.Token, error)
|
||||||
|
@ -61,6 +66,8 @@ type Manager struct {
|
||||||
directoryGroupsRecordVersion string
|
directoryGroupsRecordVersion string
|
||||||
|
|
||||||
directoryNextRefresh time.Time
|
directoryNextRefresh time.Time
|
||||||
|
|
||||||
|
dataBrokerSemaphore *semaphore.Weighted
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new identity manager.
|
// New creates a new identity manager.
|
||||||
|
@ -79,6 +86,8 @@ func New(
|
||||||
BTree: btree.New(8),
|
BTree: btree.New(8),
|
||||||
},
|
},
|
||||||
userScheduler: scheduler.New(),
|
userScheduler: scheduler.New(),
|
||||||
|
|
||||||
|
dataBrokerSemaphore: semaphore.NewWeighted(dataBrokerParallelism),
|
||||||
}
|
}
|
||||||
mgr.UpdateConfig(options...)
|
mgr.UpdateConfig(options...)
|
||||||
return mgr
|
return mgr
|
||||||
|
@ -228,6 +237,8 @@ func (mgr *Manager) refreshDirectoryUserGroups(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*directory.Group) {
|
func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*directory.Group) {
|
||||||
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
lookup := map[string]*directory.Group{}
|
lookup := map[string]*directory.Group{}
|
||||||
for _, dg := range directoryGroups {
|
for _, dg := range directoryGroups {
|
||||||
lookup[dg.GetId()] = dg
|
lookup[dg.GetId()] = dg
|
||||||
|
@ -236,44 +247,62 @@ func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*director
|
||||||
for groupID, newDG := range lookup {
|
for groupID, newDG := range lookup {
|
||||||
curDG, ok := mgr.directoryGroups[groupID]
|
curDG, ok := mgr.directoryGroups[groupID]
|
||||||
if !ok || !proto.Equal(newDG, curDG) {
|
if !ok || !proto.Equal(newDG, curDG) {
|
||||||
|
id := newDG.GetId()
|
||||||
any, err := ptypes.MarshalAny(newDG)
|
any, err := ptypes.MarshalAny(newDG)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mgr.log.Warn().Err(err).Msg("failed to marshal directory group")
|
mgr.log.Warn().Err(err).Msg("failed to marshal directory group")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_, err = mgr.cfg.Load().dataBrokerClient.Set(ctx, &databroker.SetRequest{
|
eg.Go(func() error {
|
||||||
Type: any.GetTypeUrl(),
|
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
|
||||||
Id: newDG.GetId(),
|
return err
|
||||||
Data: any,
|
}
|
||||||
|
defer mgr.dataBrokerSemaphore.Release(1)
|
||||||
|
|
||||||
|
_, err = mgr.cfg.Load().dataBrokerClient.Set(ctx, &databroker.SetRequest{
|
||||||
|
Type: any.GetTypeUrl(),
|
||||||
|
Id: id,
|
||||||
|
Data: any,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to update directory group: %s", id)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
|
||||||
mgr.log.Warn().Err(err).Msg("failed to update directory group")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for groupID, curDG := range mgr.directoryGroups {
|
for groupID, curDG := range mgr.directoryGroups {
|
||||||
_, ok := lookup[groupID]
|
_, ok := lookup[groupID]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
id := curDG.GetId()
|
||||||
any, err := ptypes.MarshalAny(curDG)
|
any, err := ptypes.MarshalAny(curDG)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mgr.log.Warn().Err(err).Msg("failed to marshal directory group")
|
mgr.log.Warn().Err(err).Msg("failed to marshal directory group")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_, err = mgr.cfg.Load().dataBrokerClient.Delete(ctx, &databroker.DeleteRequest{
|
eg.Go(func() error {
|
||||||
Type: any.GetTypeUrl(),
|
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
|
||||||
Id: curDG.GetId(),
|
return err
|
||||||
|
}
|
||||||
|
defer mgr.dataBrokerSemaphore.Release(1)
|
||||||
|
|
||||||
|
_, err = mgr.cfg.Load().dataBrokerClient.Delete(ctx, &databroker.DeleteRequest{
|
||||||
|
Type: any.GetTypeUrl(),
|
||||||
|
Id: id,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to delete directory group: %s", id)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
|
||||||
mgr.log.Warn().Err(err).Msg("failed to delete directory group")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.User) {
|
func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.User) {
|
||||||
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
lookup := map[string]*directory.User{}
|
lookup := map[string]*directory.User{}
|
||||||
for _, du := range directoryUsers {
|
for _, du := range directoryUsers {
|
||||||
lookup[du.GetId()] = du
|
lookup[du.GetId()] = du
|
||||||
|
@ -282,41 +311,61 @@ func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.
|
||||||
for userID, newDU := range lookup {
|
for userID, newDU := range lookup {
|
||||||
curDU, ok := mgr.directoryUsers[userID]
|
curDU, ok := mgr.directoryUsers[userID]
|
||||||
if !ok || !proto.Equal(newDU, curDU) {
|
if !ok || !proto.Equal(newDU, curDU) {
|
||||||
|
id := newDU.GetId()
|
||||||
any, err := ptypes.MarshalAny(newDU)
|
any, err := ptypes.MarshalAny(newDU)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mgr.log.Warn().Err(err).Msg("failed to marshal directory user")
|
mgr.log.Warn().Err(err).Msg("failed to marshal directory user")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_, err = mgr.cfg.Load().dataBrokerClient.Set(ctx, &databroker.SetRequest{
|
eg.Go(func() error {
|
||||||
Type: any.GetTypeUrl(),
|
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
|
||||||
Id: newDU.GetId(),
|
return err
|
||||||
Data: any,
|
}
|
||||||
|
defer mgr.dataBrokerSemaphore.Release(1)
|
||||||
|
|
||||||
|
client := mgr.cfg.Load().dataBrokerClient
|
||||||
|
if _, err := client.Set(ctx, &databroker.SetRequest{
|
||||||
|
Type: any.GetTypeUrl(),
|
||||||
|
Id: id,
|
||||||
|
Data: any,
|
||||||
|
}); err != nil {
|
||||||
|
return fmt.Errorf("failed to update directory user: %s", id)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
|
||||||
mgr.log.Warn().Err(err).Msg("failed to update directory user")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for userID, curDU := range mgr.directoryUsers {
|
for userID, curDU := range mgr.directoryUsers {
|
||||||
_, ok := lookup[userID]
|
_, ok := lookup[userID]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
id := curDU.GetId()
|
||||||
any, err := ptypes.MarshalAny(curDU)
|
any, err := ptypes.MarshalAny(curDU)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mgr.log.Warn().Err(err).Msg("failed to marshal directory user")
|
mgr.log.Warn().Err(err).Msg("failed to marshal directory user")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_, err = mgr.cfg.Load().dataBrokerClient.Delete(ctx, &databroker.DeleteRequest{
|
eg.Go(func() error {
|
||||||
Type: any.GetTypeUrl(),
|
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
|
||||||
Id: curDU.GetId(),
|
return err
|
||||||
|
}
|
||||||
|
defer mgr.dataBrokerSemaphore.Release(1)
|
||||||
|
|
||||||
|
client := mgr.cfg.Load().dataBrokerClient
|
||||||
|
if _, err := client.Delete(ctx, &databroker.DeleteRequest{
|
||||||
|
Type: any.GetTypeUrl(),
|
||||||
|
Id: id,
|
||||||
|
}); err != nil {
|
||||||
|
return fmt.Errorf("failed to delete directory user: %s", id)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
|
||||||
mgr.log.Warn().Err(err).Msg("failed to delete directory user")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
mgr.log.Warn().Err(err).Msg("manager: failed to merge users")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string) {
|
func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string) {
|
||||||
|
@ -529,7 +578,7 @@ func (mgr *Manager) initDirectoryUsers(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := databroker.GetAllPages(ctx, mgr.cfg.Load().dataBrokerClient, &databroker.GetAllRequest{
|
res, err := databroker.InitialSync(ctx, mgr.cfg.Load().dataBrokerClient, &databroker.SyncRequest{
|
||||||
Type: any.GetTypeUrl(),
|
Type: any.GetTypeUrl(),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -545,10 +594,12 @@ func (mgr *Manager) initDirectoryUsers(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
mgr.directoryUsers[pbDirectoryUser.GetId()] = &pbDirectoryUser
|
mgr.directoryUsers[pbDirectoryUser.GetId()] = &pbDirectoryUser
|
||||||
|
mgr.directoryUsersRecordVersion = record.GetVersion()
|
||||||
}
|
}
|
||||||
mgr.directoryUsersRecordVersion = res.GetRecordVersion()
|
|
||||||
mgr.directoryUsersServerVersion = res.GetServerVersion()
|
mgr.directoryUsersServerVersion = res.GetServerVersion()
|
||||||
|
|
||||||
|
mgr.log.Info().Int("count", len(mgr.directoryUsers)).Msg("initialized directory users")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -598,7 +649,7 @@ func (mgr *Manager) initDirectoryGroups(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := databroker.GetAllPages(ctx, mgr.cfg.Load().dataBrokerClient, &databroker.GetAllRequest{
|
res, err := databroker.InitialSync(ctx, mgr.cfg.Load().dataBrokerClient, &databroker.SyncRequest{
|
||||||
Type: any.GetTypeUrl(),
|
Type: any.GetTypeUrl(),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -614,10 +665,12 @@ func (mgr *Manager) initDirectoryGroups(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
mgr.directoryGroups[pbDirectoryGroup.GetId()] = &pbDirectoryGroup
|
mgr.directoryGroups[pbDirectoryGroup.GetId()] = &pbDirectoryGroup
|
||||||
|
mgr.directoryGroupsRecordVersion = record.GetVersion()
|
||||||
}
|
}
|
||||||
mgr.directoryGroupsRecordVersion = res.GetRecordVersion()
|
|
||||||
mgr.directoryGroupsServerVersion = res.GetServerVersion()
|
mgr.directoryGroupsServerVersion = res.GetServerVersion()
|
||||||
|
|
||||||
|
mgr.log.Info().Int("count", len(mgr.directoryGroups)).Msg("initialized directory groups")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,10 @@ package databroker
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetUserID gets the databroker user id from a provider user id.
|
// GetUserID gets the databroker user id from a provider user id.
|
||||||
|
@ -34,27 +37,32 @@ func ApplyOffsetAndLimit(all []*Record, offset, limit int) (records []*Record, t
|
||||||
return records, len(all)
|
return records, len(all)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetAllPages calls GetAll for all pages of data.
|
// InitialSync performs a sync with no_wait set to true and then returns all the results.
|
||||||
func GetAllPages(ctx context.Context, client DataBrokerServiceClient, in *GetAllRequest) (*GetAllResponse, error) {
|
func InitialSync(ctx context.Context, client DataBrokerServiceClient, in *SyncRequest) (*SyncResponse, error) {
|
||||||
var res GetAllResponse
|
dup := new(SyncRequest)
|
||||||
var pageToken string
|
proto.Merge(dup, in)
|
||||||
|
dup.NoWait = true
|
||||||
|
|
||||||
|
stream, err := client.Sync(ctx, dup)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
finalRes := &SyncResponse{}
|
||||||
|
|
||||||
|
loop:
|
||||||
for {
|
for {
|
||||||
nxt, err := client.GetAll(ctx, &GetAllRequest{
|
res, err := stream.Recv()
|
||||||
Type: in.GetType(),
|
switch {
|
||||||
PageToken: pageToken,
|
case err == io.EOF:
|
||||||
})
|
break loop
|
||||||
if err != nil {
|
case err != nil:
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res.ServerVersion = nxt.ServerVersion
|
finalRes.ServerVersion = res.GetServerVersion()
|
||||||
res.RecordVersion = nxt.RecordVersion
|
finalRes.Records = append(finalRes.Records, res.GetRecords()...)
|
||||||
res.Records = append(res.Records, nxt.Records...)
|
|
||||||
|
|
||||||
if nxt.NextPageToken == "" {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
pageToken = nxt.NextPageToken
|
|
||||||
}
|
}
|
||||||
return &res, nil
|
|
||||||
|
return finalRes, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -709,6 +709,7 @@ type SyncRequest struct {
|
||||||
ServerVersion string `protobuf:"bytes,1,opt,name=server_version,json=serverVersion,proto3" json:"server_version,omitempty"`
|
ServerVersion string `protobuf:"bytes,1,opt,name=server_version,json=serverVersion,proto3" json:"server_version,omitempty"`
|
||||||
RecordVersion string `protobuf:"bytes,2,opt,name=record_version,json=recordVersion,proto3" json:"record_version,omitempty"`
|
RecordVersion string `protobuf:"bytes,2,opt,name=record_version,json=recordVersion,proto3" json:"record_version,omitempty"`
|
||||||
Type string `protobuf:"bytes,3,opt,name=type,proto3" json:"type,omitempty"`
|
Type string `protobuf:"bytes,3,opt,name=type,proto3" json:"type,omitempty"`
|
||||||
|
NoWait bool `protobuf:"varint,4,opt,name=no_wait,json=noWait,proto3" json:"no_wait,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *SyncRequest) Reset() {
|
func (x *SyncRequest) Reset() {
|
||||||
|
@ -764,6 +765,13 @@ func (x *SyncRequest) GetType() string {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *SyncRequest) GetNoWait() bool {
|
||||||
|
if x != nil {
|
||||||
|
return x.NoWait
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
type SyncResponse struct {
|
type SyncResponse struct {
|
||||||
state protoimpl.MessageState
|
state protoimpl.MessageState
|
||||||
sizeCache protoimpl.SizeCache
|
sizeCache protoimpl.SizeCache
|
||||||
|
@ -947,59 +955,61 @@ var file_databroker_proto_rawDesc = []byte{
|
||||||
0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65,
|
0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65,
|
||||||
0x63, 0x6f, 0x72, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76,
|
0x63, 0x6f, 0x72, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76,
|
||||||
0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x73, 0x65,
|
0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x73, 0x65,
|
||||||
0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x6f, 0x0a, 0x0b, 0x53,
|
0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x88, 0x01, 0x0a, 0x0b,
|
||||||
0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65,
|
0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x73,
|
||||||
0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01,
|
0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20,
|
||||||
0x28, 0x09, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f,
|
0x01, 0x28, 0x09, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69,
|
||||||
0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x76, 0x65, 0x72, 0x73,
|
0x6f, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x76, 0x65, 0x72,
|
||||||
0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x72, 0x65, 0x63, 0x6f, 0x72,
|
0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x72, 0x65, 0x63, 0x6f,
|
||||||
0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65,
|
0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70,
|
||||||
0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x63, 0x0a, 0x0c,
|
0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x17, 0x0a,
|
||||||
0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e,
|
0x07, 0x6e, 0x6f, 0x5f, 0x77, 0x61, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06,
|
||||||
0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01,
|
0x6e, 0x6f, 0x57, 0x61, 0x69, 0x74, 0x22, 0x63, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65,
|
||||||
0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73,
|
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72,
|
||||||
0x69, 0x6f, 0x6e, 0x12, 0x2c, 0x0a, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x02,
|
0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d,
|
||||||
0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65,
|
0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2c, 0x0a,
|
||||||
0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64,
|
0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12,
|
||||||
0x73, 0x22, 0x28, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x73, 0x52, 0x65, 0x73,
|
0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f,
|
||||||
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x79, 0x70, 0x65, 0x73, 0x18, 0x01,
|
0x72, 0x64, 0x52, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x22, 0x28, 0x0a, 0x10, 0x47,
|
||||||
0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x74, 0x79, 0x70, 0x65, 0x73, 0x32, 0x83, 0x04, 0x0a, 0x11,
|
0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
|
||||||
0x44, 0x61, 0x74, 0x61, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63,
|
0x14, 0x0a, 0x05, 0x74, 0x79, 0x70, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05,
|
||||||
0x65, 0x12, 0x3b, 0x0a, 0x06, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x19, 0x2e, 0x64, 0x61,
|
0x74, 0x79, 0x70, 0x65, 0x73, 0x32, 0x83, 0x04, 0x0a, 0x11, 0x44, 0x61, 0x74, 0x61, 0x42, 0x72,
|
||||||
0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52,
|
0x6f, 0x6b, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x3b, 0x0a, 0x06, 0x44,
|
||||||
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
|
0x65, 0x6c, 0x65, 0x74, 0x65, 0x12, 0x19, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b,
|
||||||
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x36,
|
0x65, 0x72, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
|
||||||
0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b,
|
0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
|
||||||
0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e,
|
0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x36, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12,
|
||||||
0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65,
|
0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74,
|
||||||
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3f, 0x0a, 0x06, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c,
|
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72,
|
||||||
0x12, 0x19, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65,
|
0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
|
||||||
0x74, 0x41, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x64, 0x61,
|
0x12, 0x3f, 0x0a, 0x06, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x12, 0x19, 0x2e, 0x64, 0x61, 0x74,
|
||||||
0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x52,
|
0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x52, 0x65,
|
||||||
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x72, 0x79,
|
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b,
|
||||||
0x12, 0x18, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x51, 0x75,
|
0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x6c, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
|
||||||
0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x64, 0x61, 0x74,
|
0x65, 0x12, 0x3c, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x18, 0x2e, 0x64, 0x61, 0x74,
|
||||||
0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73,
|
0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71,
|
||||||
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x03, 0x53, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x64,
|
0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65,
|
||||||
0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x74, 0x52, 0x65, 0x71,
|
0x72, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
|
||||||
0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65,
|
0x36, 0x0a, 0x03, 0x53, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f,
|
||||||
0x72, 0x2e, 0x53, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a,
|
0x6b, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17,
|
||||||
0x04, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b,
|
0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x74, 0x52,
|
||||||
0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18,
|
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x04, 0x53, 0x79, 0x6e, 0x63, 0x12,
|
||||||
0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63,
|
0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e,
|
||||||
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x40, 0x0a, 0x08, 0x47, 0x65,
|
0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62,
|
||||||
0x74, 0x54, 0x79, 0x70, 0x65, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
|
0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
|
||||||
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1c,
|
0x73, 0x65, 0x30, 0x01, 0x12, 0x40, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x73,
|
||||||
0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x54,
|
0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
|
||||||
0x79, 0x70, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x43, 0x0a, 0x09,
|
0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1c, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62,
|
||||||
0x53, 0x79, 0x6e, 0x63, 0x54, 0x79, 0x70, 0x65, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
|
0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x73, 0x52, 0x65,
|
||||||
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74,
|
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x43, 0x0a, 0x09, 0x53, 0x79, 0x6e, 0x63, 0x54, 0x79,
|
||||||
0x79, 0x1a, 0x1c, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47,
|
0x70, 0x65, 0x73, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
|
||||||
0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30,
|
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1c, 0x2e, 0x64, 0x61,
|
||||||
0x01, 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
|
0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65,
|
||||||
0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75,
|
0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x32, 0x5a, 0x30, 0x67,
|
||||||
0x6d, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62,
|
0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69,
|
||||||
0x72, 0x6f, 0x6b, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
0x75, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x70, 0x6b, 0x67, 0x2f,
|
||||||
|
0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x62,
|
||||||
|
0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
@ -7,7 +7,9 @@ import "google/protobuf/any.proto";
|
||||||
import "google/protobuf/empty.proto";
|
import "google/protobuf/empty.proto";
|
||||||
import "google/protobuf/timestamp.proto";
|
import "google/protobuf/timestamp.proto";
|
||||||
|
|
||||||
message ServerVersion { string version = 1; }
|
message ServerVersion {
|
||||||
|
string version = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message Record {
|
message Record {
|
||||||
string version = 1;
|
string version = 1;
|
||||||
|
@ -28,7 +30,9 @@ message GetRequest {
|
||||||
string type = 1;
|
string type = 1;
|
||||||
string id = 2;
|
string id = 2;
|
||||||
}
|
}
|
||||||
message GetResponse { Record record = 1; }
|
message GetResponse {
|
||||||
|
Record record = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message GetAllRequest {
|
message GetAllRequest {
|
||||||
string type = 1;
|
string type = 1;
|
||||||
|
@ -66,13 +70,16 @@ message SyncRequest {
|
||||||
string server_version = 1;
|
string server_version = 1;
|
||||||
string record_version = 2;
|
string record_version = 2;
|
||||||
string type = 3;
|
string type = 3;
|
||||||
|
bool no_wait = 4;
|
||||||
}
|
}
|
||||||
message SyncResponse {
|
message SyncResponse {
|
||||||
string server_version = 1;
|
string server_version = 1;
|
||||||
repeated Record records = 2;
|
repeated Record records = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message GetTypesResponse { repeated string types = 1; }
|
message GetTypesResponse {
|
||||||
|
repeated string types = 1;
|
||||||
|
}
|
||||||
|
|
||||||
service DataBrokerService {
|
service DataBrokerService {
|
||||||
rpc Delete(DeleteRequest) returns (google.protobuf.Empty);
|
rpc Delete(DeleteRequest) returns (google.protobuf.Empty);
|
||||||
|
|
|
@ -3,16 +3,11 @@ package databroker
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
|
||||||
status "google.golang.org/grpc/status"
|
|
||||||
"google.golang.org/protobuf/types/known/anypb"
|
|
||||||
"google.golang.org/protobuf/types/known/wrapperspb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestApplyOffsetAndLimit(t *testing.T) {
|
func TestApplyOffsetAndLimit(t *testing.T) {
|
||||||
|
@ -54,103 +49,61 @@ func TestApplyOffsetAndLimit(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInitialSync(t *testing.T) {
|
||||||
|
ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10)
|
||||||
|
defer clearTimeout()
|
||||||
|
|
||||||
|
li, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
if !assert.NoError(t, err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer li.Close()
|
||||||
|
|
||||||
|
r1 := new(Record)
|
||||||
|
r2 := new(Record)
|
||||||
|
r3 := new(Record)
|
||||||
|
|
||||||
|
m := &mockServer{
|
||||||
|
sync: func(req *SyncRequest, stream DataBrokerService_SyncServer) error {
|
||||||
|
assert.Equal(t, true, req.GetNoWait())
|
||||||
|
stream.Send(&SyncResponse{
|
||||||
|
ServerVersion: "a",
|
||||||
|
Records: []*Record{r1, r2},
|
||||||
|
})
|
||||||
|
stream.Send(&SyncResponse{
|
||||||
|
ServerVersion: "b",
|
||||||
|
Records: []*Record{r3},
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
srv := grpc.NewServer()
|
||||||
|
RegisterDataBrokerServiceServer(srv, m)
|
||||||
|
go srv.Serve(li)
|
||||||
|
|
||||||
|
cc, err := grpc.Dial(li.Addr().String(), grpc.WithInsecure())
|
||||||
|
if !assert.NoError(t, err) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer cc.Close()
|
||||||
|
|
||||||
|
c := NewDataBrokerServiceClient(cc)
|
||||||
|
|
||||||
|
res, err := InitialSync(ctx, c, &SyncRequest{
|
||||||
|
Type: "TEST",
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, "b", res.GetServerVersion())
|
||||||
|
assert.Equal(t, []*Record{r1, r2, r3}, res.GetRecords())
|
||||||
|
}
|
||||||
|
|
||||||
type mockServer struct {
|
type mockServer struct {
|
||||||
DataBrokerServiceServer
|
DataBrokerServiceServer
|
||||||
|
|
||||||
getAll func(context.Context, *GetAllRequest) (*GetAllResponse, error)
|
sync func(*SyncRequest, DataBrokerService_SyncServer) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockServer) GetAll(ctx context.Context, req *GetAllRequest) (*GetAllResponse, error) {
|
func (m *mockServer) Sync(req *SyncRequest, stream DataBrokerService_SyncServer) error {
|
||||||
return m.getAll(ctx, req)
|
return m.sync(req, stream)
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetAllPages(t *testing.T) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
t.Run("resource exhausted", func(t *testing.T) {
|
|
||||||
li, err := net.Listen("tcp", "127.0.0.1:0")
|
|
||||||
if !assert.NoError(t, err) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer li.Close()
|
|
||||||
|
|
||||||
m := &mockServer{
|
|
||||||
getAll: func(ctx context.Context, req *GetAllRequest) (*GetAllResponse, error) {
|
|
||||||
any, _ := anypb.New(wrapperspb.String("TEST"))
|
|
||||||
var records []*Record
|
|
||||||
for i := 0; i < 1000000; i++ {
|
|
||||||
records = append(records, &Record{
|
|
||||||
Type: req.GetType(),
|
|
||||||
Data: any,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return &GetAllResponse{Records: records}, nil
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
srv := grpc.NewServer()
|
|
||||||
RegisterDataBrokerServiceServer(srv, m)
|
|
||||||
go srv.Serve(li)
|
|
||||||
|
|
||||||
cc, err := grpc.Dial(li.Addr().String(), grpc.WithInsecure())
|
|
||||||
if !assert.NoError(t, err) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer cc.Close()
|
|
||||||
|
|
||||||
c := NewDataBrokerServiceClient(cc)
|
|
||||||
|
|
||||||
res, err := GetAllPages(ctx, c, &GetAllRequest{
|
|
||||||
Type: "TEST",
|
|
||||||
})
|
|
||||||
assert.Error(t, err)
|
|
||||||
assert.Equal(t, codes.ResourceExhausted, status.Code(err))
|
|
||||||
assert.Nil(t, res)
|
|
||||||
})
|
|
||||||
t.Run("with paging", func(t *testing.T) {
|
|
||||||
li, err := net.Listen("tcp", "127.0.0.1:0")
|
|
||||||
if !assert.NoError(t, err) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer li.Close()
|
|
||||||
|
|
||||||
m := &mockServer{
|
|
||||||
getAll: func(ctx context.Context, req *GetAllRequest) (*GetAllResponse, error) {
|
|
||||||
pageToken, _ := strconv.Atoi(req.GetPageToken())
|
|
||||||
|
|
||||||
any, _ := anypb.New(wrapperspb.String("TEST"))
|
|
||||||
var records []*Record
|
|
||||||
for i := pageToken; i < pageToken+10000 && i < 1000000; i++ {
|
|
||||||
records = append(records, &Record{
|
|
||||||
Type: req.GetType(),
|
|
||||||
Data: any,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if len(records) == 0 {
|
|
||||||
return &GetAllResponse{}, nil
|
|
||||||
}
|
|
||||||
return &GetAllResponse{Records: records, NextPageToken: strconv.Itoa(pageToken + 10000)}, nil
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
srv := grpc.NewServer()
|
|
||||||
RegisterDataBrokerServiceServer(srv, m)
|
|
||||||
go srv.Serve(li)
|
|
||||||
|
|
||||||
cc, err := grpc.Dial(li.Addr().String(), grpc.WithInsecure())
|
|
||||||
if !assert.NoError(t, err) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer cc.Close()
|
|
||||||
|
|
||||||
c := NewDataBrokerServiceClient(cc)
|
|
||||||
|
|
||||||
res, err := GetAllPages(ctx, c, &GetAllRequest{
|
|
||||||
Type: "TEST",
|
|
||||||
})
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.NotEqual(t, codes.ResourceExhausted, status.Code(err))
|
|
||||||
assert.Len(t, res.GetRecords(), 1000000)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue