fileutil: update watcher to use fsnotify and polling (#3663)

* fileutil: update watcher to use fsnotify and polling

* raise timeout

* maybe fix
This commit is contained in:
Caleb Doxsey 2022-10-19 09:13:08 -06:00 committed by GitHub
parent daed2d260c
commit d147846e64
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 315 additions and 34 deletions

View file

@ -4,9 +4,11 @@ import (
"context"
"sync"
"github.com/rjeczalik/notify"
"github.com/fsnotify/fsnotify"
"github.com/rs/zerolog"
"namespacelabs.dev/go-filenotify"
"github.com/pomerium/pomerium/internal/chanutil"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/signal"
)
@ -14,15 +16,18 @@ import (
// A Watcher watches files for changes.
type Watcher struct {
*signal.Signal
mu sync.Mutex
filePaths map[string]chan notify.EventInfo
mu sync.Mutex
watching map[string]struct{}
eventWatcher filenotify.FileWatcher
pollingWatcher filenotify.FileWatcher
}
// NewWatcher creates a new Watcher.
func NewWatcher() *Watcher {
return &Watcher{
Signal: signal.New(),
filePaths: map[string]chan notify.EventInfo{},
Signal: signal.New(),
watching: make(map[string]struct{}),
}
}
@ -31,32 +36,27 @@ func (watcher *Watcher) Add(filePath string) {
watcher.mu.Lock()
defer watcher.mu.Unlock()
ctx := log.WithContext(context.TODO(), func(c zerolog.Context) zerolog.Context {
// already watching
if _, ok := watcher.watching[filePath]; ok {
return
}
ctx := log.WithContext(context.Background(), func(c zerolog.Context) zerolog.Context {
return c.Str("watch_file", filePath)
})
watcher.initLocked(ctx)
// already watching
if _, ok := watcher.filePaths[filePath]; ok {
return
}
ch := make(chan notify.EventInfo, 1)
go func() {
for evt := range ch {
log.Info(ctx).Str("event", evt.Event().String()).Msg("filemgr: detected file change")
watcher.Signal.Broadcast(ctx)
if watcher.eventWatcher != nil {
if err := watcher.eventWatcher.Add(filePath); err != nil {
log.Error(ctx).Msg("fileutil/watcher: failed to watch file with event-based file watcher")
}
}()
err := notify.Watch(filePath, ch, notify.All)
if err != nil {
log.Error(ctx).Err(err).Msg("filemgr: error watching file path")
notify.Stop(ch)
close(ch)
return
}
log.Debug(ctx).Msg("filemgr: watching file for changes")
watcher.filePaths[filePath] = ch
if watcher.pollingWatcher != nil {
if err := watcher.pollingWatcher.Add(filePath); err != nil {
log.Error(ctx).Msg("fileutil/watcher: failed to watch file with polling-based file watcher")
}
}
}
// Clear removes all watches.
@ -64,9 +64,57 @@ func (watcher *Watcher) Clear() {
watcher.mu.Lock()
defer watcher.mu.Unlock()
for filePath, ch := range watcher.filePaths {
notify.Stop(ch)
close(ch)
delete(watcher.filePaths, filePath)
if w := watcher.eventWatcher; w != nil {
_ = watcher.pollingWatcher.Close()
watcher.eventWatcher = nil
}
if w := watcher.pollingWatcher; w != nil {
_ = watcher.pollingWatcher.Close()
watcher.pollingWatcher = nil
}
watcher.watching = make(map[string]struct{})
}
func (watcher *Watcher) initLocked(ctx context.Context) {
if watcher.eventWatcher != nil || watcher.pollingWatcher != nil {
return
}
if watcher.eventWatcher == nil {
var err error
watcher.eventWatcher, err = filenotify.NewEventWatcher()
if err != nil {
log.Error(ctx).Msg("fileutil/watcher: failed to create event-based file watcher")
}
}
if watcher.pollingWatcher == nil {
watcher.pollingWatcher = filenotify.NewPollingWatcher(nil)
}
var errors <-chan error = watcher.pollingWatcher.Errors() //nolint
var events <-chan fsnotify.Event = watcher.pollingWatcher.Events() //nolint
if watcher.eventWatcher != nil {
errors = chanutil.Merge(errors, watcher.eventWatcher.Errors())
events = chanutil.Merge(events, watcher.eventWatcher.Events())
}
// log errors
go func() {
for err := range errors {
log.Error(ctx).Err(err).Msg("fileutil/watcher: file notification error")
}
}()
// handle events
go func() {
for evts := range chanutil.Batch(events) {
for _, evt := range evts {
log.Info(ctx).Str("name", evt.Name).Str("op", evt.Op.String()).Msg("fileutil/watcher: file notification event")
}
watcher.Broadcast(ctx)
}
}()
}

View file

@ -23,6 +23,7 @@ func TestWatcher(t *testing.T) {
}
w := NewWatcher()
defer w.Clear()
w.Add(filepath.Join(tmpdir, "test1.txt"))
ch := w.Bind()
@ -39,3 +40,50 @@ func TestWatcher(t *testing.T) {
t.Error("expected change signal when file is modified")
}
}
func TestWatcherSymlink(t *testing.T) {
t.Parallel()
tmpdir := filepath.Join(os.TempDir(), uuid.New().String())
err := os.MkdirAll(tmpdir, 0o755)
if !assert.NoError(t, err) {
return
}
t.Cleanup(func() { os.RemoveAll(tmpdir) })
err = os.WriteFile(filepath.Join(tmpdir, "test1.txt"), []byte{1, 2, 3, 4}, 0o666)
if !assert.NoError(t, err) {
return
}
err = os.WriteFile(filepath.Join(tmpdir, "test2.txt"), []byte{5, 6, 7, 8}, 0o666)
if !assert.NoError(t, err) {
return
}
assert.NoError(t, os.Symlink(filepath.Join(tmpdir, "test1.txt"), filepath.Join(tmpdir, "symlink1.txt")))
w := NewWatcher()
defer w.Clear()
w.Add(filepath.Join(tmpdir, "symlink1.txt"))
ch := w.Bind()
t.Cleanup(func() { w.Unbind(ch) })
assert.NoError(t, os.WriteFile(filepath.Join(tmpdir, "test1.txt"), []byte{9, 10, 11}, 0o666))
select {
case <-ch:
case <-time.After(time.Second):
t.Error("expected change signal when underlying file is modified")
}
assert.NoError(t, os.Symlink(filepath.Join(tmpdir, "test2.txt"), filepath.Join(tmpdir, "symlink2.txt")))
assert.NoError(t, os.Rename(filepath.Join(tmpdir, "symlink2.txt"), filepath.Join(tmpdir, "symlink1.txt")))
select {
case <-ch:
case <-time.After(10 * time.Second):
t.Error("expected change signal when symlink is changed")
}
}