testenv: embedded envoy cpu/memory profiling config (#5377)

This commit is contained in:
Joe Kralicky 2025-01-03 17:41:54 -05:00 committed by GitHub
parent 68764407b8
commit 8f36870650
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 241 additions and 85 deletions

View file

@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/pomerium/pomerium/internal/testenv" "github.com/pomerium/pomerium/internal/testenv"
"github.com/pomerium/pomerium/internal/testenv/envutil"
"github.com/pomerium/pomerium/internal/testenv/scenarios" "github.com/pomerium/pomerium/internal/testenv/scenarios"
"github.com/pomerium/pomerium/internal/testenv/snippets" "github.com/pomerium/pomerium/internal/testenv/snippets"
"github.com/pomerium/pomerium/internal/testenv/upstreams" "github.com/pomerium/pomerium/internal/testenv/upstreams"
@ -27,7 +28,7 @@ func init() {
} }
func TestRequestLatency(t *testing.T) { func TestRequestLatency(t *testing.T) {
resume := snippets.PauseProfiling(t) resume := envutil.PauseProfiling(t)
env := testenv.New(t, testenv.Silent()) env := testenv.New(t, testenv.Silent())
users := []*scenarios.User{} users := []*scenarios.User{}
for i := range numRoutes { for i := range numRoutes {

View file

@ -13,6 +13,7 @@ import (
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
"errors" "errors"
"flag"
"fmt" "fmt"
"io" "io"
"math/big" "math/big"
@ -33,8 +34,10 @@ import (
"github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/config/envoyconfig/filemgr" "github.com/pomerium/pomerium/config/envoyconfig/filemgr"
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/testenv/envutil"
"github.com/pomerium/pomerium/internal/testenv/values" "github.com/pomerium/pomerium/internal/testenv/values"
"github.com/pomerium/pomerium/pkg/cmd/pomerium" "github.com/pomerium/pomerium/pkg/cmd/pomerium"
"github.com/pomerium/pomerium/pkg/envoy"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/health" "github.com/pomerium/pomerium/pkg/health"
"github.com/pomerium/pomerium/pkg/netutil" "github.com/pomerium/pomerium/pkg/netutil"
@ -248,11 +251,21 @@ func Silent(silent ...bool) EnvironmentOption {
var setGrpcLoggerOnce sync.Once var setGrpcLoggerOnce sync.Once
// Command-line flags that provide the initial values of the EnvironmentOptions
// built in New. Each one defaults to false, and options passed to New are
// applied on top of these values.
var (
// flagDebug seeds the debug option.
flagDebug = flag.Bool("env.debug", false, "enables test environment debug logging (equivalent to Debug() option)")
// flagPauseOnFailure seeds the pauseOnFailure option.
flagPauseOnFailure = flag.Bool("env.pause-on-failure", false, "enables pausing the test environment on failure (equivalent to PauseOnFailure() option)")
// flagSilent seeds the forceSilent option.
flagSilent = flag.Bool("env.silent", false, "suppresses all test environment output (equivalent to Silent() option)")
)
func New(t testing.TB, opts ...EnvironmentOption) Environment { func New(t testing.TB, opts ...EnvironmentOption) Environment {
if runtime.GOOS != "linux" { if runtime.GOOS != "linux" {
t.Skip("test environment only supported on linux") t.Skip("test environment only supported on linux")
} }
options := EnvironmentOptions{} options := EnvironmentOptions{
debug: *flagDebug,
pauseOnFailure: *flagPauseOnFailure,
forceSilent: *flagSilent,
}
options.apply(opts...) options.apply(opts...)
if testing.Short() { if testing.Short() {
t.Helper() t.Helper()
@ -348,6 +361,7 @@ func New(t testing.TB, opts ...EnvironmentOption) Environment {
} }
func (e *environment) debugf(format string, args ...any) { func (e *environment) debugf(format string, args ...any) {
e.t.Helper()
if !e.debug { if !e.debug {
return return
} }
@ -509,7 +523,49 @@ func (e *environment) Start() {
mod.Value.Modify(cfg) mod.Value.Modify(cfg)
require.NoError(e.t, cfg.Options.Validate(), "invoking modifier resulted in an invalid configuration:\nadded by: "+mod.Caller) require.NoError(e.t, cfg.Options.Validate(), "invoking modifier resulted in an invalid configuration:\nadded by: "+mod.Caller)
} }
return pomerium.Run(ctx, e.src, pomerium.WithOverrideFileManager(fileMgr))
opts := []pomerium.RunOption{
pomerium.WithOverrideFileManager(fileMgr),
}
envoyBinaryPath := filepath.Join(e.workspaceFolder, fmt.Sprintf("pkg/envoy/files/envoy-%s-%s", runtime.GOOS, runtime.GOARCH))
if envutil.EnvoyProfilerAvailable(envoyBinaryPath) {
e.debugf("envoy profiling available")
envVars := []string{}
pprofCmdLog := "=> go run github.com/google/pprof@latest -symbolize=local -ignore='TCMalloc|^tcmalloc::|^msync$|stacktrace_generic_fp' -http=: %s %s"
if path := envutil.ProfileOutputPath("cpuprofile"); path != "" {
dir, base := filepath.Split(path)
path = filepath.Join(dir, "envoy_"+base)
envVars = append(envVars, fmt.Sprintf("CPUPROFILE=%s", path))
e.t.Cleanup(func() {
e.debugf("View envoy cpu profile:")
e.debugf(pprofCmdLog, envoyBinaryPath, path)
})
}
if path := envutil.ProfileOutputPath("memprofile"); path != "" {
dir, base := filepath.Split(path)
path = filepath.Join(dir, "envoy_"+base)
envVars = append(envVars, fmt.Sprintf("HEAPPROFILE=%s", path))
e.t.Cleanup(func() {
if err := envutil.CollectEnvoyHeapProfiles(path); err != nil {
e.t.Logf("error collecting envoy heap profiles: %s", err)
}
e.debugf("View envoy heap profile:")
envoyBinaryPath := filepath.Join(e.workspaceFolder, fmt.Sprintf("pkg/envoy/files/envoy-%s-%s", runtime.GOOS, runtime.GOARCH))
e.debugf(pprofCmdLog, envoyBinaryPath, path)
})
}
if len(envVars) > 0 {
e.debugf("adding envoy env vars: %v\n", envVars)
opts = append(opts, pomerium.WithEnvoyServerOptions(
envoy.WithExtraEnvVars(envVars...),
envoy.WithExitGracePeriod(10*time.Second), // allow envoy time to flush pprof data to disk
))
}
} else {
e.debugf("envoy profiling not available")
}
return pomerium.Run(ctx, e.src, opts...)
})) }))
for i, task := range e.tasks { for i, task := range e.tasks {

View file

@ -0,0 +1,101 @@
package envutil
import (
"flag"
"io"
"os"
"os/exec"
"path/filepath"
"runtime"
"runtime/pprof"
"sync"
"testing"
"github.com/stretchr/testify/require"
)
// PauseProfiling suspends CPU and memory profiling for the current test
// binary when they were requested through the -cpuprofile and/or -memprofile
// test flags. Calling the returned function restarts whatever was suspended;
// calls after the first are no-ops.
//
// CPU profiling is only paused when the profile output file already exists.
// On resume that file is re-created, so CPU samples gathered before the pause
// are discarded. Memory profiling is paused by zeroing
// runtime.MemProfileRate and resumed by restoring the previous rate, so
// memory samples gathered before the pause are kept.
func PauseProfiling(t testing.TB) (resume func()) {
	var restarts []func()

	cpuPath := ProfileOutputPath("cpuprofile")
	cpuRunning := false
	if cpuPath != "" {
		// The profile file is used as the signal that CPU profiling is active.
		_, statErr := os.Stat(cpuPath)
		cpuRunning = statErr == nil
	}
	if cpuRunning {
		pprof.StopCPUProfile()
		t.Logf("pausing cpu profiling (%s)", cpuPath)
		restarts = append(restarts, func() {
			t.Logf("resuming cpu profiling (%s)", cpuPath)
			out, createErr := os.Create(cpuPath)
			require.NoError(t, createErr)
			require.NoError(t, pprof.StartCPUProfile(out))
		})
	}

	if ProfileOutputPath("memprofile") != "" {
		savedRate := runtime.MemProfileRate
		runtime.MemProfileRate = 0
		t.Log("pausing memory profiling")
		restarts = append(restarts, func() {
			t.Log("resuming memory profiling")
			runtime.MemProfileRate = savedRate
		})
	}

	return sync.OnceFunc(func() {
		for i := range restarts {
			restarts[i]()
		}
	})
}
// ProfileOutputPath returns the output file path configured by the
// '-test.<name>' flag (for example, name "cpuprofile" reads
// '-test.cpuprofile'). It returns an empty string if the flag is not
// registered or was not set.
//
// A relative path is joined onto the value of '-test.outputdir' when that
// flag is registered. An absolute path is returned unchanged, because joining
// it under the output directory would yield a path the profile was never
// written to.
func ProfileOutputPath(name string) string {
	f := flag.Lookup("test." + name)
	if f == nil {
		return ""
	}
	filename := f.Value.String()
	if filename == "" {
		return ""
	}
	if filepath.IsAbs(filename) {
		return filename
	}
	if outputdir := flag.Lookup("test.outputdir"); outputdir != nil {
		filename = filepath.Join(outputdir.Value.String(), filename)
	}
	return filename
}
// EnvoyProfilerAvailable reports whether the envoy binary at the given path
// appears to have been compiled with gperftools profiler support. It returns
// false if the marker symbol is not found, and also if grep cannot be run or
// the file cannot be read.
func EnvoyProfilerAvailable(binary string) bool {
	// There are a few symbols that will only show up if envoy is compiled with
	// tcmalloc=gperftools. Specifically, symbols defined in these headers:
	// https://github.com/gperftools/gperftools/tree/master/src/gperftools
	// The symbols are not mangled, so pick one that is unlikely to be ambiguous
	// or part of another function name.
	const symbol = "ProfilingIsEnabledForAllThreads"

	// grep is resolved through PATH rather than a fixed location, since it is
	// not installed under /usr/bin on every distribution. -F matches the
	// symbol as a fixed string, and "--" stops option parsing so a path that
	// begins with '-' is still treated as a file name.
	err := exec.Command("grep", "-q", "-F", "-e", symbol, "--", binary).Run()
	return err == nil
}
// CollectEnvoyHeapProfiles concatenates every file matching "<base>.*.heap"
// into a single file at base, then removes each part once it has been copied.
// The file at base is created if missing and truncated otherwise, so it ends
// up empty when no parts exist.
//
// It returns the first error encountered: an invalid glob pattern derived
// from base (in which case base is not created), a failure to open, copy or
// remove a part, or a failure to create or close the combined file.
func CollectEnvoyHeapProfiles(base string) (err error) {
	parts, err := filepath.Glob(base + ".*.heap")
	if err != nil {
		return err
	}

	combined, err := os.OpenFile(base, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o644)
	if err != nil {
		return err
	}
	defer func() {
		// A failed close on a written file can mean lost data, so report it
		// unless an earlier error is already being returned.
		if cerr := combined.Close(); err == nil {
			err = cerr
		}
	}()

	for _, part := range parts {
		if err = appendHeapProfilePart(combined, part); err != nil {
			return err
		}
	}
	return nil
}

// appendHeapProfilePart copies the file at part into dst and then deletes
// part. The part is left in place if the copy fails.
func appendHeapProfilePart(dst io.Writer, part string) error {
	pf, err := os.Open(part)
	if err != nil {
		return err
	}
	_, err = io.Copy(dst, pf)
	// Close on every path so a failed copy does not leak the handle; the
	// close error of a read-only handle carries no data-loss information.
	_ = pf.Close()
	if err != nil {
		return err
	}
	return os.Remove(part)
}

View file

@ -1,54 +0,0 @@
package snippets
import (
"flag"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"sync"
"testing"
"github.com/stretchr/testify/require"
)
// PauseProfiling will suspend CPU and memory profiling, if started using the
// -cpuprofile and/or -memprofile test flags. The returned function will restart
// profiling when called. Existing CPU profile data is overwritten, but
// existing memory profile data is kept.
//
// A profile flag that is registered but left empty is treated as "profiling
// not requested". A flag being registered does not mean it was set, so
// checking only that the lookup succeeded would pause memory profiling on
// every run and could mistake the output directory for the CPU profile file.
func PauseProfiling(t testing.TB) (resume func()) {
	var resumeFuncs []func()
	outputdir := flag.Lookup("test.outputdir")

	if f := flag.Lookup("test.cpuprofile"); f != nil && f.Value.String() != "" {
		filename := f.Value.String()
		// Only relative paths are placed under the output directory.
		if outputdir != nil && !filepath.IsAbs(filename) {
			filename = filepath.Join(outputdir.Value.String(), filename)
		}
		// The profile file is used as the signal that CPU profiling is active.
		if _, err := os.Stat(filename); err == nil {
			pprof.StopCPUProfile()
			t.Logf("pausing cpu profiling (%s)", filename)
			resumeFuncs = append(resumeFuncs, func() {
				t.Logf("resuming cpu profiling (%s)", filename)
				f, err := os.Create(filename)
				require.NoError(t, err)
				require.NoError(t, pprof.StartCPUProfile(f))
			})
		}
	}

	if f := flag.Lookup("test.memprofile"); f != nil && f.Value.String() != "" {
		rate := runtime.MemProfileRate
		runtime.MemProfileRate = 0
		t.Log("pausing memory profiling")
		resumeFuncs = append(resumeFuncs, func() {
			t.Log("resuming memory profiling")
			runtime.MemProfileRate = rate
		})
	}

	// OnceFunc makes repeated calls to resume harmless.
	return sync.OnceFunc(func() {
		for _, f := range resumeFuncs {
			f()
		}
	})
}

View file

@ -33,6 +33,7 @@ import (
type RunOptions struct { type RunOptions struct {
fileMgr *filemgr.Manager fileMgr *filemgr.Manager
envoyServerOptions []envoy.ServerOption
databrokerServerOptions []databroker_service.Option databrokerServerOptions []databroker_service.Option
} }
@ -50,6 +51,12 @@ func WithOverrideFileManager(fileMgr *filemgr.Manager) RunOption {
} }
} }
// WithEnvoyServerOptions returns a RunOption that appends opts to the envoy
// server options already collected on the RunOptions it is applied to.
func WithEnvoyServerOptions(opts ...envoy.ServerOption) RunOption {
	appendEnvoyOptions := func(ro *RunOptions) {
		ro.envoyServerOptions = append(ro.envoyServerOptions, opts...)
	}
	return appendEnvoyOptions
}
func WithDataBrokerServerOptions(opts ...databroker_service.Option) RunOption { func WithDataBrokerServerOptions(opts ...databroker_service.Option) RunOption {
return func(o *RunOptions) { return func(o *RunOptions) {
o.databrokerServerOptions = append(o.databrokerServerOptions, opts...) o.databrokerServerOptions = append(o.databrokerServerOptions, opts...)
@ -130,7 +137,7 @@ func Run(ctx context.Context, src config.Source, opts ...RunOption) error {
Msg("server started") Msg("server started")
// create envoy server // create envoy server
envoyServer, err := envoy.NewServer(ctx, src, controlPlane.Builder) envoyServer, err := envoy.NewServer(ctx, src, controlPlane.Builder, options.envoyServerOptions...)
if err != nil { if err != nil {
return fmt.Errorf("error creating envoy server: %w", err) return fmt.Errorf("error creating envoy server: %w", err)
} }

View file

@ -36,15 +36,12 @@ const (
configFileName = "envoy-config.yaml" configFileName = "envoy-config.yaml"
) )
type serverOptions struct {
services string
logLevel config.LogLevel
}
// A Server is a pomerium proxy implemented via envoy. // A Server is a pomerium proxy implemented via envoy.
type Server struct { type Server struct {
wd string ServerOptions
cmd *exec.Cmd wd string
cmd *exec.Cmd
cmdExited chan struct{}
builder *envoyconfig.Builder builder *envoyconfig.Builder
resourceMonitor ResourceMonitor resourceMonitor ResourceMonitor
@ -53,12 +50,40 @@ type Server struct {
monitorProcessCancel context.CancelFunc monitorProcessCancel context.CancelFunc
mu sync.Mutex mu sync.Mutex
options serverOptions }
// ServerOptions holds the optional settings of a Server. Values are set
// through ServerOption functions passed to NewServer.
type ServerOptions struct {
// extraEnvVars are appended to the environment of the envoy command when it
// is started.
extraEnvVars []string
// logLevel is the envoy log level; update derives it from the config rather
// than from a ServerOption.
logLevel config.LogLevel
// exitGracePeriod, when greater than zero, is how long Close waits for envoy
// to exit after an interrupt signal before killing the process.
exitGracePeriod time.Duration
}
// A ServerOption modifies a ServerOptions value.
type ServerOption func(*ServerOptions)
// apply invokes every option in opts on o, in the order given.
func (o *ServerOptions) apply(opts ...ServerOption) {
	for i := range opts {
		opts[i](o)
	}
}
// WithExtraEnvVars returns a ServerOption that appends extraEnvVars to the
// extra environment variables already recorded on the options.
func WithExtraEnvVars(extraEnvVars ...string) ServerOption {
	addEnvVars := func(so *ServerOptions) {
		so.extraEnvVars = append(so.extraEnvVars, extraEnvVars...)
	}
	return addEnvVars
}
// WithExitGracePeriod returns a ServerOption that sets the exit grace period
// to duration, replacing any previously configured value.
func WithExitGracePeriod(duration time.Duration) ServerOption {
return func(o *ServerOptions) {
o.exitGracePeriod = duration
}
} }
// NewServer creates a new server with traffic routed by envoy. // NewServer creates a new server with traffic routed by envoy.
func NewServer(ctx context.Context, src config.Source, builder *envoyconfig.Builder) (*Server, error) { func NewServer(ctx context.Context, src config.Source, builder *envoyconfig.Builder, opts ...ServerOption) (*Server, error) {
options := ServerOptions{}
options.apply(opts...)
if err := preserveRlimitNofile(); err != nil { if err := preserveRlimitNofile(); err != nil {
log.Ctx(ctx).Debug().Err(err).Msg("couldn't preserve RLIMIT_NOFILE before starting Envoy") log.Ctx(ctx).Debug().Err(err).Msg("couldn't preserve RLIMIT_NOFILE before starting Envoy")
} }
@ -69,11 +94,12 @@ func NewServer(ctx context.Context, src config.Source, builder *envoyconfig.Buil
} }
srv := &Server{ srv := &Server{
wd: path.Dir(envoyPath), ServerOptions: options,
builder: builder, wd: path.Dir(envoyPath),
grpcPort: src.GetConfig().GRPCPort, builder: builder,
httpPort: src.GetConfig().HTTPPort, grpcPort: src.GetConfig().GRPCPort,
envoyPath: envoyPath, httpPort: src.GetConfig().HTTPPort,
envoyPath: envoyPath,
monitorProcessCancel: func() {}, monitorProcessCancel: func() {},
} }
@ -105,10 +131,24 @@ func (srv *Server) Close() error {
var err error var err error
if srv.cmd != nil && srv.cmd.Process != nil { if srv.cmd != nil && srv.cmd.Process != nil {
err = srv.cmd.Process.Kill() var exited bool
if err != nil { if srv.exitGracePeriod > 0 {
log.Error().Err(err).Str("service", "envoy").Msg("envoy: failed to kill process on close") _ = srv.cmd.Process.Signal(os.Interrupt)
select {
case <-srv.cmdExited:
exited = true
case <-time.After(srv.exitGracePeriod):
}
} }
if !exited {
err = srv.cmd.Process.Kill()
if err != nil {
log.Error().Err(err).Str("service", "envoy").Msg("envoy: failed to kill process on close")
} else {
<-srv.cmdExited
}
}
srv.cmd = nil srv.cmd = nil
} }
@ -123,16 +163,15 @@ func (srv *Server) update(ctx context.Context, cfg *config.Config) {
srv.mu.Lock() srv.mu.Lock()
defer srv.mu.Unlock() defer srv.mu.Unlock()
options := serverOptions{ opts := srv.ServerOptions
services: cfg.Options.Services, // log level is managed via config
logLevel: firstNonEmpty(cfg.Options.ProxyLogLevel, cfg.Options.LogLevel, config.LogLevelDebug), opts.logLevel = firstNonEmpty(cfg.Options.ProxyLogLevel, cfg.Options.LogLevel, config.LogLevelDebug)
}
if cmp.Equal(srv.options, options, cmp.AllowUnexported(serverOptions{})) { if cmp.Equal(srv.ServerOptions, opts, cmp.AllowUnexported(ServerOptions{})) {
log.Ctx(ctx).Debug().Str("service", "envoy").Msg("envoy: no config changes detected") log.Ctx(ctx).Debug().Str("service", "envoy").Msg("envoy: no config changes detected")
return return
} }
srv.options = options srv.ServerOptions = opts
log.Ctx(ctx).Debug().Msg("envoy: starting envoy process") log.Ctx(ctx).Debug().Msg("envoy: starting envoy process")
if err := srv.run(ctx, cfg); err != nil { if err := srv.run(ctx, cfg); err != nil {
@ -152,7 +191,7 @@ func (srv *Server) run(ctx context.Context, cfg *config.Config) error {
args := []string{ args := []string{
"-c", configFileName, "-c", configFileName,
"--log-level", srv.options.logLevel.ToEnvoy(), "--log-level", srv.logLevel.ToEnvoy(),
"--log-format", "[LOG_FORMAT]%l--%n--%v", "--log-format", "[LOG_FORMAT]%l--%n--%v",
"--log-format-escaped", "--log-format-escaped",
} }
@ -160,6 +199,7 @@ func (srv *Server) run(ctx context.Context, cfg *config.Config) error {
exePath, args := srv.prepareRunEnvoyCommand(ctx, args) exePath, args := srv.prepareRunEnvoyCommand(ctx, args)
cmd := exec.Command(exePath, args...) cmd := exec.Command(exePath, args...)
cmd.Dir = srv.wd cmd.Dir = srv.wd
cmd.Env = append(cmd.Env, srv.extraEnvVars...)
stderr, err := cmd.StderrPipe() stderr, err := cmd.StderrPipe()
if err != nil { if err != nil {
@ -181,7 +221,11 @@ func (srv *Server) run(ctx context.Context, cfg *config.Config) error {
return fmt.Errorf("error starting envoy: %w", err) return fmt.Errorf("error starting envoy: %w", err)
} }
// call Wait to avoid zombie processes // call Wait to avoid zombie processes
go func() { _ = cmd.Wait() }() exited := make(chan struct{})
go func() {
defer close(exited)
_ = cmd.Wait()
}()
// monitor the process so we exit if it prematurely exits // monitor the process so we exit if it prematurely exits
var monitorProcessCtx context.Context var monitorProcessCtx context.Context
@ -220,6 +264,7 @@ func (srv *Server) run(ctx context.Context, cfg *config.Config) error {
}() }()
} }
srv.cmd = cmd srv.cmd = cmd
srv.cmdExited = exited
return nil return nil
} }

View file

@ -19,7 +19,7 @@ import (
const ( const (
ownerRX = os.FileMode(0o500) ownerRX = os.FileMode(0o500)
maxExpandedEnvoySize = 1 << 30 maxExpandedEnvoySize = 2 * 1024 * 1024 * 1024 // 2GB
envoyPrefix = "pomerium-envoy" envoyPrefix = "pomerium-envoy"
) )