mirror of
https://github.com/pomerium/pomerium.git
synced 2025-08-04 01:09:36 +02:00
add paging support to GetAll (#1601)
* add paging support to GetAll * fix import
This commit is contained in:
parent
8ada0c51dd
commit
a41c37f9e0
9 changed files with 322 additions and 103 deletions
|
@ -17,6 +17,8 @@ var (
|
|||
DefaultBTreeDegree = 8
|
||||
// DefaultStorageType is the default storage type that Server use
|
||||
DefaultStorageType = "memory"
|
||||
// DefaultGetAllPageSize is the default page size for GetAll calls.
|
||||
DefaultGetAllPageSize = 50
|
||||
)
|
||||
|
||||
type serverConfig struct {
|
||||
|
@ -28,6 +30,7 @@ type serverConfig struct {
|
|||
storageCAFile string
|
||||
storageCertSkipVerify bool
|
||||
storageCertificate *tls.Certificate
|
||||
getAllPageSize int
|
||||
}
|
||||
|
||||
func newServerConfig(options ...ServerOption) *serverConfig {
|
||||
|
@ -35,6 +38,7 @@ func newServerConfig(options ...ServerOption) *serverConfig {
|
|||
WithDeletePermanentlyAfter(DefaultDeletePermanentlyAfter)(cfg)
|
||||
WithBTreeDegree(DefaultBTreeDegree)(cfg)
|
||||
WithStorageType(DefaultStorageType)(cfg)
|
||||
WithGetAllPageSize(DefaultGetAllPageSize)(cfg)
|
||||
for _, option := range options {
|
||||
option(cfg)
|
||||
}
|
||||
|
@ -60,6 +64,13 @@ func WithDeletePermanentlyAfter(dur time.Duration) ServerOption {
|
|||
}
|
||||
}
|
||||
|
||||
// WithGetAllPageSize sets the page size for GetAll calls.
|
||||
func WithGetAllPageSize(pageSize int) ServerOption {
|
||||
return func(cfg *serverConfig) {
|
||||
cfg.getAllPageSize = pageSize
|
||||
}
|
||||
}
|
||||
|
||||
// WithSharedKey sets the secret in the config.
|
||||
func WithSharedKey(sharedKey string) ServerOption {
|
||||
return func(cfg *serverConfig) {
|
||||
|
|
|
@ -181,7 +181,7 @@ func (srv *Server) Get(ctx context.Context, req *databroker.GetRequest) (*databr
|
|||
return &databroker.GetResponse{Record: record}, nil
|
||||
}
|
||||
|
||||
// GetAll gets all the records from the in-memory list.
|
||||
// GetAll gets all the records from the backend.
|
||||
func (srv *Server) GetAll(ctx context.Context, req *databroker.GetAllRequest) (*databroker.GetAllResponse, error) {
|
||||
_, span := trace.StartSpan(ctx, "databroker.grpc.GetAll")
|
||||
defer span.End()
|
||||
|
@ -199,25 +199,40 @@ func (srv *Server) GetAll(ctx context.Context, req *databroker.GetAllRequest) (*
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if len(all) == 0 {
|
||||
return &databroker.GetAllResponse{ServerVersion: version}, nil
|
||||
}
|
||||
// sort by record version
|
||||
sort.Slice(all, func(i, j int) bool {
|
||||
return all[i].Version < all[j].Version
|
||||
})
|
||||
|
||||
var recordVersion string
|
||||
records := make([]*databroker.Record, 0, len(all))
|
||||
for _, record := range all {
|
||||
if record.GetVersion() > recordVersion {
|
||||
recordVersion = record.GetVersion()
|
||||
// skip previous page records
|
||||
if record.GetVersion() <= req.PageToken {
|
||||
continue
|
||||
}
|
||||
|
||||
recordVersion = record.GetVersion()
|
||||
if record.DeletedAt == nil {
|
||||
records = append(records, record)
|
||||
}
|
||||
|
||||
// stop when we've hit the page size
|
||||
if len(records) >= srv.cfg.getAllPageSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
nextPageToken := recordVersion
|
||||
if len(records) < srv.cfg.getAllPageSize {
|
||||
nextPageToken = ""
|
||||
}
|
||||
|
||||
return &databroker.GetAllResponse{
|
||||
ServerVersion: version,
|
||||
RecordVersion: recordVersion,
|
||||
Records: records,
|
||||
NextPageToken: nextPageToken,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package databroker
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
|
@ -11,6 +12,7 @@ import (
|
|||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
"google.golang.org/protobuf/types/known/wrapperspb"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
"github.com/pomerium/pomerium/internal/signal"
|
||||
|
@ -124,7 +126,9 @@ func TestServer_Get(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestServer_GetAll(t *testing.T) {
|
||||
cfg := newServerConfig()
|
||||
cfg := newServerConfig(
|
||||
WithGetAllPageSize(5),
|
||||
)
|
||||
t.Run("ignore deleted", func(t *testing.T) {
|
||||
srv := newServer(cfg)
|
||||
|
||||
|
@ -148,4 +152,33 @@ func TestServer_GetAll(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
assert.Len(t, res.GetRecords(), 0)
|
||||
})
|
||||
t.Run("paging", func(t *testing.T) {
|
||||
srv := newServer(cfg)
|
||||
|
||||
any, err := anypb.New(wrapperspb.String("TEST"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
for i := 0; i < 7; i++ {
|
||||
srv.Set(context.Background(), &databroker.SetRequest{
|
||||
Type: any.TypeUrl,
|
||||
Id: fmt.Sprint(i),
|
||||
Data: any,
|
||||
})
|
||||
}
|
||||
|
||||
res, err := srv.GetAll(context.Background(), &databroker.GetAllRequest{
|
||||
Type: any.TypeUrl,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, res.GetRecords(), 5)
|
||||
assert.Equal(t, res.GetNextPageToken(), "000000000005")
|
||||
|
||||
res, err = srv.GetAll(context.Background(), &databroker.GetAllRequest{
|
||||
Type: any.TypeUrl,
|
||||
PageToken: res.GetNextPageToken(),
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, res.GetRecords(), 2)
|
||||
assert.Equal(t, res.GetNextPageToken(), "")
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue