From 1640151bc14b45d45d9607c0eee25fd3617be45d Mon Sep 17 00:00:00 2001 From: Cuong Manh Le Date: Thu, 23 Jul 2020 10:42:43 +0700 Subject: [PATCH] databroker server backend config (#1127) * config,docs: add databroker storage backend configuration * cache: allow configuring which backend storage to use Currently supported types are "memory", "redis". --- cache/cache.go | 2 +- cache/databroker.go | 9 +++++++-- config/options.go | 17 +++++++++++++++++ config/options_test.go | 8 ++++++++ docs/configuration/readme.md | 21 +++++++++++++++++++++ internal/databroker/config.go | 23 +++++++++++++++++++++-- internal/databroker/server.go | 19 ++++++++++++++++++- pkg/storage/inmemory/inmemory.go | 3 +++ pkg/storage/redis/redis.go | 3 +++ 9 files changed, 99 insertions(+), 6 deletions(-) diff --git a/cache/cache.go b/cache/cache.go index 17a8d689e..355e5b548 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -71,7 +71,7 @@ func New(opts config.Options) (*Cache, error) { return nil, err } - dataBrokerServer := NewDataBrokerServer(localGRPCServer) + dataBrokerServer := NewDataBrokerServer(localGRPCServer, opts) dataBrokerClient := databroker.NewDataBrokerServiceClient(localGRPCConnection) sessionServer := NewSessionServer(localGRPCServer, dataBrokerClient) sessionClient := session.NewSessionServiceClient(localGRPCConnection) diff --git a/cache/databroker.go b/cache/databroker.go index 100cedca9..3dceb385b 100644 --- a/cache/databroker.go +++ b/cache/databroker.go @@ -3,6 +3,7 @@ package cache import ( "google.golang.org/grpc" + "github.com/pomerium/pomerium/config" internal_databroker "github.com/pomerium/pomerium/internal/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker" ) @@ -13,8 +14,12 @@ type DataBrokerServer struct { } // NewDataBrokerServer creates a new databroker service server. -func NewDataBrokerServer(grpcServer *grpc.Server) *DataBrokerServer { - srv := &DataBrokerServer{DataBrokerServiceServer: internal_databroker.New()} +func NewDataBrokerServer(grpcServer *grpc.Server, opts config.Options) *DataBrokerServer { + internalSrv := internal_databroker.New( + internal_databroker.WithStorageType(opts.DataBrokerStorageType), + internal_databroker.WithStorageConnectionString(opts.DataBrokerStorageConnectionString), + ) + srv := &DataBrokerServer{DataBrokerServiceServer: internalSrv} databroker.RegisterDataBrokerServiceServer(grpcServer, srv) return srv } diff --git a/config/options.go b/config/options.go index 9eea207ad..f39019125 100644 --- a/config/options.go +++ b/config/options.go @@ -25,6 +25,8 @@ import ( "github.com/pomerium/pomerium/internal/telemetry/metrics" "github.com/pomerium/pomerium/internal/urlutil" "github.com/pomerium/pomerium/pkg/cryptutil" + "github.com/pomerium/pomerium/pkg/storage/inmemory" + "github.com/pomerium/pomerium/pkg/storage/redis" ) // DisableHeaderKey is the key used to check whether to disable setting header @@ -225,6 +227,11 @@ type Options struct { // DataBrokerURL is the routable destination of the databroker service's gRPC endpiont. DataBrokerURLString string `mapstructure:"databroker_service_url" yaml:"databroker_service_url,omitempty"` DataBrokerURL *url.URL `yaml:",omitempty"` + // DataBrokerStorageType is the storage backend type that databroker will use. + // Supported type: memory, redis + DataBrokerStorageType string `mapstructure:"databroker_storage_type" yaml:"databroker_storage_type,omitempty"` + // DataBrokerStorageConnectionString is the data source name for storage backend. + DataBrokerStorageConnectionString string `mapstructure:"databroker_storage_connection_string" yaml:"databroker_storage_connection_string,omitempty"` // ClientCA is the base64-encoded certificate authority to validate client mTLS certificates against. ClientCA string `mapstructure:"client_ca" yaml:"client_ca,omitempty"` @@ -279,6 +286,7 @@ var defaultOptions = Options{ AutocertOptions: AutocertOptions{ Folder: dataDir(), }, + DataBrokerStorageType: "memory", } // NewDefaultOptions returns a copy the default options. It's the caller's @@ -482,6 +490,15 @@ func (o *Options) Validate() error { log.Warn().Msg("config: cache url will be deprecated in v0.11.0") o.DataBrokerURLString = o.CacheURLString } + switch o.DataBrokerStorageType { + case inmemory.Name: + case redis.Name: + if o.DataBrokerStorageConnectionString == "" { + return errors.New("config: missing databroker storage backend dsn") + } + default: + return errors.New("config: unknown databroker storage backend type") + } if IsAuthorize(o.Services) || IsCache(o.Services) { // if authorize is set, we don't really need a http server diff --git a/config/options_test.go b/config/options_test.go index 96589a040..90fb1e017 100644 --- a/config/options_test.go +++ b/config/options_test.go @@ -40,6 +40,10 @@ func Test_Validate(t *testing.T) { badPolicyFile := testOptions() badPolicyFile.PolicyFile = "file" + invalidStorageType := testOptions() + invalidStorageType.DataBrokerStorageType = "foo" + missingStorageDSN := testOptions() + missingStorageDSN.DataBrokerStorageType = "redis" tests := []struct { name string @@ -51,6 +55,8 @@ func Test_Validate(t *testing.T) { {"missing shared secret", badSecret, true}, {"missing shared secret but all service", badSecretAllServices, false}, {"policy file specified", badPolicyFile, true}, + {"invalid databroker storage type", invalidStorageType, true}, + {"missing databroker storage dsn", missingStorageDSN, true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -236,6 +242,7 @@ func TestOptionsFromViper(t *testing.T) { }, RefreshDirectoryTimeout: 1 * time.Minute, RefreshDirectoryInterval: 10 * time.Minute, + DataBrokerStorageType: "memory", }, false}, {"good disable header", @@ -252,6 +259,7 @@ func TestOptionsFromViper(t *testing.T) { Headers: map[string]string{}, RefreshDirectoryTimeout: 1 * time.Minute, RefreshDirectoryInterval: 10 * time.Minute, + DataBrokerStorageType: "memory", }, false}, {"bad url", []byte(`{"policy":[{"from": "https://","to":"https://to.example"}]}`), nil, true}, diff --git a/docs/configuration/readme.md b/docs/configuration/readme.md index 39522b85a..44b4afbb2 100644 --- a/docs/configuration/readme.md +++ b/docs/configuration/readme.md @@ -813,6 +813,27 @@ For an example implementation, the in-memory database used by the cache service - [pkg/databroker/memory](https://github.com/pomerium/pomerium/tree/master/pkg/databroker/memory) +### Data Broker Storage Type + +- Environmental Variable: `DATABROKER_STORAGE_TYPE` +- Config File Key: `databroker_storage_type` +- Type: `string` +- Optional +- Example: `redis` +- Default: `memory` + +The backend storage that databroker server will use, available types: `memory`, `redis`. + +### Data Broker Storage Connection String + +- Environmental Variable: `DATABROKER_STORAGE_CONNECTION_STRING` +- Config File Key: `databroker_storage_connection_string` +- Type: `string` +- **Required** when storage type is `redis` +- Example: `":6379"` + +The connection string that server will use to connect to storage backend. + ## Policy - Environmental Variable: `POLICY` diff --git a/internal/databroker/config.go b/internal/databroker/config.go index b68ebf2f8..dbe4814b6 100644 --- a/internal/databroker/config.go +++ b/internal/databroker/config.go @@ -8,17 +8,22 @@ var ( DefaultDeletePermanentlyAfter = time.Hour // DefaultBTreeDegree is the default number of items to store in each node of the BTree. DefaultBTreeDegree = 8 + // DefaultStorageType is the default storage type that Server use + DefaultStorageType = "memory" ) type serverConfig struct { - deletePermanentlyAfter time.Duration - btreeDegree int + deletePermanentlyAfter time.Duration + btreeDegree int + storageType string + storageConnectionString string } func newServerConfig(options ...ServerOption) *serverConfig { cfg := new(serverConfig) WithDeletePermanentlyAfter(DefaultDeletePermanentlyAfter)(cfg) WithBTreeDegree(DefaultBTreeDegree)(cfg) + WithStorageType(DefaultStorageType)(cfg) for _, option := range options { option(cfg) } @@ -43,3 +48,17 @@ func WithDeletePermanentlyAfter(dur time.Duration) ServerOption { cfg.deletePermanentlyAfter = dur } } + +// WithStorageType sets the storage type. +func WithStorageType(typ string) ServerOption { + return func(cfg *serverConfig) { + cfg.storageType = typ + } +} + +// WithStorageConnectionString sets the DSN for storage. +func WithStorageConnectionString(connStr string) ServerOption { + return func(cfg *serverConfig) { + cfg.storageConnectionString = connStr + } +} diff --git a/internal/databroker/server.go b/internal/databroker/server.go index 48be0c054..991679a2e 100644 --- a/internal/databroker/server.go +++ b/internal/databroker/server.go @@ -21,6 +21,7 @@ import ( "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/storage" "github.com/pomerium/pomerium/pkg/storage/inmemory" + "github.com/pomerium/pomerium/pkg/storage/redis" ) const ( @@ -278,10 +279,26 @@ func (srv *Server) getDB(recordType string) storage.Backend { srv.mu.Lock() db = srv.byType[recordType] if db == nil { - db = inmemory.NewDB(recordType, srv.cfg.btreeDegree) + db = srv.newDB(recordType) srv.byType[recordType] = db } srv.mu.Unlock() } return db } + +func (srv *Server) newDB(recordType string) storage.Backend { + switch srv.cfg.storageType { + case inmemory.Name: + return inmemory.NewDB(recordType, srv.cfg.btreeDegree) + case redis.Name: + db, err := redis.New(srv.cfg.storageConnectionString, recordType, int64(srv.cfg.deletePermanentlyAfter.Seconds())) + if err != nil { + srv.log.Error().Err(err).Msg("failed to create new redis storage") + return nil + } + return db + default: + return nil + } +} diff --git a/pkg/storage/inmemory/inmemory.go b/pkg/storage/inmemory/inmemory.go index 747b3e01c..941624950 100644 --- a/pkg/storage/inmemory/inmemory.go +++ b/pkg/storage/inmemory/inmemory.go @@ -17,6 +17,9 @@ import ( "github.com/pomerium/pomerium/pkg/storage" ) +// Name is the storage type name for inmemory backend. +const Name = "memory" + var _ storage.Backend = (*DB)(nil) type byIDRecord struct { diff --git a/pkg/storage/redis/redis.go b/pkg/storage/redis/redis.go index 5391977ca..5f786acd9 100644 --- a/pkg/storage/redis/redis.go +++ b/pkg/storage/redis/redis.go @@ -18,6 +18,9 @@ import ( "github.com/pomerium/pomerium/pkg/storage" ) +// Name is the storage type name for redis backend. +const Name = "redis" + var _ storage.Backend = (*DB)(nil) // DB wraps redis conn to interact with redis server.