databroker server backend config (#1127)

* config,docs: add databroker storage backend configuration

* cache: allow configuring which backend storage to use

Currently supported types are "memory", "redis".
This commit is contained in:
Cuong Manh Le 2020-07-23 10:42:43 +07:00 committed by GitHub
parent c9182f757e
commit 1640151bc1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 99 additions and 6 deletions

2
cache/cache.go vendored
View file

@ -71,7 +71,7 @@ func New(opts config.Options) (*Cache, error) {
return nil, err return nil, err
} }
dataBrokerServer := NewDataBrokerServer(localGRPCServer) dataBrokerServer := NewDataBrokerServer(localGRPCServer, opts)
dataBrokerClient := databroker.NewDataBrokerServiceClient(localGRPCConnection) dataBrokerClient := databroker.NewDataBrokerServiceClient(localGRPCConnection)
sessionServer := NewSessionServer(localGRPCServer, dataBrokerClient) sessionServer := NewSessionServer(localGRPCServer, dataBrokerClient)
sessionClient := session.NewSessionServiceClient(localGRPCConnection) sessionClient := session.NewSessionServiceClient(localGRPCConnection)

9
cache/databroker.go vendored
View file

@ -3,6 +3,7 @@ package cache
import ( import (
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/pomerium/pomerium/config"
internal_databroker "github.com/pomerium/pomerium/internal/databroker" internal_databroker "github.com/pomerium/pomerium/internal/databroker"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
) )
@ -13,8 +14,12 @@ type DataBrokerServer struct {
} }
// NewDataBrokerServer creates a new databroker service server. // NewDataBrokerServer creates a new databroker service server.
func NewDataBrokerServer(grpcServer *grpc.Server) *DataBrokerServer { func NewDataBrokerServer(grpcServer *grpc.Server, opts config.Options) *DataBrokerServer {
srv := &DataBrokerServer{DataBrokerServiceServer: internal_databroker.New()} internalSrv := internal_databroker.New(
internal_databroker.WithStorageType(opts.DataBrokerStorageType),
internal_databroker.WithStorageConnectionString(opts.DataBrokerStorageConnectionString),
)
srv := &DataBrokerServer{DataBrokerServiceServer: internalSrv}
databroker.RegisterDataBrokerServiceServer(grpcServer, srv) databroker.RegisterDataBrokerServiceServer(grpcServer, srv)
return srv return srv
} }

View file

@ -25,6 +25,8 @@ import (
"github.com/pomerium/pomerium/internal/telemetry/metrics" "github.com/pomerium/pomerium/internal/telemetry/metrics"
"github.com/pomerium/pomerium/internal/urlutil" "github.com/pomerium/pomerium/internal/urlutil"
"github.com/pomerium/pomerium/pkg/cryptutil" "github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/storage/inmemory"
"github.com/pomerium/pomerium/pkg/storage/redis"
) )
// DisableHeaderKey is the key used to check whether to disable setting header // DisableHeaderKey is the key used to check whether to disable setting header
@ -225,6 +227,11 @@ type Options struct {
// DataBrokerURL is the routable destination of the databroker service's gRPC endpiont. // DataBrokerURL is the routable destination of the databroker service's gRPC endpiont.
DataBrokerURLString string `mapstructure:"databroker_service_url" yaml:"databroker_service_url,omitempty"` DataBrokerURLString string `mapstructure:"databroker_service_url" yaml:"databroker_service_url,omitempty"`
DataBrokerURL *url.URL `yaml:",omitempty"` DataBrokerURL *url.URL `yaml:",omitempty"`
// DataBrokerStorageType is the storage backend type that databroker will use.
// Supported type: memory, redis
DataBrokerStorageType string `mapstructure:"databroker_storage_type" yaml:"databroker_storage_type,omitempty"`
// DataBrokerStorageConnectionString is the data source name for storage backend.
DataBrokerStorageConnectionString string `mapstructure:"databroker_storage_connection_string" yaml:"databroker_storage_connection_string,omitempty"`
// ClientCA is the base64-encoded certificate authority to validate client mTLS certificates against. // ClientCA is the base64-encoded certificate authority to validate client mTLS certificates against.
ClientCA string `mapstructure:"client_ca" yaml:"client_ca,omitempty"` ClientCA string `mapstructure:"client_ca" yaml:"client_ca,omitempty"`
@ -279,6 +286,7 @@ var defaultOptions = Options{
AutocertOptions: AutocertOptions{ AutocertOptions: AutocertOptions{
Folder: dataDir(), Folder: dataDir(),
}, },
DataBrokerStorageType: "memory",
} }
// NewDefaultOptions returns a copy the default options. It's the caller's // NewDefaultOptions returns a copy the default options. It's the caller's
@ -482,6 +490,15 @@ func (o *Options) Validate() error {
log.Warn().Msg("config: cache url will be deprecated in v0.11.0") log.Warn().Msg("config: cache url will be deprecated in v0.11.0")
o.DataBrokerURLString = o.CacheURLString o.DataBrokerURLString = o.CacheURLString
} }
switch o.DataBrokerStorageType {
case inmemory.Name:
case redis.Name:
if o.DataBrokerStorageConnectionString == "" {
return errors.New("config: missing databroker storage backend dsn")
}
default:
return errors.New("config: unknown databroker storage backend type")
}
if IsAuthorize(o.Services) || IsCache(o.Services) { if IsAuthorize(o.Services) || IsCache(o.Services) {
// if authorize is set, we don't really need a http server // if authorize is set, we don't really need a http server

View file

@ -40,6 +40,10 @@ func Test_Validate(t *testing.T) {
badPolicyFile := testOptions() badPolicyFile := testOptions()
badPolicyFile.PolicyFile = "file" badPolicyFile.PolicyFile = "file"
invalidStorageType := testOptions()
invalidStorageType.DataBrokerStorageType = "foo"
missingStorageDSN := testOptions()
missingStorageDSN.DataBrokerStorageType = "redis"
tests := []struct { tests := []struct {
name string name string
@ -51,6 +55,8 @@ func Test_Validate(t *testing.T) {
{"missing shared secret", badSecret, true}, {"missing shared secret", badSecret, true},
{"missing shared secret but all service", badSecretAllServices, false}, {"missing shared secret but all service", badSecretAllServices, false},
{"policy file specified", badPolicyFile, true}, {"policy file specified", badPolicyFile, true},
{"invalid databroker storage type", invalidStorageType, true},
{"missing databroker storage dsn", missingStorageDSN, true},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
@ -236,6 +242,7 @@ func TestOptionsFromViper(t *testing.T) {
}, },
RefreshDirectoryTimeout: 1 * time.Minute, RefreshDirectoryTimeout: 1 * time.Minute,
RefreshDirectoryInterval: 10 * time.Minute, RefreshDirectoryInterval: 10 * time.Minute,
DataBrokerStorageType: "memory",
}, },
false}, false},
{"good disable header", {"good disable header",
@ -252,6 +259,7 @@ func TestOptionsFromViper(t *testing.T) {
Headers: map[string]string{}, Headers: map[string]string{},
RefreshDirectoryTimeout: 1 * time.Minute, RefreshDirectoryTimeout: 1 * time.Minute,
RefreshDirectoryInterval: 10 * time.Minute, RefreshDirectoryInterval: 10 * time.Minute,
DataBrokerStorageType: "memory",
}, },
false}, false},
{"bad url", []byte(`{"policy":[{"from": "https://","to":"https://to.example"}]}`), nil, true}, {"bad url", []byte(`{"policy":[{"from": "https://","to":"https://to.example"}]}`), nil, true},

View file

@ -813,6 +813,27 @@ For an example implementation, the in-memory database used by the cache service
- [pkg/databroker/memory](https://github.com/pomerium/pomerium/tree/master/pkg/databroker/memory) - [pkg/databroker/memory](https://github.com/pomerium/pomerium/tree/master/pkg/databroker/memory)
### Data Broker Storage Type
- Environmental Variable: `DATABROKER_STORAGE_TYPE`
- Config File Key: `databroker_storage_type`
- Type: `string`
- Optional
- Example: `redis`
- Default: `memory`
The backend storage that databroker server will use, available types: `memory`, `redis`.
### Data Broker Storage Connection String
- Environmental Variable: `DATABROKER_STORAGE_CONNECTION_STRING`
- Config File Key: `databroker_storage_connection_string`
- Type: `string`
- **Required** when storage type is `redis`
- Example: `":6379"`
The connection string that server will use to connect to storage backend.
## Policy ## Policy
- Environmental Variable: `POLICY` - Environmental Variable: `POLICY`

View file

@ -8,17 +8,22 @@ var (
DefaultDeletePermanentlyAfter = time.Hour DefaultDeletePermanentlyAfter = time.Hour
// DefaultBTreeDegree is the default number of items to store in each node of the BTree. // DefaultBTreeDegree is the default number of items to store in each node of the BTree.
DefaultBTreeDegree = 8 DefaultBTreeDegree = 8
// DefaultStorageType is the default storage type that Server use
DefaultStorageType = "memory"
) )
type serverConfig struct { type serverConfig struct {
deletePermanentlyAfter time.Duration deletePermanentlyAfter time.Duration
btreeDegree int btreeDegree int
storageType string
storageConnectionString string
} }
func newServerConfig(options ...ServerOption) *serverConfig { func newServerConfig(options ...ServerOption) *serverConfig {
cfg := new(serverConfig) cfg := new(serverConfig)
WithDeletePermanentlyAfter(DefaultDeletePermanentlyAfter)(cfg) WithDeletePermanentlyAfter(DefaultDeletePermanentlyAfter)(cfg)
WithBTreeDegree(DefaultBTreeDegree)(cfg) WithBTreeDegree(DefaultBTreeDegree)(cfg)
WithStorageType(DefaultStorageType)(cfg)
for _, option := range options { for _, option := range options {
option(cfg) option(cfg)
} }
@ -43,3 +48,17 @@ func WithDeletePermanentlyAfter(dur time.Duration) ServerOption {
cfg.deletePermanentlyAfter = dur cfg.deletePermanentlyAfter = dur
} }
} }
// WithStorageType sets the storage type.
func WithStorageType(typ string) ServerOption {
return func(cfg *serverConfig) {
cfg.storageType = typ
}
}
// WithStorageConnectionString sets the DSN for storage.
func WithStorageConnectionString(connStr string) ServerOption {
return func(cfg *serverConfig) {
cfg.storageConnectionString = connStr
}
}

View file

@ -21,6 +21,7 @@ import (
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/storage" "github.com/pomerium/pomerium/pkg/storage"
"github.com/pomerium/pomerium/pkg/storage/inmemory" "github.com/pomerium/pomerium/pkg/storage/inmemory"
"github.com/pomerium/pomerium/pkg/storage/redis"
) )
const ( const (
@ -278,10 +279,26 @@ func (srv *Server) getDB(recordType string) storage.Backend {
srv.mu.Lock() srv.mu.Lock()
db = srv.byType[recordType] db = srv.byType[recordType]
if db == nil { if db == nil {
db = inmemory.NewDB(recordType, srv.cfg.btreeDegree) db = srv.newDB(recordType)
srv.byType[recordType] = db srv.byType[recordType] = db
} }
srv.mu.Unlock() srv.mu.Unlock()
} }
return db return db
} }
func (srv *Server) newDB(recordType string) storage.Backend {
switch srv.cfg.storageType {
case inmemory.Name:
return inmemory.NewDB(recordType, srv.cfg.btreeDegree)
case redis.Name:
db, err := redis.New(srv.cfg.storageConnectionString, recordType, int64(srv.cfg.deletePermanentlyAfter.Seconds()))
if err != nil {
srv.log.Error().Err(err).Msg("failed to create new redis storage")
return nil
}
return db
default:
return nil
}
}

View file

@ -17,6 +17,9 @@ import (
"github.com/pomerium/pomerium/pkg/storage" "github.com/pomerium/pomerium/pkg/storage"
) )
// Name is the storage type name for inmemory backend.
const Name = "memory"
var _ storage.Backend = (*DB)(nil) var _ storage.Backend = (*DB)(nil)
type byIDRecord struct { type byIDRecord struct {

View file

@ -18,6 +18,9 @@ import (
"github.com/pomerium/pomerium/pkg/storage" "github.com/pomerium/pomerium/pkg/storage"
) )
// Name is the storage type name for redis backend.
const Name = "redis"
var _ storage.Backend = (*DB)(nil) var _ storage.Backend = (*DB)(nil)
// DB wraps redis conn to interact with redis server. // DB wraps redis conn to interact with redis server.