Mirror of https://github.com/pomerium/pomerium.git (synced 2025-07-20 01:58:10 +02:00)

Merge branch 'main' into wasaga/pomerium-disable-validation

Commit e85611b7d0: 18 changed files with 1070 additions and 838 deletions
@@ -27,7 +27,6 @@ type Config struct {
 	Options          *Options
 	AutoCertificates []tls.Certificate
 	EnvoyVersion     string
-	Version          int64

 	// DerivedCertificates are TLS certificates derived from the shared secret
 	DerivedCertificates []tls.Certificate

@@ -63,7 +62,6 @@ func (cfg *Config) Clone() *Config {
 	_ = copy(endpoints, cfg.MetricsScrapeEndpoints)

 	return &Config{
-		Version:          cfg.Version,
 		Options:          newOptions,
 		AutoCertificates: cfg.AutoCertificates,
 		EnvoyVersion:     cfg.EnvoyVersion,
@@ -10,6 +10,7 @@ import (
 	"github.com/google/uuid"
 	"github.com/rs/zerolog"

+	"github.com/pomerium/pomerium/internal/events"
 	"github.com/pomerium/pomerium/internal/fileutil"
 	"github.com/pomerium/pomerium/internal/log"
 	"github.com/pomerium/pomerium/internal/telemetry/metrics"

@@ -19,27 +20,27 @@ import (
 // A ChangeListener is called when configuration changes.
 type ChangeListener = func(context.Context, *Config)

+type changeDispatcherEvent struct {
+	cfg *Config
+}
+
 // A ChangeDispatcher manages listeners on config changes.
 type ChangeDispatcher struct {
-	sync.Mutex
-	onConfigChangeListeners []ChangeListener
+	target events.Target[changeDispatcherEvent]
 }

 // Trigger triggers a change.
 func (dispatcher *ChangeDispatcher) Trigger(ctx context.Context, cfg *Config) {
-	dispatcher.Lock()
-	defer dispatcher.Unlock()
-
-	for _, li := range dispatcher.onConfigChangeListeners {
-		li(ctx, cfg)
-	}
+	dispatcher.target.Dispatch(ctx, changeDispatcherEvent{
+		cfg: cfg,
+	})
 }

 // OnConfigChange adds a listener.
 func (dispatcher *ChangeDispatcher) OnConfigChange(_ context.Context, li ChangeListener) {
-	dispatcher.Lock()
-	defer dispatcher.Unlock()
-	dispatcher.onConfigChangeListeners = append(dispatcher.onConfigChangeListeners, li)
+	dispatcher.target.AddListener(func(ctx context.Context, evt changeDispatcherEvent) {
+		li(ctx, evt.cfg)
+	})
 }

 // A Source gets configuration.
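For illustration only (not part of this commit): a minimal sketch of how the refactored ChangeDispatcher is used. The API is unchanged, but listeners are now dispatched through events.Target on their own goroutines rather than synchronously under a mutex, so the sketch waits on a channel before exiting.

package main

import (
    "context"
    "fmt"

    "github.com/pomerium/pomerium/config"
)

func main() {
    ctx := context.Background()

    // The zero value is usable; the embedded events.Target initializes itself lazily.
    var dispatcher config.ChangeDispatcher

    done := make(chan struct{})
    dispatcher.OnConfigChange(ctx, func(_ context.Context, cfg *config.Config) {
        fmt.Println("config changed, envoy version:", cfg.EnvoyVersion)
        close(done)
    })

    // Trigger hands the config to events.Target.Dispatch; the listener runs asynchronously.
    dispatcher.Trigger(ctx, &config.Config{EnvoyVersion: "1.2.3"})
    <-done
}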
@@ -114,7 +115,6 @@ func NewFileOrEnvironmentSource(
 	cfg := &Config{
 		Options:      options,
 		EnvoyVersion: envoyVersion,
-		Version:      1,
 	}

 	ports, err := netutil.AllocatePorts(6)

@@ -152,7 +152,6 @@ func (src *FileOrEnvironmentSource) check(ctx context.Context) {
 	options, err := newOptionsFromConfig(src.configFile)
 	if err == nil {
 		cfg = cfg.Clone()
-		cfg.Version++
 		cfg.Options = options
 		metrics.SetConfigInfo(ctx, cfg.Options.Services, "local", cfg.Checksum(), true)
 	} else {

@@ -162,7 +161,7 @@ func (src *FileOrEnvironmentSource) check(ctx context.Context) {
 	src.config = cfg
 	src.mu.Unlock()

-	log.Info(ctx).Int64("config-version", cfg.Version).Msg("config: loaded configuration")
+	log.Info(ctx).Msg("config: loaded configuration")

 	src.Trigger(ctx, cfg)
 }
@@ -23,7 +23,7 @@ import (
 func Test_buildPolicyTransportSocket(t *testing.T) {
 	ctx := context.Background()
 	cacheDir, _ := os.UserCacheDir()
-	customCA := filepath.Join(cacheDir, "pomerium", "envoy", "files", "custom-ca-32484c314b584447463735303142374c31414145374650305a525539554938594d524855353757313942494d473847535231.pem")
+	customCA := filepath.Join(cacheDir, "pomerium", "envoy", "files", "custom-ca-57394a4e5157303436544830.pem")

 	b := New("local-grpc", "local-http", "local-metrics", filemgr.NewManager(), nil)
 	rootCABytes, _ := getCombinedCertificateAuthority(&config.Config{Options: &config.Options{}})

@@ -406,10 +406,10 @@ func Test_buildPolicyTransportSocket(t *testing.T) {
 			},
 			"tlsCertificates": [{
 				"certificateChain":{
-					"filename": "`+filepath.Join(cacheDir, "pomerium", "envoy", "files", "tls-crt-354e49305a5a39414a545530374e58454e48334148524c4e324258463837364355564c4e4532464b54355139495547514a38.pem")+`"
+					"filename": "`+filepath.Join(cacheDir, "pomerium", "envoy", "files", "tls-crt-32375a484d4f49594c4d374830.pem")+`"
 				},
 				"privateKey": {
-					"filename": "`+filepath.Join(cacheDir, "pomerium", "envoy", "files", "tls-key-3350415a38414e4e4a4655424e55393430474147324651433949384e485341334b5157364f424b4c5856365a545937383735.pem")+`"
+					"filename": "`+filepath.Join(cacheDir, "pomerium", "envoy", "files", "tls-key-33393156483053584631414836.pem")+`"
 				}
 			}],
 			"validationContext": {
@@ -3,20 +3,21 @@ package filemgr

 import (
 	"context"
-	"fmt"
 	"os"
 	"path/filepath"
+	"sync"

 	envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
-	"github.com/martinlindhe/base36"

 	"github.com/pomerium/pomerium/internal/log"
-	"github.com/pomerium/pomerium/pkg/cryptutil"
 )

 // A Manager manages files for envoy.
 type Manager struct {
 	cfg *config

+	initOnce sync.Once
+	initErr  error
 }

 // NewManager creates a new Manager.

@@ -27,18 +28,23 @@ func NewManager(options ...Option) *Manager {
 	}
 }

+func (mgr *Manager) init() {
+	mgr.initOnce.Do(func() {
+		mgr.initErr = os.MkdirAll(mgr.cfg.cacheDir, 0o700)
+	})
+}
+
 // BytesDataSource returns an envoy config data source based on bytes.
 func (mgr *Manager) BytesDataSource(fileName string, data []byte) *envoy_config_core_v3.DataSource {
-	h := base36.EncodeBytes(cryptutil.Hash("filemgr", data))
-	ext := filepath.Ext(fileName)
-	fileName = fmt.Sprintf("%s-%x%s", fileName[:len(fileName)-len(ext)], h, ext)
-
-	if err := os.MkdirAll(mgr.cfg.cacheDir, 0o700); err != nil {
-		log.Error(context.TODO()).Err(err).Msg("filemgr: error creating cache directory, falling back to inline bytes")
+	mgr.init()
+	if mgr.initErr != nil {
+		log.Error(context.Background()).Err(mgr.initErr).Msg("filemgr: error creating cache directory, falling back to inline bytes")
 		return inlineBytes(data)
 	}

+	fileName = GetFileNameWithBytesHash(fileName, data)
 	filePath := filepath.Join(mgr.cfg.cacheDir, fileName)

 	if _, err := os.Stat(filePath); os.IsNotExist(err) {
 		err = os.WriteFile(filePath, data, 0o600)
 		if err != nil {
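For illustration only (not part of this commit): a sketch of Manager behavior after this change, using only the public API shown in the tests. The cache directory is now created once via init(); if that fails, every call falls back to an inline bytes data source.

package main

import (
    "fmt"

    "github.com/pomerium/pomerium/config/envoyconfig/filemgr"
)

func main() {
    mgr := filemgr.NewManager()

    // The bytes are written once to the cache directory under a name derived from
    // GetFileNameWithBytesHash, so identical data always maps to the same file.
    ds := mgr.BytesDataSource("custom-ca.pem", []byte("-----BEGIN CERTIFICATE-----\n..."))
    fmt.Println(ds.GetFilename())

    // If the cache directory could not be created, GetFilename() is empty and the
    // data is carried inline instead.
    fmt.Println(len(ds.GetInlineBytes()))
}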
@@ -17,7 +17,7 @@ func Test(t *testing.T) {
 	ds := mgr.BytesDataSource("test.txt", []byte{1, 2, 3, 4, 5})
 	assert.Equal(t, &envoy_config_core_v3.DataSource{
 		Specifier: &envoy_config_core_v3.DataSource_Filename{
-			Filename: filepath.Join(dir, "test-353354494b53534a5538435652584d594a5759394d43484f38514b34594b4b524b34515339593249344e4238474a5436414b.txt"),
+			Filename: filepath.Join(dir, "test-32354837325a545944534a4537.txt"),
 		},
 	}, ds)
 	mgr.ClearCache()

@@ -32,7 +32,7 @@ func Test(t *testing.T) {
 	ds := mgr.FileDataSource(tmpFilePath)
 	assert.Equal(t, &envoy_config_core_v3.DataSource{
 		Specifier: &envoy_config_core_v3.DataSource_Filename{
-			Filename: filepath.Join(dir, "test-34514f59593332445a5649504230484142544c515057383944383730554833564d32574836354654585954304e424f464336.txt"),
+			Filename: filepath.Join(dir, "test-474136555958463735414951.txt"),
 		},
 	}, ds)

@@ -41,7 +41,7 @@ func Test(t *testing.T) {
 	ds = mgr.FileDataSource(tmpFilePath)
 	assert.Equal(t, &envoy_config_core_v3.DataSource{
 		Specifier: &envoy_config_core_v3.DataSource_Filename{
-			Filename: filepath.Join(dir, "test-32564e4457304430393559364b5747373138584f484f5a51334d365758584b47364b555a4c444849513241513457323259.txt"),
+			Filename: filepath.Join(dir, "test-3331324c4a35574d5439444d4c.txt"),
 		},
 	}, ds)
config/envoyconfig/filemgr/name.go (new file, 18 lines)

@@ -0,0 +1,18 @@
+package filemgr
+
+import (
+	"fmt"
+	"path/filepath"
+
+	"github.com/cespare/xxhash/v2"
+	"github.com/martinlindhe/base36"
+)
+
+// GetFileNameWithBytesHash constructs a filename using a base filename and a hash of
+// the data. For example: GetFileNameWithBytesHash("example.txt", []byte{...}) ==> "example-abcd1234.txt"
+func GetFileNameWithBytesHash(base string, data []byte) string {
+	h := xxhash.Sum64(data)
+	he := base36.Encode(h)
+	ext := filepath.Ext(base)
+	return fmt.Sprintf("%s-%x%s", base[:len(base)-len(ext)], he, ext)
+}
config/envoyconfig/filemgr/name_test.go (new file, 19 lines)

@@ -0,0 +1,19 @@
+package filemgr
+
+import (
+	"crypto/rand"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func BenchmarkGetFileNameWithBytesHash(b *testing.B) {
+	bs := make([]byte, 1024*128)
+	_, err := rand.Read(bs)
+	require.NoError(b, err)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		GetFileNameWithBytesHash("example.crt", bs)
+	}
+}
@@ -41,8 +41,8 @@ func testData(t *testing.T, name string, data interface{}) string {

 func Test_buildMetricsHTTPConnectionManagerFilter(t *testing.T) {
 	cacheDir, _ := os.UserCacheDir()
-	certFileName := filepath.Join(cacheDir, "pomerium", "envoy", "files", "tls-crt-354e49305a5a39414a545530374e58454e48334148524c4e324258463837364355564c4e4532464b54355139495547514a38.pem")
-	keyFileName := filepath.Join(cacheDir, "pomerium", "envoy", "files", "tls-key-3350415a38414e4e4a4655424e55393430474147324651433949384e485341334b5157364f424b4c5856365a545937383735.pem")
+	certFileName := filepath.Join(cacheDir, "pomerium", "envoy", "files", "tls-crt-32375a484d4f49594c4d374830.pem")
+	keyFileName := filepath.Join(cacheDir, "pomerium", "envoy", "files", "tls-key-33393156483053584631414836.pem")

 	b := New("local-grpc", "local-http", "local-metrics", filemgr.NewManager(), nil)
 	li, err := b.buildMetricsListener(&config.Config{

@@ -74,7 +74,7 @@ func Test_buildDownstreamTLSContext(t *testing.T) {
 	b := New("local-grpc", "local-http", "local-metrics", filemgr.NewManager(), nil)

 	cacheDir, _ := os.UserCacheDir()
-	clientCAFileName := filepath.Join(cacheDir, "pomerium", "envoy", "files", "client-ca-3533485838304b593757424e3354425157494c4747433534384f474f3631364d5332554c3332485a483834334d50454c344a.pem")
+	clientCAFileName := filepath.Join(cacheDir, "pomerium", "envoy", "files", "client-ca-313754424855313435355a5348.pem")

 	t.Run("no-validation", func(t *testing.T) {
 		downstreamTLSContext, err := b.buildDownstreamTLSContextMulti(context.Background(), &config.Config{Options: &config.Options{}}, nil)

@@ -207,8 +207,7 @@ func Test_buildDownstreamTLSContext(t *testing.T) {
 		}}

 		maxVerifyDepth = 10
-		downstreamTLSContext, err :=
-			b.buildDownstreamTLSContextMulti(context.Background(), config, nil)
+		downstreamTLSContext, err := b.buildDownstreamTLSContextMulti(context.Background(), config, nil)
 		require.NoError(t, err)
 		testutil.AssertProtoJSONEqual(t, `{
 			"maxVerifyDepth": 10,

@@ -220,8 +219,7 @@ func Test_buildDownstreamTLSContext(t *testing.T) {
 		}`, downstreamTLSContext.GetCommonTlsContext().GetValidationContext())

 		maxVerifyDepth = 0
-		downstreamTLSContext, err =
-			b.buildDownstreamTLSContextMulti(context.Background(), config, nil)
+		downstreamTLSContext, err = b.buildDownstreamTLSContextMulti(context.Background(), config, nil)
 		require.NoError(t, err)
 		testutil.AssertProtoJSONEqual(t, `{
 			"onlyVerifyLeafCertCrl": true,

@@ -243,8 +241,7 @@ func Test_buildDownstreamTLSContext(t *testing.T) {
 				},
 			},
 		}}
-		downstreamTLSContext, err :=
-			b.buildDownstreamTLSContextMulti(context.Background(), config, nil)
+		downstreamTLSContext, err := b.buildDownstreamTLSContextMulti(context.Background(), config, nil)
 		require.NoError(t, err)
 		testutil.AssertProtoJSONEqual(t, `{
 			"maxVerifyDepth": 1,
@@ -3,7 +3,9 @@ package config_test

 import (
 	"context"
 	"errors"
+	"sync/atomic"
 	"testing"
+	"time"

 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"

@@ -13,6 +15,8 @@ import (
 )

 func TestLayeredConfig(t *testing.T) {
+	t.Parallel()
+
 	ctx := context.Background()

 	t.Run("error on initial build", func(t *testing.T) {

@@ -33,12 +37,15 @@ func TestLayeredConfig(t *testing.T) {
 		})
 		require.NoError(t, err)

-		var dst *config.Config
+		var dst atomic.Pointer[config.Config]
+		dst.Store(layered.GetConfig())
 		layered.OnConfigChange(ctx, func(ctx context.Context, c *config.Config) {
-			dst = c
+			dst.Store(c)
 		})

 		underlying.SetConfig(ctx, &config.Config{Options: &config.Options{DeriveInternalDomainCert: proto.String("b.com")}})
-		assert.Equal(t, "b.com", dst.Options.GetDeriveInternalDomain())
+		assert.Eventually(t, func() bool {
+			return dst.Load().Options.GetDeriveInternalDomain() == "b.com"
+		}, 10*time.Second, time.Millisecond)
 	})
 }
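For illustration only (not part of this commit): because OnConfigChange callbacks now run on a listener goroutine, a plain variable written by the callback and read by the test would race, which is why the test above switches to atomic.Pointer plus assert.Eventually. A standalone sketch of the same pattern, using a toy Config type rather than the pomerium one:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

type Config struct{ Domain string }

func main() {
    var current atomic.Pointer[Config]
    current.Store(&Config{Domain: "a.com"})

    onChange := func(c *Config) { current.Store(c) } // what the listener does

    go onChange(&Config{Domain: "b.com"}) // simulated asynchronous dispatch

    // Poll until the update becomes visible, mirroring assert.Eventually in the test.
    for current.Load().Domain != "b.com" {
        time.Sleep(time.Millisecond)
    }
    fmt.Println(current.Load().Domain)
}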
@@ -255,7 +255,6 @@ func (srv *Server) Run(ctx context.Context) error {
 		err := srv.update(ctx, cfg)
 		if err != nil {
 			log.Error(ctx).Err(err).
-				Int64("config-version", cfg.Version).
 				Msg("controlplane: error updating server with new config")
 		}
 	}

@@ -303,7 +302,7 @@ func (srv *Server) update(ctx context.Context, cfg *config.Config) error {
 	if err != nil {
 		return err
 	}
-	srv.xdsmgr.Update(ctx, cfg.Version, res)
+	srv.xdsmgr.Update(ctx, res)
 	return nil
 }

@@ -26,7 +26,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e

 	cfg := srv.currentConfig.Load()

-	log.Info(ctx).Int64("config-version", cfg.Version).Msg("controlplane: building discovery resources")
+	log.Info(ctx).Msg("controlplane: building discovery resources")

 	eg, ctx := errgroup.WithContext(ctx)

@@ -84,7 +84,6 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
 	}

 	log.Info(ctx).
-		Int64("config-version", cfg.Version).
 		Int("cluster-count", len(clusterResources)).
 		Int("listener-count", len(listenerResources)).
 		Int("route-configuration-count", len(routeConfigurationResources)).
@@ -3,9 +3,6 @@ package xdsmgr

 import (
 	"context"
-	"fmt"
-	"strconv"
-	"strings"
 	"sync"

 	envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"

@@ -40,7 +37,7 @@ func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Ma
 	return &Manager{
 		signal: signal.New(),

-		nonce:     toNonce(0),
+		nonce:     uuid.New().String(),
 		resources: resources,
 	}
 }

@@ -111,8 +108,8 @@ func (mgr *Manager) DeltaAggregatedResources(
 		// neither an ACK or a NACK
 		case req.GetErrorDetail() != nil:
 			log.Info(ctx).
+				Str("type-url", req.GetTypeUrl()).
 				Any("error-detail", req.GetErrorDetail()).
-				Int64("config-version", versionFromNonce(req.GetResponseNonce())).
 				Msg("xdsmgr: nack")
 			// a NACK
 			// - set the client resource versions to the current resource versions

@@ -122,7 +119,7 @@ func (mgr *Manager) DeltaAggregatedResources(
 			}
 		case req.GetResponseNonce() == mgr.nonce:
 			log.Info(ctx).
-				Int64("config-version", versionFromNonce(req.GetResponseNonce())).
+				Str("type-url", req.GetTypeUrl()).
 				Msg("xdsmgr: ack")
 			// an ACK for the last response
 			// - set the client resource versions to the current resource versions

@@ -133,7 +130,7 @@ func (mgr *Manager) DeltaAggregatedResources(
 		default:
 			// an ACK for a response that's not the last response
 			log.Info(ctx).
-				Int64("config-version", versionFromNonce(req.GetResponseNonce())).
+				Str("type-url", req.GetTypeUrl()).
 				Msg("xdsmgr: ack")
 		}

@@ -215,7 +212,7 @@ func (mgr *Manager) DeltaAggregatedResources(
 			return ctx.Err()
 		case res := <-outgoing:
 			log.Info(ctx).
-				Int64("config-version", versionFromNonce(res.GetNonce())).
+				Str("type-url", res.GetTypeUrl()).
 				Int("resource-count", len(res.GetResources())).
 				Int("removed-resource-count", len(res.GetRemovedResources())).
 				Msg("xdsmgr: sending resources")

@@ -238,8 +235,8 @@ func (mgr *Manager) StreamAggregatedResources(

 // Update updates the state of resources. If any changes are made they will be pushed to any listening
 // streams. For each TypeURL the list of resources should be the complete list of resources.
-func (mgr *Manager) Update(ctx context.Context, version int64, resources map[string][]*envoy_service_discovery_v3.Resource) {
-	nonce := toNonce(version)
+func (mgr *Manager) Update(ctx context.Context, resources map[string][]*envoy_service_discovery_v3.Resource) {
+	nonce := uuid.New().String()

 	mgr.mu.Lock()
 	mgr.nonce = nonce

@@ -248,15 +245,3 @@ func (mgr *Manager) Update(ctx context.Context, version int64, resources map[str

 	mgr.signal.Broadcast(ctx)
 }
-
-func toNonce(version int64) string {
-	return fmt.Sprintf("%d/%s", version, uuid.New().String())
-}
-
-// versionFromNonce parses the version out of the nonce. A missing or invalid version will be returned as 0.
-func versionFromNonce(nonce string) (version int64) {
-	if idx := strings.Index(nonce, "/"); idx > 0 {
-		version, _ = strconv.ParseInt(nonce[:idx], 10, 64)
-	}
-	return version
-}
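For illustration only (not part of this commit): with config versions removed, nonces are plain UUIDs and the ACK check is a verbatim string comparison against the last nonce sent; nothing is parsed out of the nonce anymore. A toy sketch of that check (the variable names here are illustrative, not the Manager's fields):

package main

import (
    "fmt"

    "github.com/google/uuid"
)

func main() {
    currentNonce := uuid.New().String() // what Update now stores as the manager's nonce

    responseNonce := currentNonce // pretend the client ACKed the latest response
    if responseNonce == currentNonce {
        fmt.Println("ack for the last response")
    } else {
        fmt.Println("ack for an older response")
    }
}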
@@ -94,7 +94,7 @@ func TestManager(t *testing.T) {
 	}, msg.GetResources())
 	ack(msg.Nonce)

-	mgr.Update(ctx, 1, map[string][]*envoy_service_discovery_v3.Resource{
+	mgr.Update(ctx, map[string][]*envoy_service_discovery_v3.Resource{
 		typeURL: {{Name: "r1", Version: "2"}},
 	})

@@ -105,7 +105,7 @@ func TestManager(t *testing.T) {
 	}, msg.GetResources())
 	ack(msg.Nonce)

-	mgr.Update(ctx, 1, map[string][]*envoy_service_discovery_v3.Resource{
+	mgr.Update(ctx, map[string][]*envoy_service_discovery_v3.Resource{
 		typeURL: nil,
 	})
@@ -112,9 +112,6 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
 	// add all the config policies to the list
 	for _, id := range ids {
 		cfgpb := src.dbConfigs[id]
-		if cfgpb.GetVersion() > 0 {
-			cfg.Version = cfgpb.GetVersion()
-		}

 		cfg.Options.ApplySettings(ctx, certsIndex, cfgpb.Settings)
 		var errCount uint64

@@ -174,7 +171,7 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
 	// add the additional policies here since calling `Validate` will reset them.
 	cfg.Options.AdditionalPolicies = append(cfg.Options.AdditionalPolicies, additionalPolicies...)

-	log.Info(ctx).Int64("config-version", cfg.Version).Msg("databroker: built new config")
+	log.Info(ctx).Msg("databroker: built new config")

 	src.computedConfig = cfg
 	if !firstTime {
internal/events/target.go (new file, 166 lines)

@@ -0,0 +1,166 @@
+package events
+
+import (
+	"context"
+	"errors"
+	"sync"
+
+	"github.com/google/uuid"
+)
+
+type (
+	// A Listener is a function that listens for events of type T.
+	Listener[T any] func(ctx context.Context, event T)
+	// A Handle represents a listener.
+	Handle string
+
+	addListenerEvent[T any] struct {
+		listener Listener[T]
+		handle   Handle
+	}
+	removeListenerEvent[T any] struct {
+		handle Handle
+	}
+	dispatchEvent[T any] struct {
+		ctx   context.Context
+		event T
+	}
+)
+
+// A Target is a target for events.
+//
+// Listeners are added with AddListener with a function to be called when the event occurs.
+// AddListener returns a Handle which can be used to remove a listener with RemoveListener.
+//
+// Dispatch dispatches events to all the registered listeners.
+//
+// Target is safe to use in its zero state.
+//
+// The first time any method of Target is called a background goroutine is started that handles
+// any requests and maintains the state of the listeners. Each listener also starts a
+// separate goroutine so that all listeners can be invoked concurrently.
+//
+// The channels to the main goroutine and to the listener goroutines have a size of 1 so typically
+// methods and dispatches will return immediately. However a slow listener will cause the next event
+// dispatch to block. This is the opposite behavior from Manager.
+//
+// Close will cancel all the goroutines. Subsequent calls to AddListener, RemoveListener, Close and
+// Dispatch are no-ops.
+type Target[T any] struct {
+	initOnce         sync.Once
+	ctx              context.Context
+	cancel           context.CancelCauseFunc
+	addListenerCh    chan addListenerEvent[T]
+	removeListenerCh chan removeListenerEvent[T]
+	dispatchCh       chan dispatchEvent[T]
+	listeners        map[Handle]chan dispatchEvent[T]
+}
+
+// AddListener adds a listener to the target.
+func (t *Target[T]) AddListener(listener Listener[T]) Handle {
+	t.init()
+
+	// using a handle is necessary because you can't use a function as a map key.
+	handle := Handle(uuid.NewString())
+
+	select {
+	case <-t.ctx.Done():
+	case t.addListenerCh <- addListenerEvent[T]{listener, handle}:
+	}
+
+	return handle
+}
+
+// Close closes the event target. This can be called multiple times safely.
+// Once closed the target cannot be used.
+func (t *Target[T]) Close() {
+	t.init()
+
+	t.cancel(errors.New("target closed"))
+}
+
+// Dispatch dispatches an event to all listeners.
+func (t *Target[T]) Dispatch(ctx context.Context, evt T) {
+	t.init()
+
+	select {
+	case <-t.ctx.Done():
+	case t.dispatchCh <- dispatchEvent[T]{ctx: ctx, event: evt}:
+	}
+}
+
+// RemoveListener removes a listener from the target.
+func (t *Target[T]) RemoveListener(handle Handle) {
+	t.init()
+
+	select {
+	case <-t.ctx.Done():
+	case t.removeListenerCh <- removeListenerEvent[T]{handle}:
+	}
+}
+
+func (t *Target[T]) init() {
+	t.initOnce.Do(func() {
+		t.ctx, t.cancel = context.WithCancelCause(context.Background())
+		t.addListenerCh = make(chan addListenerEvent[T], 1)
+		t.removeListenerCh = make(chan removeListenerEvent[T], 1)
+		t.dispatchCh = make(chan dispatchEvent[T], 1)
+		t.listeners = map[Handle]chan dispatchEvent[T]{}
+		go t.run()
+	})
+}
+
+func (t *Target[T]) run() {
+	// listen for add/remove/dispatch events and call functions
+	for {
+		select {
+		case <-t.ctx.Done():
+			return
+		case evt := <-t.addListenerCh:
+			t.addListener(evt.listener, evt.handle)
+		case evt := <-t.removeListenerCh:
+			t.removeListener(evt.handle)
+		case evt := <-t.dispatchCh:
+			t.dispatch(evt.ctx, evt.event)
+		}
+	}
+}
+
+// these functions are not thread-safe. They are intended to be called only by "run".
+
+func (t *Target[T]) addListener(listener Listener[T], handle Handle) {
+	ch := make(chan dispatchEvent[T], 1)
+	t.listeners[handle] = ch
+	// start a goroutine to send events to the listener
+	go func() {
+		for {
+			select {
+			case <-t.ctx.Done():
+			case evt := <-ch:
+				listener(evt.ctx, evt.event)
+			}
+		}
+	}()
+}
+
+func (t *Target[T]) removeListener(handle Handle) {
+	ch, ok := t.listeners[handle]
+	if !ok {
+		// nothing to do since the listener doesn't exist
+		return
+	}
+	// close the channel to kill the goroutine
+	close(ch)
+	delete(t.listeners, handle)
+}
+
+func (t *Target[T]) dispatch(ctx context.Context, evt T) {
+	// loop over all the listeners and send the event to them
+	for _, ch := range t.listeners {
+		select {
+		case <-t.ctx.Done():
+			return
+		case ch <- dispatchEvent[T]{ctx: ctx, event: evt}:
+		}
+	}
+}
internal/events/target_test.go (new file, 53 lines)

@@ -0,0 +1,53 @@
+package events_test
+
+import (
+	"context"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/pomerium/pomerium/internal/events"
+)
+
+func TestTarget(t *testing.T) {
+	t.Parallel()
+
+	var target events.Target[int64]
+	t.Cleanup(target.Close)
+
+	var calls1, calls2, calls3 atomic.Int64
+	h1 := target.AddListener(func(_ context.Context, i int64) {
+		calls1.Add(i)
+	})
+	h2 := target.AddListener(func(_ context.Context, i int64) {
+		calls2.Add(i)
+	})
+	h3 := target.AddListener(func(_ context.Context, i int64) {
+		calls3.Add(i)
+	})
+
+	shouldBe := func(i1, i2, i3 int64) {
+		t.Helper()
+
+		assert.Eventually(t, func() bool { return calls1.Load() == i1 }, time.Second, time.Millisecond)
+		assert.Eventually(t, func() bool { return calls2.Load() == i2 }, time.Second, time.Millisecond)
+		assert.Eventually(t, func() bool { return calls3.Load() == i3 }, time.Second, time.Millisecond)
+	}
+
+	target.Dispatch(context.Background(), 1)
+	shouldBe(1, 1, 1)
+
+	target.RemoveListener(h2)
+	target.Dispatch(context.Background(), 2)
+	shouldBe(3, 1, 3)
+
+	target.RemoveListener(h1)
+	target.Dispatch(context.Background(), 3)
+	shouldBe(3, 1, 6)
+
+	target.RemoveListener(h3)
+	target.Dispatch(context.Background(), 4)
+	shouldBe(3, 1, 6)
+}
(One file diff suppressed because it is too large.)

@@ -12,7 +12,6 @@ import "crypt/crypt.proto";

 message Config {
   string name = 1;
-  int64 version = 4;
   repeated Route routes = 2;
   Settings settings = 3;
 }