restructure

This commit is contained in:
Denis Mishin 2023-08-08 14:47:32 -04:00
parent 40efa26257
commit c7fd5a4752
7 changed files with 86 additions and 122 deletions

View file

@ -1,3 +1,4 @@
// Package bootstrap fetches the very initial configuration for Pomerium Core to start.
package bootstrap package bootstrap
/* /*
@ -12,14 +13,11 @@ package bootstrap
import ( import (
"context" "context"
"crypto/cipher"
"fmt" "fmt"
"time" "time"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
cluster_api "github.com/pomerium/zero-sdk/cluster" cluster_api "github.com/pomerium/zero-sdk/cluster"
connect_mux "github.com/pomerium/zero-sdk/connect-mux" connect_mux "github.com/pomerium/zero-sdk/connect-mux"
@ -35,7 +33,16 @@ const (
) )
// Run initializes the bootstrap config source // Run initializes the bootstrap config source
func (svc *BootstrapConfigSource) Run(ctx context.Context) error { func (svc *Source) Run(
ctx context.Context,
clusterAPI cluster_api.ClientWithResponsesInterface,
mux *connect_mux.Mux,
fileCachePath string,
) error {
svc.clusterAPI = clusterAPI
svc.connectMux = mux
svc.fileCachePath = fileCachePath
svc.tryLoadInitial(ctx) svc.tryLoadInitial(ctx)
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
@ -45,7 +52,7 @@ func (svc *BootstrapConfigSource) Run(ctx context.Context) error {
return eg.Wait() return eg.Wait()
} }
func (svc *BootstrapConfigSource) watchUpdates(ctx context.Context) error { func (svc *Source) watchUpdates(ctx context.Context) error {
return svc.connectMux.Watch(ctx, return svc.connectMux.Watch(ctx,
connect_mux.WithOnConnected(func(_ context.Context) { connect_mux.WithOnConnected(func(_ context.Context) {
svc.triggerUpdate(DefaultCheckForUpdateIntervalWhenConnected) svc.triggerUpdate(DefaultCheckForUpdateIntervalWhenConnected)
@ -59,7 +66,7 @@ func (svc *BootstrapConfigSource) watchUpdates(ctx context.Context) error {
) )
} }
func (svc *BootstrapConfigSource) updateLoop(ctx context.Context) error { func (svc *Source) updateLoop(ctx context.Context) error {
ticker := time.NewTicker(svc.updateInterval.Load()) ticker := time.NewTicker(svc.updateInterval.Load())
defer ticker.Stop() defer ticker.Stop()
@ -72,7 +79,7 @@ func (svc *BootstrapConfigSource) updateLoop(ctx context.Context) error {
} }
ticker.Reset(svc.updateInterval.Load()) ticker.Reset(svc.updateInterval.Load())
err := svc.update(ctx) err := svc.tryUpdateAndSave(ctx)
if err != nil { if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to update bootstrap config") log.Ctx(ctx).Error().Err(err).Msg("failed to update bootstrap config")
} }
@ -81,7 +88,7 @@ func (svc *BootstrapConfigSource) updateLoop(ctx context.Context) error {
// triggerUpdate triggers an update of the bootstrap config // triggerUpdate triggers an update of the bootstrap config
// and sets the interval for the next update // and sets the interval for the next update
func (svc *BootstrapConfigSource) triggerUpdate(newUpdateInterval time.Duration) { func (svc *Source) triggerUpdate(newUpdateInterval time.Duration) {
svc.updateInterval.Store(newUpdateInterval) svc.updateInterval.Store(newUpdateInterval)
select { select {
@ -90,62 +97,37 @@ func (svc *BootstrapConfigSource) triggerUpdate(newUpdateInterval time.Duration)
} }
} }
func (svc *BootstrapConfigSource) update(ctx context.Context) error { func (svc *Source) tryUpdateAndSave(ctx context.Context) error {
current := svc.GetConfig() cfg, err := LoadBootstrapConfigFromAPI(ctx, svc.clusterAPI)
cfg := current.Clone()
err := tryUpdateAndSave(ctx, cfg.Options, svc.clusterAPI, svc.fileCachePath, svc.fileCipher)
if err != nil {
return err
}
_ = svc.SetConfig(ctx, cfg)
return nil
}
func tryUpdateAndSave(
ctx context.Context,
dst *config.Options,
clusterAPI cluster_api.ClientWithResponsesInterface,
fileCachePath string,
fileCipher cipher.AEAD,
) error {
err := LoadBootstrapConfigFromAPI(ctx, dst, clusterAPI)
if err != nil { if err != nil {
return fmt.Errorf("load bootstrap config from API: %w", err) return fmt.Errorf("load bootstrap config from API: %w", err)
} }
err = SaveBootstrapConfigToFile(dst, fileCachePath, fileCipher) err = SaveBootstrapConfigToFile(cfg, svc.fileCachePath, svc.fileCipher)
if err != nil { if err != nil {
log.Ctx(ctx).Error().Err(err). log.Ctx(ctx).Error().Err(err).
Msg("failed to save bootstrap config to file, note it may prevent Pomerium from starting up in case of connectivity issues") Msg("failed to save bootstrap config to file, note it may prevent Pomerium from starting up in case of connectivity issues")
} }
svc.UpdateBootstrap(ctx, *cfg)
return nil return nil
} }
func (src *BootstrapConfigSource) tryLoadInitial(ctx context.Context) { func (svc *Source) tryLoadInitial(ctx context.Context) {
dst := src.GetConfig() err := svc.tryUpdateAndSave(ctx)
err := tryUpdateAndSave(ctx, dst.Options, src.clusterAPI, src.fileCachePath, src.fileCipher)
if err != nil { if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to load bootstrap config") log.Ctx(ctx).Error().Err(err).Msg("failed to load bootstrap config")
src.tryLoadFromFile(ctx) svc.tryLoadFromFile(ctx)
return return
} }
src.SetConfig(ctx, dst)
} }
func (src *BootstrapConfigSource) tryLoadFromFile(ctx context.Context) { func (svc *Source) tryLoadFromFile(ctx context.Context) {
dst := src.GetConfig() cfg, err := LoadBootstrapConfigFromFile(svc.fileCachePath, svc.fileCipher)
err := LoadBootstrapConfigFromFile(dst.Options, src.fileCachePath, src.fileCipher)
if err != nil { if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to load bootstrap config from file") log.Ctx(ctx).Error().Err(err).Msg("failed to load bootstrap config from file")
return return
} }
src.SetConfig(ctx, dst) svc.UpdateBootstrap(ctx, *cfg)
} }

View file

@ -4,30 +4,21 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/pomerium/pomerium/config"
cluster_api "github.com/pomerium/zero-sdk/cluster" cluster_api "github.com/pomerium/zero-sdk/cluster"
) )
// LoadBootstrapConfigFromAPI loads the bootstrap configuration from the cluster API. // LoadBootstrapConfigFromAPI loads the bootstrap configuration from the cluster API.
func LoadBootstrapConfigFromAPI( func LoadBootstrapConfigFromAPI(
ctx context.Context, ctx context.Context,
dst *config.Options,
client cluster_api.ClientWithResponsesInterface, client cluster_api.ClientWithResponsesInterface,
) error { ) (*cluster_api.BootstrapConfig, error) {
resp, err := client.GetClusterBootstrapConfigWithResponse(ctx) resp, err := client.GetClusterBootstrapConfigWithResponse(ctx)
if err != nil { if err != nil {
return fmt.Errorf("get: %w", err) return nil, fmt.Errorf("get: %w", err)
} }
if resp.JSON200 == nil { if resp.JSON200 == nil {
return fmt.Errorf("unexpected response: %d/%v", resp.StatusCode(), resp.Status()) return nil, fmt.Errorf("unexpected response: %d/%v", resp.StatusCode(), resp.Status())
} }
v := cluster_api.BootstrapConfig(*resp.JSON200) return resp.JSON200, nil
if v.DatabrokerStorageConnection != nil {
dst.DataBrokerStorageType = "postgres"
dst.DataBrokerStorageConnectionString = *v.DatabrokerStorageConnection
}
return nil
} }

View file

@ -14,33 +14,33 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/pkg/cryptutil" "github.com/pomerium/pomerium/pkg/cryptutil"
cluster_api "github.com/pomerium/zero-sdk/cluster"
) )
func LoadBootstrapConfigFromFile(dst *config.Options, fp string, cipher cipher.AEAD) error { // LoadBootstrapConfigFromFile loads the bootstrap configuration from a file.
func LoadBootstrapConfigFromFile(fp string, cipher cipher.AEAD) (*cluster_api.BootstrapConfig, error) {
ciphertext, err := os.ReadFile(fp) ciphertext, err := os.ReadFile(fp)
if err != nil { if err != nil {
return fmt.Errorf("read bootstrap config: %w", err) return nil, fmt.Errorf("read bootstrap config: %w", err)
} }
plaintext, err := cryptutil.Decrypt(cipher, ciphertext, nil) plaintext, err := cryptutil.Decrypt(cipher, ciphertext, nil)
if err != nil { if err != nil {
return fmt.Errorf("decrypt bootstrap config: %w", err) return nil, fmt.Errorf("decrypt bootstrap config: %w", err)
} }
fc := fileConfig{} var dst cluster_api.BootstrapConfig
err = json.Unmarshal(plaintext, &fc) err = json.Unmarshal(plaintext, &dst)
if err != nil { if err != nil {
return fmt.Errorf("unmarshal bootstrap config: %w", err) return nil, fmt.Errorf("unmarshal bootstrap config: %w", err)
} }
applyFileConfig(dst, fc) return &dst, nil
return nil
} }
func SaveBootstrapConfigToFile(src *config.Options, fp string, cipher cipher.AEAD) error { // SaveBootstrapConfigToFile saves the bootstrap configuration to a file.
plaintext, err := json.Marshal(getFileConfig(src)) func SaveBootstrapConfigToFile(src *cluster_api.BootstrapConfig, fp string, cipher cipher.AEAD) error {
plaintext, err := json.Marshal(src)
if err != nil { if err != nil {
return fmt.Errorf("marshal file config: %w", err) return fmt.Errorf("marshal file config: %w", err)
} }
@ -52,21 +52,3 @@ func SaveBootstrapConfigToFile(src *config.Options, fp string, cipher cipher.AEA
} }
return nil return nil
} }
type fileConfig struct {
PostgresDSN *string `json:"postgres_dsn"`
}
func getFileConfig(src *config.Options) fileConfig {
fc := fileConfig{}
if src.DataBrokerStorageConnectionString != "" {
fc.PostgresDSN = &src.DataBrokerStorageConnectionString
}
return fc
}
func applyFileConfig(dst *config.Options, fc fileConfig) {
if fc.PostgresDSN != nil {
dst.DataBrokerStorageConnectionString = *fc.PostgresDSN
}
}

View file

@ -1,29 +1,33 @@
package bootstrap_test package bootstrap_test
import ( import (
"os"
"testing" "testing"
"github.com/pomerium/pomerium/internal/zero/bootstrap"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/pomerium/pomerium/internal/zero/bootstrap"
"github.com/pomerium/pomerium/pkg/cryptutil"
cluster_api "github.com/pomerium/zero-sdk/cluster"
) )
func TestFile(t *testing.T) { func TestFile(t *testing.T) {
secret := []byte("secret") cipher, err := cryptutil.NewAEADCipher(cryptutil.NewKey())
src, err := bootstrap.New(secret)
require.NoError(t, err)
cfg := src.GetConfig()
require.NotNil(t, cfg)
// test that the config is valid
require.NoError(t, cfg.Options.Validate())
// test that the config is deterministic
src2, err := bootstrap.New(secret)
require.NoError(t, err) require.NoError(t, err)
cfg2 := src2.GetConfig() txt := "test"
require.NotNil(t, cfg2) src := cluster_api.BootstrapConfig{
DatabrokerStorageConnection: &txt,
}
require.Equal(t, cfg.Options, cfg2.Options) fd, err := os.CreateTemp(t.TempDir(), "test.data")
require.NoError(t, err)
require.NoError(t, fd.Close())
require.NoError(t, bootstrap.SaveBootstrapConfigToFile(&src, fd.Name(), cipher))
dst, err := bootstrap.LoadBootstrapConfigFromFile(fd.Name(), cipher)
require.NoError(t, err)
require.Equal(t, src, *dst)
} }

View file

@ -20,8 +20,8 @@ import (
connect_mux "github.com/pomerium/zero-sdk/connect-mux" connect_mux "github.com/pomerium/zero-sdk/connect-mux"
) )
// BootstrapConfigSource is a base config layer for Pomerium // Source is a base config layer for Pomerium
type BootstrapConfigSource struct { type Source struct {
source source
clusterAPI cluster_api.ClientWithResponsesInterface clusterAPI cluster_api.ClientWithResponsesInterface
@ -34,12 +34,8 @@ type BootstrapConfigSource struct {
updateInterval atomicutil.Value[time.Duration] updateInterval atomicutil.Value[time.Duration]
} }
func New( // New creates a new bootstrap config source
clusterAPI cluster_api.ClientWithResponsesInterface, func New(secret []byte) (*Source, error) {
mux *connect_mux.Mux,
secret []byte,
fileCachePath string,
) (*BootstrapConfigSource, error) {
cfg := new(config.Config) cfg := new(config.Config)
err := setConfigDefaults(cfg) err := setConfigDefaults(cfg)
@ -59,13 +55,10 @@ func New(
return nil, fmt.Errorf("init secrets: %w", err) return nil, fmt.Errorf("init secrets: %w", err)
} }
svc := &BootstrapConfigSource{ svc := &Source{
source: source{ready: make(chan struct{})}, source: source{ready: make(chan struct{})},
fileCipher: cipher, fileCipher: cipher,
checkForUpdate: make(chan struct{}, 1), checkForUpdate: make(chan struct{}, 1),
fileCachePath: fileCachePath,
clusterAPI: clusterAPI,
connectMux: mux,
} }
svc.cfg.Store(cfg) svc.cfg.Store(cfg)
svc.updateInterval.Store(DefaultCheckForUpdateInterval) svc.updateInterval.Store(DefaultCheckForUpdateInterval)

View file

@ -3,8 +3,9 @@ package bootstrap_test
import ( import (
"testing" "testing"
"github.com/pomerium/pomerium/internal/zero/bootstrap"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/pomerium/pomerium/internal/zero/bootstrap"
) )
func TestConfigDeterministic(t *testing.T) { func TestConfigDeterministic(t *testing.T) {

View file

@ -9,6 +9,7 @@ import (
"github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/atomicutil" "github.com/pomerium/pomerium/internal/atomicutil"
cluster_api "github.com/pomerium/zero-sdk/cluster"
) )
var ( var (
@ -43,7 +44,7 @@ func (src *source) WaitReady(ctx context.Context) error {
// GetConfig implements config.Source // GetConfig implements config.Source
func (src *source) GetConfig() *config.Config { func (src *source) GetConfig() *config.Config {
return src.cfg.Load().Clone() return src.cfg.Load()
} }
// OnConfigChange implements config.Source // OnConfigChange implements config.Source
@ -53,16 +54,19 @@ func (src *source) OnConfigChange(_ context.Context, l config.ChangeListener) {
src.listenerLock.Unlock() src.listenerLock.Unlock()
} }
// setConfig updates the underlying configuration // UpdateBootstrap updates the underlying configuration options
// its only called by the updater func (src *source) UpdateBootstrap(ctx context.Context, cfg cluster_api.BootstrapConfig) bool {
func (src *source) SetConfig(ctx context.Context, cfg *config.Config) bool {
current := src.cfg.Load() current := src.cfg.Load()
if cmp.Equal(cfg.Options, current.Options, cmpOpts...) { incoming := current.Clone()
applyBootstrapConfig(incoming.Options, &cfg)
if cmp.Equal(incoming.Options, current.Options, cmpOpts...) {
return false return false
} }
src.cfg.Store(cfg) src.cfg.Store(incoming)
src.notifyListeners(ctx, cfg)
src.notifyListeners(ctx, incoming)
return true return true
} }
@ -80,3 +84,10 @@ func (src *source) notifyListeners(ctx context.Context, cfg *config.Config) {
l(ctx, cfg) l(ctx, cfg)
} }
} }
func applyBootstrapConfig(dst *config.Options, src *cluster_api.BootstrapConfig) {
if src.DatabrokerStorageConnection != nil {
dst.DataBrokerStorageType = "postgres"
dst.DataBrokerStorageConnectionString = *src.DatabrokerStorageConnection
}
}