fileutil: update watcher to use fsnotify and polling (#3663)

* fileutil: update watcher to use fsnotify and polling

* raise timeout

* maybe fix
This commit is contained in:
Caleb Doxsey 2022-10-19 09:13:08 -06:00
parent f44c85880b
commit e379e24c6b
9 changed files with 335 additions and 47 deletions

View file

@ -0,0 +1,76 @@
package chanutil
import "time"
const (
defaultBatchMaxSize = 1024
defaultBatchMaxWait = time.Millisecond * 300
)
type batchConfig struct {
maxSize int
maxWait time.Duration
}
// A BatchOption customizes a batch operation.
type BatchOption func(cfg *batchConfig)
// WithBatchMaxSize sets the maximum batch size for a Batch operation.
func WithBatchMaxSize(maxSize int) BatchOption {
return func(cfg *batchConfig) {
cfg.maxSize = maxSize
}
}
// WithBatchMaxWait sets the maximum wait duration for a Batch operation.
func WithBatchMaxWait(maxWait time.Duration) BatchOption {
return func(cfg *batchConfig) {
cfg.maxWait = maxWait
}
}
// Batch returns a new channel that consumes all the items from `in` and batches them together.
func Batch[T any](in <-chan T, options ...BatchOption) <-chan []T {
cfg := new(batchConfig)
WithBatchMaxSize(defaultBatchMaxSize)(cfg)
WithBatchMaxWait(defaultBatchMaxWait)(cfg)
for _, option := range options {
option(cfg)
}
out := make(chan []T)
go func() {
var buf []T
var timer <-chan time.Time
for {
if in == nil && timer == nil {
close(out)
return
}
select {
case item, ok := <-in:
if !ok {
in = nil
timer = time.After(0)
continue
}
buf = append(buf, item)
if timer == nil {
timer = time.After(cfg.maxWait)
}
case <-timer:
timer = nil
for len(buf) > 0 {
batch := buf
if len(batch) > cfg.maxSize {
batch = batch[:cfg.maxSize]
}
buf = buf[len(batch):]
out <- batch
}
}
}
}()
return out
}

View file

@ -0,0 +1,25 @@
package chanutil
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestBatch(t *testing.T) {
ch1 := make(chan int)
go func() {
for _, i := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
ch1 <- i
}
close(ch1)
}()
ch2 := Batch(ch1, WithBatchMaxWait(time.Millisecond*10), WithBatchMaxSize(3))
assert.Equal(t, []int{1, 2, 3}, <-ch2)
assert.Equal(t, []int{4, 5, 6}, <-ch2)
assert.Equal(t, []int{7, 8, 9}, <-ch2)
assert.Equal(t, []int{10}, <-ch2)
assert.Equal(t, []int(nil), <-ch2)
}

View file

@ -0,0 +1,2 @@
// Package chanutil implements methods for working with channels.
package chanutil

View file

@ -0,0 +1,46 @@
package chanutil
// Merge merges multiple channels together.
func Merge[T any](ins ...<-chan T) <-chan T {
switch len(ins) {
case 0:
return nil
case 1:
return ins[0]
case 2:
default:
return Merge(
Merge(ins[:len(ins)/2]...),
Merge(ins[len(ins)/2:]...),
)
}
in1, in2 := ins[0], ins[1]
out := make(chan T)
go func() {
for {
if in1 == nil && in2 == nil {
close(out)
return
}
select {
case item, ok := <-in1:
if !ok {
in1 = nil
continue
}
out <- item
case item, ok := <-in2:
if !ok {
in2 = nil
continue
}
out <- item
}
}
}()
return out
}

View file

@ -0,0 +1,37 @@
package chanutil
import (
"sort"
"testing"
"github.com/stretchr/testify/assert"
)
func TestMerge(t *testing.T) {
ch1, ch2, ch3 := make(chan int), make(chan int), make(chan int)
go func() {
for _, i := range []int{1, 2, 3} {
ch1 <- i
}
close(ch1)
}()
go func() {
for _, i := range []int{4, 5, 6} {
ch2 <- i
}
close(ch2)
}()
go func() {
for _, i := range []int{7, 8, 9} {
ch3 <- i
}
close(ch3)
}()
out := Merge(ch1, ch2, ch3)
var tmp []int
for item := range out {
tmp = append(tmp, item)
}
sort.Ints(tmp)
assert.Equal(t, []int{1, 2, 3, 4, 5, 6, 7, 8, 9}, tmp)
}