diff --git a/go.mod b/go.mod index 24f392afd..d63865e91 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,8 @@ require ( github.com/coreos/go-oidc/v3 v3.2.0 github.com/docker/docker v20.10.17+incompatible github.com/envoyproxy/go-control-plane v0.10.3-0.20220819153403-8a9be01c9575 - github.com/envoyproxy/protoc-gen-validate v0.6.7 + github.com/envoyproxy/protoc-gen-validate v0.6.13 + github.com/fsnotify/fsnotify v1.5.4 github.com/go-chi/chi/v5 v5.0.7 github.com/go-jose/go-jose/v3 v3.0.0 github.com/go-redis/redis/v8 v8.11.5 @@ -50,7 +51,6 @@ require ( github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.37.0 github.com/prometheus/procfs v0.8.0 - github.com/rjeczalik/notify v0.9.3-0.20201210012515-e2a77dcc14cf github.com/rs/cors v1.8.2 github.com/rs/zerolog v1.28.0 github.com/shirou/gopsutil/v3 v3.22.7 @@ -65,7 +65,7 @@ require ( go.uber.org/zap v1.23.0 golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e - golang.org/x/net v0.0.0-20220805013720-a33c5aa5df48 + golang.org/x/net v0.0.0-20220907135653-1e95f45603a7 golang.org/x/oauth2 v0.0.0-20220822191816-0ebed06d0094 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 google.golang.org/api v0.94.0 @@ -74,6 +74,7 @@ require ( google.golang.org/protobuf v1.28.1 gopkg.in/auth0.v5 v5.21.1 gopkg.in/yaml.v3 v3.0.1 + namespacelabs.dev/go-filenotify v0.0.0-20220511192020-53ea11be7eaa sigs.k8s.io/yaml v1.3.0 ) @@ -125,7 +126,6 @@ require ( github.com/fatih/structtag v1.2.0 // indirect github.com/felixge/httpsnoop v1.0.2 // indirect github.com/firefart/nonamedreturns v1.0.4 // indirect - github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/fxamacker/cbor/v2 v2.3.0 // indirect github.com/fzipp/gocyclo v0.6.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect @@ -195,7 +195,7 @@ require ( github.com/libdns/libdns v0.2.1 // indirect github.com/lufeee/execinquery v1.2.1 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect - github.com/lyft/protoc-gen-star v0.6.0 // indirect + github.com/lyft/protoc-gen-star v0.6.1 // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/maratori/testpackage v1.1.0 // indirect github.com/matoous/godox v0.0.0-20210227103229-6504466cf951 // indirect @@ -244,7 +244,7 @@ require ( github.com/sivchari/tenv v1.7.0 // indirect github.com/sonatard/noctx v0.0.1 // indirect github.com/sourcegraph/go-diff v0.6.1 // indirect - github.com/spf13/afero v1.8.2 // indirect + github.com/spf13/afero v1.9.2 // indirect github.com/spf13/cast v1.5.0 // indirect github.com/spf13/cobra v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect @@ -281,7 +281,7 @@ require ( golang.org/x/exp/typeparams v0.0.0-20220613132600-b0d781184e0d // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect - golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect + golang.org/x/sys v0.0.0-20220908150016-7ac13a9a928d // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.12 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index 012aabb3f..371ec301c 100644 --- a/go.sum +++ b/go.sum @@ -509,8 +509,9 @@ github.com/envoyproxy/go-control-plane v0.10.3-0.20220819153403-8a9be01c9575 h1: github.com/envoyproxy/go-control-plane v0.10.3-0.20220819153403-8a9be01c9575/go.mod h1:fJJn/j26vwOu972OllsvAgJJM//w9BV6Fxbg2LuVd34= github.com/envoyproxy/protoc-gen-validate v0.0.14/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/envoyproxy/protoc-gen-validate v0.6.7 h1:qcZcULcd/abmQg6dwigimCNEyi4gg31M/xaciQlDml8= github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J6romD608Ba7Hij42vrOBCo= +github.com/envoyproxy/protoc-gen-validate v0.6.13 h1:TvDcILLkjuZV3ER58VkBmncKsLUBqBDxra/XctCzuMM= +github.com/envoyproxy/protoc-gen-validate v0.6.13/go.mod h1:qEySVqXrEugbHKvmhI8ZqtQi75/RHSSRNpffvB4I6Bw= github.com/esimonov/ifshort v1.0.4 h1:6SID4yGWfRae/M7hkVDVVyppy8q/v9OuxNdmjLQStBA= github.com/esimonov/ifshort v1.0.4/go.mod h1:Pe8zjlRrJ80+q2CxHLfEOfTwxCZ4O+MuhcHcfgNWTk0= github.com/ettle/strcase v0.1.1 h1:htFueZyVeE1XNnMEfbqp5r67qAN/4r6ya1ysq8Q+Zcw= @@ -1054,8 +1055,9 @@ github.com/lufeee/execinquery v1.2.1 h1:hf0Ems4SHcUGBxpGN7Jz78z1ppVkP/837ZlETPCE github.com/lufeee/execinquery v1.2.1/go.mod h1:EC7DrEKView09ocscGHC+apXMIaorh4xqSxS/dy8SbM= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= -github.com/lyft/protoc-gen-star v0.6.0 h1:xOpFu4vwmIoUeUrRuAtdCrZZymT/6AkW/bsUWA506Fo= github.com/lyft/protoc-gen-star v0.6.0/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= +github.com/lyft/protoc-gen-star v0.6.1 h1:erE0rdztuaDq3bpGifD95wfoPrSZc95nGA6tbiNYh6M= +github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -1386,8 +1388,6 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= -github.com/rjeczalik/notify v0.9.3-0.20201210012515-e2a77dcc14cf h1:MY2fqXPSLfjld10N04fNcSFdR9K/Y57iXxZRFAivHzI= -github.com/rjeczalik/notify v0.9.3-0.20201210012515-e2a77dcc14cf/go.mod h1:aErll2f0sUX9PXZnVNyeiObbmTlk5jnMoCa4QEjJeqM= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -1467,8 +1467,8 @@ github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4= github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= -github.com/spf13/afero v1.8.2 h1:xehSyVa0YnHWsJ49JFljMpg1HX19V6NDZ1fkm1Xznbo= -github.com/spf13/afero v1.8.2/go.mod h1:CtAatgMJh6bJEIs48Ay/FOnkljP3WeGUG0MC1RfAqwo= +github.com/spf13/afero v1.9.2 h1:j49Hj62F0n+DaZ1dDCvhABaPNSGNkt32oRFxI33IEMw= +github.com/spf13/afero v1.9.2/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= @@ -1625,6 +1625,7 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg= github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= @@ -1870,8 +1871,9 @@ golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220607020251-c690dde0001d/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220630215102-69896b714898/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.0.0-20220805013720-a33c5aa5df48 h1:N9Vc/rorQUDes6B9CNdIxAn5jODGj2wzfrei2x4wNj4= -golang.org/x/net v0.0.0-20220805013720-a33c5aa5df48/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20220907135653-1e95f45603a7 h1:1WGATo9HAhkWMbfyuVU0tEFP88OIkUvwaHFveQPvzCQ= +golang.org/x/net v0.0.0-20220907135653-1e95f45603a7/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1913,7 +1915,6 @@ golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -2049,8 +2050,10 @@ golang.org/x/sys v0.0.0-20220702020025-31831981b65f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908150016-7ac13a9a928d h1:RoyzQTK76Rktm3p4xyZslc8T8I1tBz4UEjZCzeh57mM= +golang.org/x/sys v0.0.0-20220908150016-7ac13a9a928d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -2194,8 +2197,9 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0= golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= google.golang.org/api v0.0.0-20160322025152-9bf6e6e569ff/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= @@ -2521,6 +2525,8 @@ mvdan.cc/lint v0.0.0-20170908181259-adc824a0674b h1:DxJ5nJdkhDlLok9K6qO+5290kphD mvdan.cc/lint v0.0.0-20170908181259-adc824a0674b/go.mod h1:2odslEg/xrtNQqCYg2/jCoyKnw3vv5biOc3JnIcYfL4= mvdan.cc/unparam v0.0.0-20220706161116-678bad134442 h1:seuXWbRB1qPrS3NQnHmFKLJLtskWyueeIzmLXghMGgk= mvdan.cc/unparam v0.0.0-20220706161116-678bad134442/go.mod h1:F/Cxw/6mVrNKqrR2YjFf5CaW0Bw4RL8RfbEf4GRggJk= +namespacelabs.dev/go-filenotify v0.0.0-20220511192020-53ea11be7eaa h1:jj2kjs0Hvufj40wuhMAzoZUOwrwMDFg1gHZ49RiIv9w= +namespacelabs.dev/go-filenotify v0.0.0-20220511192020-53ea11be7eaa/go.mod h1:e8NJRaInXRRm1+KPA6EkGEzdLJAgEvVSIKiLzpP97nI= oras.land/oras-go v1.2.0/go.mod h1:pFNs7oHp2dYsYMSS82HaX5l4mpnGO7hbpPN6EWH2ltc= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/internal/chanutil/batch.go b/internal/chanutil/batch.go new file mode 100644 index 000000000..688cd1947 --- /dev/null +++ b/internal/chanutil/batch.go @@ -0,0 +1,76 @@ +package chanutil + +import "time" + +const ( + defaultBatchMaxSize = 1024 + defaultBatchMaxWait = time.Millisecond * 300 +) + +type batchConfig struct { + maxSize int + maxWait time.Duration +} + +// A BatchOption customizes a batch operation. +type BatchOption func(cfg *batchConfig) + +// WithBatchMaxSize sets the maximum batch size for a Batch operation. +func WithBatchMaxSize(maxSize int) BatchOption { + return func(cfg *batchConfig) { + cfg.maxSize = maxSize + } +} + +// WithBatchMaxWait sets the maximum wait duration for a Batch operation. +func WithBatchMaxWait(maxWait time.Duration) BatchOption { + return func(cfg *batchConfig) { + cfg.maxWait = maxWait + } +} + +// Batch returns a new channel that consumes all the items from `in` and batches them together. +func Batch[T any](in <-chan T, options ...BatchOption) <-chan []T { + cfg := new(batchConfig) + WithBatchMaxSize(defaultBatchMaxSize)(cfg) + WithBatchMaxWait(defaultBatchMaxWait)(cfg) + for _, option := range options { + option(cfg) + } + + out := make(chan []T) + go func() { + var buf []T + var timer <-chan time.Time + for { + if in == nil && timer == nil { + close(out) + return + } + + select { + case item, ok := <-in: + if !ok { + in = nil + timer = time.After(0) + continue + } + buf = append(buf, item) + if timer == nil { + timer = time.After(cfg.maxWait) + } + case <-timer: + timer = nil + for len(buf) > 0 { + batch := buf + if len(batch) > cfg.maxSize { + batch = batch[:cfg.maxSize] + } + buf = buf[len(batch):] + out <- batch + } + } + } + }() + return out +} diff --git a/internal/chanutil/batch_test.go b/internal/chanutil/batch_test.go new file mode 100644 index 000000000..81200d87b --- /dev/null +++ b/internal/chanutil/batch_test.go @@ -0,0 +1,25 @@ +package chanutil + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestBatch(t *testing.T) { + ch1 := make(chan int) + go func() { + for _, i := range []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} { + ch1 <- i + } + close(ch1) + }() + + ch2 := Batch(ch1, WithBatchMaxWait(time.Millisecond*10), WithBatchMaxSize(3)) + assert.Equal(t, []int{1, 2, 3}, <-ch2) + assert.Equal(t, []int{4, 5, 6}, <-ch2) + assert.Equal(t, []int{7, 8, 9}, <-ch2) + assert.Equal(t, []int{10}, <-ch2) + assert.Equal(t, []int(nil), <-ch2) +} diff --git a/internal/chanutil/chanutil.go b/internal/chanutil/chanutil.go new file mode 100644 index 000000000..f5c89a717 --- /dev/null +++ b/internal/chanutil/chanutil.go @@ -0,0 +1,2 @@ +// Package chanutil implements methods for working with channels. +package chanutil diff --git a/internal/chanutil/merge.go b/internal/chanutil/merge.go new file mode 100644 index 000000000..c2e0ba7fd --- /dev/null +++ b/internal/chanutil/merge.go @@ -0,0 +1,46 @@ +package chanutil + +// Merge merges multiple channels together. +func Merge[T any](ins ...<-chan T) <-chan T { + switch len(ins) { + case 0: + return nil + case 1: + return ins[0] + case 2: + default: + return Merge( + Merge(ins[:len(ins)/2]...), + Merge(ins[len(ins)/2:]...), + ) + } + + in1, in2 := ins[0], ins[1] + out := make(chan T) + go func() { + for { + if in1 == nil && in2 == nil { + close(out) + return + } + + select { + case item, ok := <-in1: + if !ok { + in1 = nil + continue + } + + out <- item + case item, ok := <-in2: + if !ok { + in2 = nil + continue + } + + out <- item + } + } + }() + return out +} diff --git a/internal/chanutil/merge_test.go b/internal/chanutil/merge_test.go new file mode 100644 index 000000000..3e1e30407 --- /dev/null +++ b/internal/chanutil/merge_test.go @@ -0,0 +1,37 @@ +package chanutil + +import ( + "sort" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMerge(t *testing.T) { + ch1, ch2, ch3 := make(chan int), make(chan int), make(chan int) + go func() { + for _, i := range []int{1, 2, 3} { + ch1 <- i + } + close(ch1) + }() + go func() { + for _, i := range []int{4, 5, 6} { + ch2 <- i + } + close(ch2) + }() + go func() { + for _, i := range []int{7, 8, 9} { + ch3 <- i + } + close(ch3) + }() + out := Merge(ch1, ch2, ch3) + var tmp []int + for item := range out { + tmp = append(tmp, item) + } + sort.Ints(tmp) + assert.Equal(t, []int{1, 2, 3, 4, 5, 6, 7, 8, 9}, tmp) +} diff --git a/internal/fileutil/watcher.go b/internal/fileutil/watcher.go index d2acf7c2f..c1f6ee34e 100644 --- a/internal/fileutil/watcher.go +++ b/internal/fileutil/watcher.go @@ -4,9 +4,11 @@ import ( "context" "sync" - "github.com/rjeczalik/notify" + "github.com/fsnotify/fsnotify" "github.com/rs/zerolog" + "namespacelabs.dev/go-filenotify" + "github.com/pomerium/pomerium/internal/chanutil" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/signal" ) @@ -14,15 +16,18 @@ import ( // A Watcher watches files for changes. type Watcher struct { *signal.Signal - mu sync.Mutex - filePaths map[string]chan notify.EventInfo + + mu sync.Mutex + watching map[string]struct{} + eventWatcher filenotify.FileWatcher + pollingWatcher filenotify.FileWatcher } // NewWatcher creates a new Watcher. func NewWatcher() *Watcher { return &Watcher{ - Signal: signal.New(), - filePaths: map[string]chan notify.EventInfo{}, + Signal: signal.New(), + watching: make(map[string]struct{}), } } @@ -31,32 +36,27 @@ func (watcher *Watcher) Add(filePath string) { watcher.mu.Lock() defer watcher.mu.Unlock() - ctx := log.WithContext(context.TODO(), func(c zerolog.Context) zerolog.Context { + // already watching + if _, ok := watcher.watching[filePath]; ok { + return + } + + ctx := log.WithContext(context.Background(), func(c zerolog.Context) zerolog.Context { return c.Str("watch_file", filePath) }) + watcher.initLocked(ctx) - // already watching - if _, ok := watcher.filePaths[filePath]; ok { - return - } - - ch := make(chan notify.EventInfo, 1) - go func() { - for evt := range ch { - log.Info(ctx).Str("event", evt.Event().String()).Msg("filemgr: detected file change") - watcher.Signal.Broadcast(ctx) + if watcher.eventWatcher != nil { + if err := watcher.eventWatcher.Add(filePath); err != nil { + log.Error(ctx).Msg("fileutil/watcher: failed to watch file with event-based file watcher") } - }() - err := notify.Watch(filePath, ch, notify.All) - if err != nil { - log.Error(ctx).Err(err).Msg("filemgr: error watching file path") - notify.Stop(ch) - close(ch) - return } - log.Debug(ctx).Msg("filemgr: watching file for changes") - watcher.filePaths[filePath] = ch + if watcher.pollingWatcher != nil { + if err := watcher.pollingWatcher.Add(filePath); err != nil { + log.Error(ctx).Msg("fileutil/watcher: failed to watch file with polling-based file watcher") + } + } } // Clear removes all watches. @@ -64,9 +64,57 @@ func (watcher *Watcher) Clear() { watcher.mu.Lock() defer watcher.mu.Unlock() - for filePath, ch := range watcher.filePaths { - notify.Stop(ch) - close(ch) - delete(watcher.filePaths, filePath) + if w := watcher.eventWatcher; w != nil { + _ = watcher.pollingWatcher.Close() + watcher.eventWatcher = nil } + + if w := watcher.pollingWatcher; w != nil { + _ = watcher.pollingWatcher.Close() + watcher.pollingWatcher = nil + } + + watcher.watching = make(map[string]struct{}) +} + +func (watcher *Watcher) initLocked(ctx context.Context) { + if watcher.eventWatcher != nil || watcher.pollingWatcher != nil { + return + } + + if watcher.eventWatcher == nil { + var err error + watcher.eventWatcher, err = filenotify.NewEventWatcher() + if err != nil { + log.Error(ctx).Msg("fileutil/watcher: failed to create event-based file watcher") + } + } + if watcher.pollingWatcher == nil { + watcher.pollingWatcher = filenotify.NewPollingWatcher(nil) + } + + var errors <-chan error = watcher.pollingWatcher.Errors() //nolint + var events <-chan fsnotify.Event = watcher.pollingWatcher.Events() //nolint + + if watcher.eventWatcher != nil { + errors = chanutil.Merge(errors, watcher.eventWatcher.Errors()) + events = chanutil.Merge(events, watcher.eventWatcher.Events()) + } + + // log errors + go func() { + for err := range errors { + log.Error(ctx).Err(err).Msg("fileutil/watcher: file notification error") + } + }() + + // handle events + go func() { + for evts := range chanutil.Batch(events) { + for _, evt := range evts { + log.Info(ctx).Str("name", evt.Name).Str("op", evt.Op.String()).Msg("fileutil/watcher: file notification event") + } + watcher.Broadcast(ctx) + } + }() } diff --git a/internal/fileutil/watcher_test.go b/internal/fileutil/watcher_test.go index 1319ecc48..ec6017b9f 100644 --- a/internal/fileutil/watcher_test.go +++ b/internal/fileutil/watcher_test.go @@ -23,6 +23,7 @@ func TestWatcher(t *testing.T) { } w := NewWatcher() + defer w.Clear() w.Add(filepath.Join(tmpdir, "test1.txt")) ch := w.Bind() @@ -39,3 +40,50 @@ func TestWatcher(t *testing.T) { t.Error("expected change signal when file is modified") } } + +func TestWatcherSymlink(t *testing.T) { + t.Parallel() + + tmpdir := filepath.Join(os.TempDir(), uuid.New().String()) + err := os.MkdirAll(tmpdir, 0o755) + if !assert.NoError(t, err) { + return + } + t.Cleanup(func() { os.RemoveAll(tmpdir) }) + + err = os.WriteFile(filepath.Join(tmpdir, "test1.txt"), []byte{1, 2, 3, 4}, 0o666) + if !assert.NoError(t, err) { + return + } + + err = os.WriteFile(filepath.Join(tmpdir, "test2.txt"), []byte{5, 6, 7, 8}, 0o666) + if !assert.NoError(t, err) { + return + } + + assert.NoError(t, os.Symlink(filepath.Join(tmpdir, "test1.txt"), filepath.Join(tmpdir, "symlink1.txt"))) + + w := NewWatcher() + defer w.Clear() + w.Add(filepath.Join(tmpdir, "symlink1.txt")) + + ch := w.Bind() + t.Cleanup(func() { w.Unbind(ch) }) + + assert.NoError(t, os.WriteFile(filepath.Join(tmpdir, "test1.txt"), []byte{9, 10, 11}, 0o666)) + + select { + case <-ch: + case <-time.After(time.Second): + t.Error("expected change signal when underlying file is modified") + } + + assert.NoError(t, os.Symlink(filepath.Join(tmpdir, "test2.txt"), filepath.Join(tmpdir, "symlink2.txt"))) + assert.NoError(t, os.Rename(filepath.Join(tmpdir, "symlink2.txt"), filepath.Join(tmpdir, "symlink1.txt"))) + + select { + case <-ch: + case <-time.After(10 * time.Second): + t.Error("expected change signal when symlink is changed") + } +}