diff --git a/internal/benchmarks/latency_bench_test.go b/internal/benchmarks/latency_bench_test.go
index 519732cfd..e3de8d94c 100644
--- a/internal/benchmarks/latency_bench_test.go
+++ b/internal/benchmarks/latency_bench_test.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/pomerium/pomerium/internal/testenv"
+	"github.com/pomerium/pomerium/internal/testenv/envutil"
 	"github.com/pomerium/pomerium/internal/testenv/scenarios"
 	"github.com/pomerium/pomerium/internal/testenv/snippets"
 	"github.com/pomerium/pomerium/internal/testenv/upstreams"
@@ -27,7 +28,7 @@ func init() {
 }
 
 func TestRequestLatency(t *testing.T) {
-	resume := snippets.PauseProfiling(t)
+	resume := envutil.PauseProfiling(t)
 	env := testenv.New(t, testenv.Silent())
 	users := []*scenarios.User{}
 	for i := range numRoutes {
diff --git a/internal/testenv/environment.go b/internal/testenv/environment.go
index d5d8f88e1..beef11ec2 100644
--- a/internal/testenv/environment.go
+++ b/internal/testenv/environment.go
@@ -13,6 +13,7 @@ import (
 	"encoding/base64"
 	"encoding/hex"
 	"errors"
+	"flag"
 	"fmt"
 	"io"
 	"math/big"
@@ -33,8 +34,10 @@ import (
 	"github.com/pomerium/pomerium/config"
 	"github.com/pomerium/pomerium/config/envoyconfig/filemgr"
 	"github.com/pomerium/pomerium/internal/log"
+	"github.com/pomerium/pomerium/internal/testenv/envutil"
 	"github.com/pomerium/pomerium/internal/testenv/values"
 	"github.com/pomerium/pomerium/pkg/cmd/pomerium"
+	"github.com/pomerium/pomerium/pkg/envoy"
 	"github.com/pomerium/pomerium/pkg/grpc/databroker"
 	"github.com/pomerium/pomerium/pkg/health"
 	"github.com/pomerium/pomerium/pkg/netutil"
@@ -248,11 +251,21 @@ func Silent(silent ...bool) EnvironmentOption {
 
 var setGrpcLoggerOnce sync.Once
 
+var (
+	flagDebug          = flag.Bool("env.debug", false, "enables test environment debug logging (equivalent to Debug() option)")
+	flagPauseOnFailure = flag.Bool("env.pause-on-failure", false, "enables pausing the test environment on failure (equivalent to PauseOnFailure() option)")
+	flagSilent         = flag.Bool("env.silent", false, "suppresses all test environment output (equivalent to Silent() option)")
+)
+
 func New(t testing.TB, opts ...EnvironmentOption) Environment {
 	if runtime.GOOS != "linux" {
 		t.Skip("test environment only supported on linux")
 	}
-	options := EnvironmentOptions{}
+	options := EnvironmentOptions{
+		debug:          *flagDebug,
+		pauseOnFailure: *flagPauseOnFailure,
+		forceSilent:    *flagSilent,
+	}
 	options.apply(opts...)
 	if testing.Short() {
 		t.Helper()
@@ -348,6 +361,7 @@ func New(t testing.TB, opts ...EnvironmentOption) Environment {
 }
 
 func (e *environment) debugf(format string, args ...any) {
+	e.t.Helper()
 	if !e.debug {
 		return
 	}
@@ -509,7 +523,49 @@ func (e *environment) Start() {
 			mod.Value.Modify(cfg)
 			require.NoError(e.t, cfg.Options.Validate(), "invoking modifier resulted in an invalid configuration:\nadded by: "+mod.Caller)
 		}
-		return pomerium.Run(ctx, e.src, pomerium.WithOverrideFileManager(fileMgr))
+
+		opts := []pomerium.RunOption{
+			pomerium.WithOverrideFileManager(fileMgr),
+		}
+		envoyBinaryPath := filepath.Join(e.workspaceFolder, fmt.Sprintf("pkg/envoy/files/envoy-%s-%s", runtime.GOOS, runtime.GOARCH))
+		if envutil.EnvoyProfilerAvailable(envoyBinaryPath) {
+			e.debugf("envoy profiling available")
+			envVars := []string{}
+			pprofCmdLog := "=> go run github.com/google/pprof@latest -symbolize=local -ignore='TCMalloc|^tcmalloc::|^msync$|stacktrace_generic_fp' -http=: %s %s"
+			if path := envutil.ProfileOutputPath("cpuprofile"); path != "" {
+				dir, base := filepath.Split(path)
+				path = filepath.Join(dir, "envoy_"+base)
+				envVars = append(envVars, fmt.Sprintf("CPUPROFILE=%s", path))
+				e.t.Cleanup(func() {
+					e.debugf("View envoy cpu profile:")
+					e.debugf(pprofCmdLog, envoyBinaryPath, path)
+				})
+			}
+			if path := envutil.ProfileOutputPath("memprofile"); path != "" {
+				dir, base := filepath.Split(path)
+				path = filepath.Join(dir, "envoy_"+base)
+				envVars = append(envVars, fmt.Sprintf("HEAPPROFILE=%s", path))
+				e.t.Cleanup(func() {
+					if err := envutil.CollectEnvoyHeapProfiles(path); err != nil {
+						e.t.Logf("error collecting envoy heap profiles: %s", err)
+					}
+					e.debugf("View envoy heap profile:")
+					envoyBinaryPath := filepath.Join(e.workspaceFolder, fmt.Sprintf("pkg/envoy/files/envoy-%s-%s", runtime.GOOS, runtime.GOARCH))
+					e.debugf(pprofCmdLog, envoyBinaryPath, path)
+				})
+			}
+			if len(envVars) > 0 {
+				e.debugf("adding envoy env vars: %v\n", envVars)
+				opts = append(opts, pomerium.WithEnvoyServerOptions(
+					envoy.WithExtraEnvVars(envVars...),
+					envoy.WithExitGracePeriod(10*time.Second), // allow envoy time to flush pprof data to disk
+				))
+			}
+		} else {
+			e.debugf("envoy profiling not available")
+		}
+
+		return pomerium.Run(ctx, e.src, opts...)
 	}))
 
 	for i, task := range e.tasks {
diff --git a/internal/testenv/envutil/pprof.go b/internal/testenv/envutil/pprof.go
new file mode 100644
index 000000000..6ef653196
--- /dev/null
+++ b/internal/testenv/envutil/pprof.go
@@ -0,0 +1,101 @@
+package envutil
+
+import (
+	"flag"
+	"io"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"runtime"
+	"runtime/pprof"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// PauseProfiling will suspend CPU and memory profiling, if started using the
+// -cpuprofile and/or -memprofile test flags. The returned function will restart
+// profiling when called. Existing CPU profile data is overwritten, but
+// existing memory profile data is kept.
+func PauseProfiling(t testing.TB) (resume func()) {
+	resumeFuncs := []func(){}
+
+	if filename := ProfileOutputPath("cpuprofile"); filename != "" {
+		if _, err := os.Stat(filename); err == nil {
+			pprof.StopCPUProfile()
+			t.Logf("pausing cpu profiling (%s)", filename)
+			resumeFuncs = append(resumeFuncs, func() {
+				t.Logf("resuming cpu profiling (%s)", filename)
+				f, err := os.Create(filename)
+				require.NoError(t, err)
+				require.NoError(t, pprof.StartCPUProfile(f))
+			})
+		}
+	}
+
+	if filename := ProfileOutputPath("memprofile"); filename != "" {
+		rate := runtime.MemProfileRate
+		runtime.MemProfileRate = 0
+		t.Log("pausing memory profiling")
+		resumeFuncs = append(resumeFuncs, func() {
+			t.Log("resuming memory profiling")
+			runtime.MemProfileRate = rate
+		})
+	}
+	return sync.OnceFunc(func() {
+		for _, f := range resumeFuncs {
+			f()
+		}
+	})
+}
+
+// ProfileOutputPath returns the output path for the profile named by the
+// "test.<name>" flag (joined with -test.outputdir if set), or "" if unset.
+func ProfileOutputPath(name string) string {
+	outputdir := flag.Lookup("test.outputdir")
+	if f := flag.Lookup("test." + name); f != nil {
+		if filename := f.Value.String(); filename != "" {
+			if outputdir != nil {
+				filename = filepath.Join(outputdir.Value.String(), filename)
+			}
+			return filename
+		}
+	}
+	return ""
+}
+
+// EnvoyProfilerAvailable reports whether the envoy binary at the given path
+// was compiled with gperftools profiler support.
+func EnvoyProfilerAvailable(binary string) bool {
+	// There are a few symbols that will only show up if envoy is compiled with
+	// tcmalloc=gperftools. Specifically, symbols defined in these headers:
+	// https://github.com/gperftools/gperftools/tree/master/src/gperftools
+	// The symbols are not mangled, so pick one that is unlikely to be ambiguous
+	// or part of another function name.
+	err := exec.Command("/usr/bin/grep", "-q", "ProfilingIsEnabledForAllThreads", binary).Run()
+	return err == nil
+}
+
+func CollectEnvoyHeapProfiles(base string) error {
+	combined, err := os.OpenFile(base, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o644)
+	if err != nil {
+		return err
+	}
+	defer combined.Close()
+	parts, _ := filepath.Glob(base + ".*.heap")
+	for _, part := range parts {
+		pf, err := os.Open(part)
+		if err != nil {
+			return err
+		}
+		if _, err := io.Copy(combined, pf); err != nil {
+			return err
+		}
+		_ = pf.Close()
+		if err := os.Remove(part); err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/internal/testenv/snippets/pprof.go b/internal/testenv/snippets/pprof.go
deleted file mode 100644
index 04a020b3e..000000000
--- a/internal/testenv/snippets/pprof.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package snippets
-
-import (
-	"flag"
-	"os"
-	"path/filepath"
-	"runtime"
-	"runtime/pprof"
-	"sync"
-	"testing"
-
-	"github.com/stretchr/testify/require"
-)
-
-// PauseProfiling will suspend CPU and memory profiling, if started using the
-// -cpuprofile and/or -memprofile test flags. The returned function will restart
-// profiling when called. Existing CPU profile data is overwritten, but
-// existing memory profile data is kept.
-func PauseProfiling(t testing.TB) (resume func()) {
-	resumeFuncs := []func(){}
-
-	outputdir := flag.Lookup("test.outputdir")
-	if f := flag.Lookup("test.cpuprofile"); f != nil {
-		filename := f.Value.String()
-		if outputdir != nil {
-			filename = filepath.Join(outputdir.Value.String(), filename)
-		}
-		if _, err := os.Stat(filename); err == nil {
-			pprof.StopCPUProfile()
-			t.Logf("pausing cpu profiling (%s)", filename)
-			resumeFuncs = append(resumeFuncs, func() {
-				t.Logf("resuming cpu profiling (%s)", filename)
-				f, err := os.Create(filename)
-				require.NoError(t, err)
-				require.NoError(t, pprof.StartCPUProfile(f))
-			})
-		}
-	}
-
-	if f := flag.Lookup("test.memprofile"); f != nil {
-		rate := runtime.MemProfileRate
-		runtime.MemProfileRate = 0
-		t.Log("pausing memory profiling")
-		resumeFuncs = append(resumeFuncs, func() {
-			t.Log("resuming memory profiling")
-			runtime.MemProfileRate = rate
-		})
-	}
-	return sync.OnceFunc(func() {
-		for _, f := range resumeFuncs {
-			f()
-		}
-	})
-}
diff --git a/pkg/cmd/pomerium/pomerium.go b/pkg/cmd/pomerium/pomerium.go
index de3851fe6..1014d247e 100644
--- a/pkg/cmd/pomerium/pomerium.go
+++ b/pkg/cmd/pomerium/pomerium.go
@@ -33,6 +33,7 @@ import (
 
 type RunOptions struct {
 	fileMgr *filemgr.Manager
 
+	envoyServerOptions      []envoy.ServerOption
 	databrokerServerOptions []databroker_service.Option
 }
@@ -50,6 +51,12 @@ func WithOverrideFileManager(fileMgr *filemgr.Manager) RunOption {
 	}
 }
 
+func WithEnvoyServerOptions(opts ...envoy.ServerOption) RunOption {
+	return func(o *RunOptions) {
+		o.envoyServerOptions = append(o.envoyServerOptions, opts...)
+	}
+}
+
 func WithDataBrokerServerOptions(opts ...databroker_service.Option) RunOption {
 	return func(o *RunOptions) {
 		o.databrokerServerOptions = append(o.databrokerServerOptions, opts...)
@@ -130,7 +137,7 @@ func Run(ctx context.Context, src config.Source, opts ...RunOption) error {
 		Msg("server started")
 
 	// create envoy server
-	envoyServer, err := envoy.NewServer(ctx, src, controlPlane.Builder)
+	envoyServer, err := envoy.NewServer(ctx, src, controlPlane.Builder, options.envoyServerOptions...)
 	if err != nil {
 		return fmt.Errorf("error creating envoy server: %w", err)
 	}
diff --git a/pkg/envoy/envoy.go b/pkg/envoy/envoy.go
index 66cf71ae5..78c80a308 100644
--- a/pkg/envoy/envoy.go
+++ b/pkg/envoy/envoy.go
@@ -36,15 +36,12 @@ const (
 	configFileName = "envoy-config.yaml"
 )
 
-type serverOptions struct {
-	services string
-	logLevel config.LogLevel
-}
-
 // A Server is a pomerium proxy implemented via envoy.
 type Server struct {
-	wd  string
-	cmd *exec.Cmd
+	ServerOptions
+	wd        string
+	cmd       *exec.Cmd
+	cmdExited chan struct{}
 
 	builder         *envoyconfig.Builder
 	resourceMonitor ResourceMonitor
@@ -53,12 +50,40 @@ type Server struct {
 
 	monitorProcessCancel context.CancelFunc
 
-	mu      sync.Mutex
-	options serverOptions
+	mu sync.Mutex
+}
+
+type ServerOptions struct {
+	extraEnvVars    []string
+	logLevel        config.LogLevel
+	exitGracePeriod time.Duration
+}
+
+type ServerOption func(*ServerOptions)
+
+func (o *ServerOptions) apply(opts ...ServerOption) {
+	for _, op := range opts {
+		op(o)
+	}
+}
+
+func WithExtraEnvVars(extraEnvVars ...string) ServerOption {
+	return func(o *ServerOptions) {
+		o.extraEnvVars = append(o.extraEnvVars, extraEnvVars...)
+	}
+}
+
+func WithExitGracePeriod(duration time.Duration) ServerOption {
+	return func(o *ServerOptions) {
+		o.exitGracePeriod = duration
+	}
+}
 
 // NewServer creates a new server with traffic routed by envoy.
-func NewServer(ctx context.Context, src config.Source, builder *envoyconfig.Builder) (*Server, error) {
+func NewServer(ctx context.Context, src config.Source, builder *envoyconfig.Builder, opts ...ServerOption) (*Server, error) {
+	options := ServerOptions{}
+	options.apply(opts...)
+
 	if err := preserveRlimitNofile(); err != nil {
 		log.Ctx(ctx).Debug().Err(err).Msg("couldn't preserve RLIMIT_NOFILE before starting Envoy")
 	}
@@ -69,11 +94,12 @@ func NewServer(ctx context.Context, src config.Source, builder *envoyconfig.Buil
 	}
 
 	srv := &Server{
-		wd:        path.Dir(envoyPath),
-		builder:   builder,
-		grpcPort:  src.GetConfig().GRPCPort,
-		httpPort:  src.GetConfig().HTTPPort,
-		envoyPath: envoyPath,
+		ServerOptions: options,
+		wd:            path.Dir(envoyPath),
+		builder:       builder,
+		grpcPort:      src.GetConfig().GRPCPort,
+		httpPort:      src.GetConfig().HTTPPort,
+		envoyPath:     envoyPath,
 
 		monitorProcessCancel: func() {},
 	}
@@ -105,10 +131,24 @@ func (srv *Server) Close() error {
 
 	var err error
 	if srv.cmd != nil && srv.cmd.Process != nil {
-		err = srv.cmd.Process.Kill()
-		if err != nil {
-			log.Error().Err(err).Str("service", "envoy").Msg("envoy: failed to kill process on close")
+		var exited bool
+		if srv.exitGracePeriod > 0 {
+			_ = srv.cmd.Process.Signal(os.Interrupt)
+			select {
+			case <-srv.cmdExited:
+				exited = true
+			case <-time.After(srv.exitGracePeriod):
+			}
 		}
+		if !exited {
+			err = srv.cmd.Process.Kill()
+			if err != nil {
+				log.Error().Err(err).Str("service", "envoy").Msg("envoy: failed to kill process on close")
+			} else {
+				<-srv.cmdExited
+			}
+		}
+
 		srv.cmd = nil
 	}
@@ -123,16 +163,15 @@ func (srv *Server) update(ctx context.Context, cfg *config.Config) {
 	srv.mu.Lock()
 	defer srv.mu.Unlock()
 
-	options := serverOptions{
-		services: cfg.Options.Services,
-		logLevel: firstNonEmpty(cfg.Options.ProxyLogLevel, cfg.Options.LogLevel, config.LogLevelDebug),
-	}
+	opts := srv.ServerOptions
+	// log level is managed via config
+	opts.logLevel = firstNonEmpty(cfg.Options.ProxyLogLevel, cfg.Options.LogLevel, config.LogLevelDebug)
 
-	if cmp.Equal(srv.options, options, cmp.AllowUnexported(serverOptions{})) {
+	if cmp.Equal(srv.ServerOptions, opts, cmp.AllowUnexported(ServerOptions{})) {
 		log.Ctx(ctx).Debug().Str("service", "envoy").Msg("envoy: no config changes detected")
 		return
 	}
-	srv.options = options
+	srv.ServerOptions = opts
 
 	log.Ctx(ctx).Debug().Msg("envoy: starting envoy process")
 	if err := srv.run(ctx, cfg); err != nil {
@@ -152,7 +191,7 @@ func (srv *Server) run(ctx context.Context, cfg *config.Config) error {
 	args := []string{
 		"-c", configFileName,
-		"--log-level", srv.options.logLevel.ToEnvoy(),
+		"--log-level", srv.logLevel.ToEnvoy(),
 		"--log-format", "[LOG_FORMAT]%l--%n--%v",
 		"--log-format-escaped",
 	}
@@ -160,6 +199,7 @@
 	exePath, args := srv.prepareRunEnvoyCommand(ctx, args)
 	cmd := exec.Command(exePath, args...)
 	cmd.Dir = srv.wd
+	cmd.Env = append(cmd.Env, srv.extraEnvVars...)
 
 	stderr, err := cmd.StderrPipe()
 	if err != nil {
@@ -181,7 +221,11 @@
 		return fmt.Errorf("error starting envoy: %w", err)
 	}
 	// call Wait to avoid zombie processes
-	go func() { _ = cmd.Wait() }()
+	exited := make(chan struct{})
+	go func() {
+		defer close(exited)
+		_ = cmd.Wait()
+	}()
 
 	// monitor the process so we exit if it prematurely exits
 	var monitorProcessCtx context.Context
@@ -220,6 +264,7 @@ func (srv *Server) run(ctx context.Context, cfg *config.Config) error {
 		}()
 	}
 	srv.cmd = cmd
+	srv.cmdExited = exited
 
 	return nil
 }
diff --git a/pkg/envoy/extract.go b/pkg/envoy/extract.go
index 3e191c960..0645feada 100644
--- a/pkg/envoy/extract.go
+++ b/pkg/envoy/extract.go
@@ -19,7 +19,7 @@ import (
 const (
 	ownerRX = os.FileMode(0o500)
 
-	maxExpandedEnvoySize = 1 << 30
+	maxExpandedEnvoySize = 2 * 1024 * 1024 * 1024 // 2GB
 
 	envoyPrefix = "pomerium-envoy"
 )
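For reference, a minimal sketch of how the pieces above are intended to compose in a test. It assumes only what this patch introduces or already uses (envutil.PauseProfiling, the -env.* flags, the envoy CPUPROFILE/HEAPPROFILE wiring in testenv); the test name and the invocation are illustrative, not part of the change:

package benchmarks

import (
	"testing"

	"github.com/pomerium/pomerium/internal/testenv"
	"github.com/pomerium/pomerium/internal/testenv/envutil"
)

// TestProfiledRequests is a hypothetical example: profiling is paused while
// the (expensive) environment is constructed, then resumed so that only the
// measured traffic shows up in the Go profiles. If the bundled envoy binary
// was built with gperftools support, the environment additionally sets
// CPUPROFILE/HEAPPROFILE so envoy writes its own profiles next to the Go
// ones, prefixed with "envoy_".
func TestProfiledRequests(t *testing.T) {
	resume := envutil.PauseProfiling(t) // no-op unless -cpuprofile/-memprofile were passed

	env := testenv.New(t, testenv.Silent())
	// ... add routes, upstreams, and users here ...
	env.Start()

	resume()
	// ... issue the requests being measured ...
}

A typical invocation would look something like:

go test -run TestRequestLatency -cpuprofile=cpu.out -memprofile=mem.out -env.debug ./internal/benchmarks

where -env.debug (added above) makes the environment print the pprof commands for viewing the collected envoy profiles.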