core/config: refactor file watcher (#4702)

* core/config: refactor file watcher

* add comments

* updates

* only use the polling watcher

* fix test

* fix test

* try to fix test again

* remove batching

* dont rely on file modification timestamp

* remove benchmark

* try fix again
This commit is contained in:
Caleb Doxsey 2023-11-03 15:53:20 -06:00 committed by GitHub
parent 77bb203276
commit 2771a5ae87
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 170 additions and 139 deletions

View file

@ -2,11 +2,12 @@ package config
import (
"context"
"crypto/sha256"
"fmt"
"io"
"os"
"sync"
"github.com/cespare/xxhash/v2"
"github.com/google/uuid"
"github.com/rs/zerolog"
@ -15,6 +16,7 @@ import (
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
"github.com/pomerium/pomerium/pkg/netutil"
"github.com/pomerium/pomerium/pkg/slices"
)
// A ChangeListener is called when configuration changes.
@ -131,7 +133,9 @@ func NewFileOrEnvironmentSource(
watcher: fileutil.NewWatcher(),
config: cfg,
}
src.watcher.Add(configFile)
if configFile != "" {
src.watcher.Watch(ctx, []string{configFile})
}
ch := src.watcher.Bind()
go func() {
for range ch {
@ -179,29 +183,32 @@ type FileWatcherSource struct {
underlying Source
watcher *fileutil.Watcher
mu sync.RWMutex
computedConfig *Config
mu sync.RWMutex
hash uint64
cfg *Config
ChangeDispatcher
}
// NewFileWatcherSource creates a new FileWatcherSource
func NewFileWatcherSource(underlying Source) *FileWatcherSource {
func NewFileWatcherSource(ctx context.Context, underlying Source) *FileWatcherSource {
cfg := underlying.GetConfig()
src := &FileWatcherSource{
underlying: underlying,
watcher: fileutil.NewWatcher(),
cfg: cfg,
}
ch := src.watcher.Bind()
go func() {
for range ch {
src.check(context.TODO(), underlying.GetConfig())
src.onFileChange(ctx)
}
}()
underlying.OnConfigChange(context.TODO(), func(ctx context.Context, cfg *Config) {
src.check(ctx, cfg)
underlying.OnConfigChange(ctx, func(ctx context.Context, cfg *Config) {
src.onConfigChange(ctx, cfg)
})
src.check(context.TODO(), underlying.GetConfig())
src.onConfigChange(ctx, cfg)
return src
}
@ -210,20 +217,56 @@ func NewFileWatcherSource(underlying Source) *FileWatcherSource {
func (src *FileWatcherSource) GetConfig() *Config {
src.mu.RLock()
defer src.mu.RUnlock()
return src.computedConfig
return src.cfg
}
func (src *FileWatcherSource) check(ctx context.Context, cfg *Config) {
if cfg == nil || cfg.Options == nil {
return
}
func (src *FileWatcherSource) onConfigChange(ctx context.Context, cfg *Config) {
// update the file watcher with paths from the config
src.watcher.Watch(ctx, getAllConfigFilePaths(cfg))
src.mu.Lock()
defer src.mu.Unlock()
src.watcher.Clear()
// store the config and trigger an update
src.cfg = cfg.Clone()
src.hash = getAllConfigFilePathsHash(src.cfg)
log.Info(ctx).Uint64("hash", src.hash).Msg("config/filewatchersource: underlying config change, triggering update")
src.Trigger(ctx, src.cfg)
}
h := sha256.New()
func (src *FileWatcherSource) onFileChange(ctx context.Context) {
src.mu.Lock()
defer src.mu.Unlock()
hash := getAllConfigFilePathsHash(src.cfg)
if hash == src.hash {
log.Info(ctx).Uint64("hash", src.hash).Msg("config/filewatchersource: no change detected")
} else {
// if the hash changed, trigger an update
// the actual config will be identical
src.hash = hash
log.Info(ctx).Uint64("hash", src.hash).Msg("config/filewatchersource: change detected, triggering update")
src.Trigger(ctx, src.cfg)
}
}
func getAllConfigFilePathsHash(cfg *Config) uint64 {
// read all the config files and build a hash from their contents
h := xxhash.New()
for _, f := range getAllConfigFilePaths(cfg) {
_, _ = h.Write([]byte{0})
f, err := os.Open(f)
if err == nil {
_, _ = io.Copy(h, f)
_ = f.Close()
}
}
return h.Sum64()
}
func getAllConfigFilePaths(cfg *Config) []string {
fs := []string{
cfg.Options.CAFile,
cfg.Options.CertFile,
@ -258,18 +301,9 @@ func (src *FileWatcherSource) check(ctx context.Context, cfg *Config) {
)
}
for _, f := range fs {
_, _ = h.Write([]byte{0})
bs, err := os.ReadFile(f)
if err == nil {
src.watcher.Add(f)
_, _ = h.Write(bs)
}
}
fs = slices.Filter(fs, func(s string) bool {
return s != ""
})
// update the computed config
src.computedConfig = cfg.Clone()
// trigger a change
src.Trigger(ctx, src.computedConfig)
return fs
}