// Package databroker is a pomerium service that handles the storage of user // session state. It communicates over RPC with other pomerium services, // and can be configured to use a number of different backend databroker stores. package databroker import ( "context" "fmt" "net" "sync" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/directory" "github.com/pomerium/pomerium/internal/events" "github.com/pomerium/pomerium/internal/identity" "github.com/pomerium/pomerium/internal/identity/manager" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/telemetry" "github.com/pomerium/pomerium/internal/version" "github.com/pomerium/pomerium/pkg/cryptutil" "github.com/pomerium/pomerium/pkg/envoy/files" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/registry" "github.com/pomerium/pomerium/pkg/grpcutil" ) // DataBroker represents the databroker service. The databroker service is a simple interface // for storing keyed blobs (bytes) of unstructured data. type DataBroker struct { dataBrokerServer *dataBrokerServer manager *manager.Manager eventsMgr *events.Manager localListener net.Listener localGRPCServer *grpc.Server localGRPCConnection *grpc.ClientConn dataBrokerStorageType string // TODO remove in v0.11 deprecatedCacheClusterDomain string // TODO: remove in v0.11 mu sync.Mutex directoryProvider directory.Provider } // New creates a new databroker service. func New(cfg *config.Config, eventsMgr *events.Manager) (*DataBroker, error) { localListener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { return nil, err } sharedKey, _ := cfg.Options.GetSharedKey() ui, si := grpcutil.AttachMetadataInterceptors( metadata.Pairs( grpcutil.MetadataKeyEnvoyVersion, files.FullVersion(), grpcutil.MetadataKeyPomeriumVersion, version.FullVersion(), ), ) // No metrics handler because we have one in the control plane. Add one // if we no longer register with that grpc Server localGRPCServer := grpc.NewServer( grpc.StreamInterceptor(si), grpc.UnaryInterceptor(ui), ) clientStatsHandler := telemetry.NewGRPCClientStatsHandler(cfg.Options.Services) clientDialOptions := []grpc.DialOption{ grpc.WithInsecure(), grpc.WithChainUnaryInterceptor(clientStatsHandler.UnaryInterceptor, grpcutil.WithUnarySignedJWT(sharedKey)), grpc.WithChainStreamInterceptor(grpcutil.WithStreamSignedJWT(sharedKey)), grpc.WithStatsHandler(clientStatsHandler.Handler), } ctx := log.WithContext(context.Background(), func(c zerolog.Context) zerolog.Context { return c.Str("service", "databroker").Str("config_source", "bootstrap") }) localGRPCConnection, err := grpc.DialContext( ctx, localListener.Addr().String(), clientDialOptions..., ) if err != nil { return nil, err } dataBrokerServer := newDataBrokerServer(cfg) dataBrokerURLs, err := cfg.Options.GetInternalDataBrokerURLs() if err != nil { return nil, err } c := &DataBroker{ dataBrokerServer: dataBrokerServer, localListener: localListener, localGRPCServer: localGRPCServer, localGRPCConnection: localGRPCConnection, deprecatedCacheClusterDomain: dataBrokerURLs[0].Hostname(), dataBrokerStorageType: cfg.Options.DataBrokerStorageType, eventsMgr: eventsMgr, } c.Register(c.localGRPCServer) err = c.update(ctx, cfg) if err != nil { return nil, err } return c, nil } // OnConfigChange is called whenever configuration is changed. func (c *DataBroker) OnConfigChange(ctx context.Context, cfg *config.Config) { err := c.update(ctx, cfg) if err != nil { log.Error(ctx).Err(err).Msg("databroker: error updating configuration") } c.dataBrokerServer.OnConfigChange(ctx, cfg) } // Register registers all the gRPC services with the given server. func (c *DataBroker) Register(grpcServer *grpc.Server) { databroker.RegisterDataBrokerServiceServer(grpcServer, c.dataBrokerServer) directory.RegisterDirectoryServiceServer(grpcServer, c) registry.RegisterRegistryServer(grpcServer, c.dataBrokerServer) } // Run runs the databroker components. func (c *DataBroker) Run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { return c.localGRPCServer.Serve(c.localListener) }) eg.Go(func() error { <-ctx.Done() c.localGRPCServer.Stop() return nil }) eg.Go(func() error { return c.manager.Run(ctx) }) return eg.Wait() } func (c *DataBroker) update(ctx context.Context, cfg *config.Config) error { if err := validate(cfg.Options); err != nil { return fmt.Errorf("databroker: bad option: %w", err) } oauthOptions, err := cfg.Options.GetOauthOptions() if err != nil { return fmt.Errorf("databroker: invalid oauth options: %w", err) } directoryProvider := directory.GetProvider(directory.Options{ ServiceAccount: cfg.Options.ServiceAccount, Provider: cfg.Options.Provider, ProviderURL: cfg.Options.ProviderURL, QPS: cfg.Options.GetQPS(), ClientID: cfg.Options.ClientID, ClientSecret: cfg.Options.ClientSecret, }) c.mu.Lock() c.directoryProvider = directoryProvider c.mu.Unlock() dataBrokerClient := databroker.NewDataBrokerServiceClient(c.localGRPCConnection) options := []manager.Option{ manager.WithDirectoryProvider(directoryProvider), manager.WithDataBrokerClient(dataBrokerClient), manager.WithGroupRefreshInterval(cfg.Options.RefreshDirectoryInterval), manager.WithGroupRefreshTimeout(cfg.Options.RefreshDirectoryTimeout), manager.WithEventManager(c.eventsMgr), } authenticator, err := identity.NewAuthenticator(oauthOptions) if err != nil { log.Error(ctx).Err(err).Msg("databroker: failed to create authenticator") } else { options = append(options, manager.WithAuthenticator(authenticator)) } if c.manager == nil { c.manager = manager.New(options...) } else { c.manager.UpdateConfig(options...) } return nil } // validate checks that proper configuration settings are set to create // a databroker instance func validate(o *config.Options) error { sharedKey, err := o.GetSharedKey() if err != nil { return fmt.Errorf("invalid 'SHARED_SECRET': %w", err) } if _, err := cryptutil.NewAEADCipher(sharedKey); err != nil { return fmt.Errorf("invalid 'SHARED_SECRET': %w", err) } return nil }