diff --git a/config/runtime_flags.go b/config/runtime_flags.go index 957e048f6..85b4af1ba 100644 --- a/config/runtime_flags.go +++ b/config/runtime_flags.go @@ -11,6 +11,8 @@ var ( // RuntimeFlagLegacyIdentityManager enables the legacy identity manager RuntimeFlagLegacyIdentityManager = runtimeFlag("legacy_identity_manager", false) + + RuntimeFlagEnvoyResourceManagerEnabled = runtimeFlag("envoy_resource_manager_enabled", true) ) // RuntimeFlag is a runtime flag that can flip on/off certain features diff --git a/go.mod b/go.mod index 54a32bf6f..948f403fc 100644 --- a/go.mod +++ b/go.mod @@ -79,6 +79,7 @@ require ( golang.org/x/net v0.24.0 golang.org/x/oauth2 v0.19.0 golang.org/x/sync v0.7.0 + golang.org/x/sys v0.20.0 golang.org/x/time v0.5.0 google.golang.org/api v0.177.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6 @@ -209,7 +210,6 @@ require ( go.opentelemetry.io/proto/otlp v1.2.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.14.0 // indirect - golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.15.0 // indirect google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect diff --git a/go.sum b/go.sum index d2a1b7b1f..34821e823 100644 --- a/go.sum +++ b/go.sum @@ -930,8 +930,9 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/internal/telemetry/metrics/const.go b/internal/telemetry/metrics/const.go index b7230e1c5..248e0d0a0 100644 --- a/internal/telemetry/metrics/const.go +++ b/internal/telemetry/metrics/const.go @@ -18,6 +18,9 @@ var ( TagKeyStorageOperation = tag.MustNewKey("operation") TagKeyStorageResult = tag.MustNewKey("result") TagKeyStorageBackend = tag.MustNewKey("backend") + + TagKeyCgroup = tag.MustNewKey("cgroup") + TagKeyActionName = tag.MustNewKey("action_name") ) // Default distributions used by views in this package. 
@@ -44,5 +47,6 @@ var ( HTTPServerViews, InfoViews, StorageViews, + EnvoyViews, } ) diff --git a/internal/telemetry/metrics/envoy.go b/internal/telemetry/metrics/envoy.go new file mode 100644 index 000000000..1961bc933 --- /dev/null +++ b/internal/telemetry/metrics/envoy.go @@ -0,0 +1,104 @@ +package metrics + +import ( + "context" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/pkg/metrics" +) + +var ( + EnvoyViews = []*view.View{ + EnvoyOverloadActionStateView, + EnvoyOverloadActionThresholdView, + EnvoyCgroupMemorySaturationView, + } + + EnvoyOverloadActionState = stats.Int64( + metrics.EnvoyOverloadActionState, + "Current state of envoy overload actions by cgroup", + stats.UnitDimensionless, + ) + + EnvoyOverloadActionThreshold = stats.Float64( + metrics.EnvoyOverloadActionThreshold, + "Injected memory usage minimum thresholds for envoy overload actions", + stats.UnitDimensionless, + ) + + EnvoyCgroupMemorySaturation = stats.Float64( + metrics.EnvoyCgroupMemorySaturation, + "Memory usage percent (0.0-1.0) of the cgroup in which envoy is running", + stats.UnitDimensionless, + ) + + EnvoyOverloadActionStateView = &view.View{ + Name: EnvoyOverloadActionState.Name(), + Description: EnvoyOverloadActionState.Description(), + TagKeys: []tag.Key{TagKeyCgroup, TagKeyActionName}, + Measure: EnvoyOverloadActionState, + Aggregation: view.LastValue(), + } + + EnvoyOverloadActionThresholdView = &view.View{ + Name: EnvoyOverloadActionThreshold.Name(), + Description: EnvoyOverloadActionThreshold.Description(), + TagKeys: []tag.Key{TagKeyActionName}, + Measure: EnvoyOverloadActionThreshold, + Aggregation: view.LastValue(), + } + + EnvoyCgroupMemorySaturationView = &view.View{ + Name: EnvoyCgroupMemorySaturation.Name(), + Description: EnvoyCgroupMemorySaturation.Description(), + TagKeys: []tag.Key{TagKeyCgroup}, + Measure: EnvoyCgroupMemorySaturation, + Aggregation: view.LastValue(), + } +) + +type EnvoyOverloadActionStateTags struct { + Cgroup string + ActionName string +} + +func RecordEnvoyOverloadActionState(ctx context.Context, tags EnvoyOverloadActionStateTags, state int64) { + err := stats.RecordWithTags(ctx, + []tag.Mutator{ + tag.Upsert(TagKeyCgroup, tags.Cgroup), + tag.Upsert(TagKeyActionName, tags.ActionName), + }, + EnvoyOverloadActionState.M(state), + ) + if err != nil { + log.Warn(ctx).Err(err).Msg("internal/telemetry/metrics: failed to record") + } +} + +func RecordEnvoyOverloadActionThreshold(ctx context.Context, actionName string, threshold float64) { + err := stats.RecordWithTags(ctx, + []tag.Mutator{ + tag.Upsert(TagKeyActionName, actionName), + }, + EnvoyOverloadActionThreshold.M(threshold), + ) + if err != nil { + log.Warn(ctx).Err(err).Msg("internal/telemetry/metrics: failed to record") + } +} + +func RecordEnvoyCgroupMemorySaturation(ctx context.Context, cgroup string, percent float64) { + err := stats.RecordWithTags(ctx, + []tag.Mutator{ + tag.Upsert(TagKeyCgroup, cgroup), + }, + EnvoyCgroupMemorySaturation.M(percent), + ) + if err != nil { + log.Warn(ctx).Err(err).Msg("internal/telemetry/metrics: failed to record") + } +} diff --git a/pkg/envoy/envoy.go b/pkg/envoy/envoy.go index 62f2d34c9..e2be26cde 100644 --- a/pkg/envoy/envoy.go +++ b/pkg/envoy/envoy.go @@ -47,6 +47,7 @@ type Server struct { cmd *exec.Cmd builder *envoyconfig.Builder + resourceMonitor ResourceMonitor grpcPort, httpPort string envoyPath string @@ -78,6 +79,12 @@ func NewServer(ctx 
context.Context, src config.Source, builder *envoyconfig.Buil } go srv.runProcessCollector(ctx) + if rm, err := NewSharedResourceMonitor(ctx, src, srv.wd); err == nil { + srv.resourceMonitor = rm + } else { + log.Error(ctx).Err(err).Str("service", "envoy").Msg("not starting resource monitor") + } + src.OnConfigChange(ctx, srv.onConfigChange) srv.onConfigChange(ctx, src.GetConfig()) @@ -181,6 +188,19 @@ func (srv *Server) run(ctx context.Context, cfg *config.Config) error { monitorProcessCtx, srv.monitorProcessCancel = context.WithCancel(context.Background()) go srv.monitorProcess(monitorProcessCtx, int32(cmd.Process.Pid)) + if srv.resourceMonitor != nil { + log.Debug(ctx).Str("service", "envoy").Msg("starting resource monitor") + go func() { + err := srv.resourceMonitor.Run(ctx, cmd.Process.Pid) + if err != nil { + if errors.Is(err, context.Canceled) { + log.Debug(ctx).Err(err).Str("service", "envoy").Msg("resource monitor stopped") + } else { + log.Error(ctx).Err(err).Str("service", "envoy").Msg("resource monitor exited with error") + } + } + }() + } srv.cmd = cmd return nil @@ -203,6 +223,9 @@ func (srv *Server) buildBootstrapConfig(ctx context.Context, cfg *config.Config) if err != nil { return nil, err } + if srv.resourceMonitor != nil { + srv.resourceMonitor.ApplyBootstrapConfig(bootstrapCfg) + } jsonBytes, err := protojson.Marshal(bootstrapCfg) if err != nil { diff --git a/pkg/envoy/resource_monitor.go b/pkg/envoy/resource_monitor.go new file mode 100644 index 000000000..eb837f1dc --- /dev/null +++ b/pkg/envoy/resource_monitor.go @@ -0,0 +1,12 @@ +package envoy + +import ( + "context" + + envoy_config_bootstrap_v3 "github.com/envoyproxy/go-control-plane/envoy/config/bootstrap/v3" +) + +type ResourceMonitor interface { + Run(ctx context.Context, envoyPid int) error + ApplyBootstrapConfig(bootstrap *envoy_config_bootstrap_v3.Bootstrap) +} diff --git a/pkg/envoy/resource_monitor_linux.go b/pkg/envoy/resource_monitor_linux.go new file mode 100644 index 000000000..57f22836f --- /dev/null +++ b/pkg/envoy/resource_monitor_linux.go @@ -0,0 +1,772 @@ +//go:build linux + +package envoy + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "slices" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + "unsafe" + + envoy_config_bootstrap_v3 "github.com/envoyproxy/go-control-plane/envoy/config/bootstrap/v3" + envoy_config_overload_v3 "github.com/envoyproxy/go-control-plane/envoy/config/overload/v3" + envoy_extensions_resource_monitors_injected_resource_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/resource_monitors/injected_resource/v3" + typev3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" + atomicfs "github.com/natefinch/atomic" + "golang.org/x/sys/unix" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/pomerium/pomerium/config" + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/telemetry/metrics" +) + +type CgroupFilePath int + +const ( + RootPath CgroupFilePath = iota + MemoryUsagePath + MemoryLimitPath +) + +type CgroupDriver interface { + CgroupForPid(pid int) (string, error) + Path(cgroup string, kind CgroupFilePath) string + Validate(cgroup string) error + MemoryUsage(cgroup string) (uint64, error) + MemoryLimit(cgroup string) (uint64, error) +} + +var ( + overloadActions = []struct { + ActionName string + Trigger *envoy_config_overload_v3.Trigger + }{ + // At 90%, envoy will shrink its heap every 10 seconds + // 
https://github.com/envoyproxy/envoy/blob/v1.30.1/source/common/memory/heap_shrinker.cc + {"shrink_heap", memUsageThreshold(0.9)}, + + // At >85% memory usage, gradually start reducing timeouts, by up to 50%. + // https://www.envoyproxy.io/docs/envoy/latest/configuration/operations/overload_manager/overload_manager#reducing-timeouts + // https://github.com/envoyproxy/envoy/blob/v1.30.1/source/server/overload_manager_impl.cc#L565-L572 + {"reduce_timeouts", memUsageScaled(0.85, 0.95)}, + + // At 90%, start resetting streams using the most memory. As memory usage + // increases, the eligibility threshold is reduced. + // https://www.envoyproxy.io/docs/envoy/latest/configuration/operations/overload_manager/overload_manager#reset-streams + // https://github.com/envoyproxy/envoy/blob/v1.30.1/source/server/worker_impl.cc#L180 + {"reset_high_memory_stream", memUsageScaled(0.90, 0.98)}, + + // At 95%, stop accepting new connections, but keep existing ones open. + // https://github.com/envoyproxy/envoy/blob/v1.30.1/source/server/worker_impl.cc#L168-L174 + {"stop_accepting_connections", memUsageThreshold(0.95)}, + + // At 98%, disable HTTP keepalive. This prevents new http/2 streams and + // ends all existing ones. + // https://github.com/envoyproxy/envoy/blob/v1.30.1/source/common/http/conn_manager_impl.cc#L1735-L1755 + {"disable_http_keepalive", memUsageThreshold(0.98)}, + + // At 99%, drop all new requests. + // https://github.com/envoyproxy/envoy/blob/v1.30.1/source/common/http/conn_manager_impl.cc#L1203-L1225 + {"stop_accepting_requests", memUsageThreshold(0.99)}, + } + overloadActionConfigs = map[string]*anypb.Any{ + "reduce_timeouts": marshalAny(&envoy_config_overload_v3.ScaleTimersOverloadActionConfig{ + TimerScaleFactors: []*envoy_config_overload_v3.ScaleTimersOverloadActionConfig_ScaleTimer{ + { + Timer: envoy_config_overload_v3.ScaleTimersOverloadActionConfig_HTTP_DOWNSTREAM_CONNECTION_IDLE, + OverloadAdjust: &envoy_config_overload_v3.ScaleTimersOverloadActionConfig_ScaleTimer_MinScale{ + MinScale: &typev3.Percent{ + Value: 50, // reduce the idle timeout by 50% at most + }, + }, + }, + }, + }), + } + recordActionThresholdsOnce sync.Once + computedActionThresholds = make(map[string]float64) +) + +func init() { + for _, action := range overloadActions { + var minThreshold float64 + switch trigger := action.Trigger.TriggerOneof.(type) { + case *envoy_config_overload_v3.Trigger_Scaled: + minThreshold = trigger.Scaled.ScalingThreshold + case *envoy_config_overload_v3.Trigger_Threshold: + minThreshold = trigger.Threshold.Value + } + computedActionThresholds[action.ActionName] = minThreshold + } +} + +func recordActionThresholds() { + recordActionThresholdsOnce.Do(func() { + for name, minThreshold := range computedActionThresholds { + metrics.RecordEnvoyOverloadActionThreshold(context.Background(), name, minThreshold) + } + }) +} + +const ( + groupMemory = "memory" + + metricCgroupMemorySaturation = "cgroup_memory_saturation" +) + +type ResourceMonitorOptions struct { + driver CgroupDriver +} + +type ResourceMonitorOption func(*ResourceMonitorOptions) + +func (o *ResourceMonitorOptions) apply(opts ...ResourceMonitorOption) { + for _, op := range opts { + op(o) + } +} + +// WithCgroupDriver overrides the cgroup driver used for the resource monitor. +// If unset, it will be chosen automatically. 
+func WithCgroupDriver(driver CgroupDriver) ResourceMonitorOption { + return func(o *ResourceMonitorOptions) { + o.driver = driver + } +} + +// NewSharedResourceMonitor creates a new ResourceMonitor suitable for running +// envoy in the same cgroup as the parent process. It reports the cgroup's +// memory saturation to envoy as an injected resource. This allows envoy to +// react to actual memory pressure in the cgroup, taking into account memory +// usage from pomerium itself. +func NewSharedResourceMonitor(ctx context.Context, src config.Source, tempDir string, opts ...ResourceMonitorOption) (ResourceMonitor, error) { + options := ResourceMonitorOptions{} + options.apply(opts...) + if options.driver == nil { + var err error + options.driver, err = DetectCgroupDriver() + if err != nil { + return nil, err + } + } + recordActionThresholds() + + selfCgroup, err := options.driver.CgroupForPid(os.Getpid()) + if err != nil { + return nil, fmt.Errorf("failed to look up cgroup: %w", err) + } + if err := options.driver.Validate(selfCgroup); err != nil { + return nil, fmt.Errorf("cgroup not valid for resource monitoring: %w", err) + } + + if err := os.MkdirAll(filepath.Join(tempDir, "resource_monitor", groupMemory), 0o755); err != nil { + return nil, fmt.Errorf("failed to create resource monitor directory: %w", err) + } + + s := &sharedResourceMonitor{ + ResourceMonitorOptions: options, + cgroup: selfCgroup, + tempDir: filepath.Join(tempDir, "resource_monitor"), + } + readInitialConfig := make(chan struct{}) + src.OnConfigChange(ctx, func(ctx context.Context, c *config.Config) { + <-readInitialConfig + s.onConfigChange(ctx, c) + }) + s.onConfigChange(ctx, src.GetConfig()) + close(readInitialConfig) + + if err := s.writeMetricFile(groupMemory, metricCgroupMemorySaturation, "0", 0o644); err != nil { + return nil, fmt.Errorf("failed to initialize metrics: %w", err) + } + return s, nil +} + +type sharedResourceMonitor struct { + ResourceMonitorOptions + cgroup string + tempDir string + enabled atomic.Bool +} + +func (s *sharedResourceMonitor) onConfigChange(_ context.Context, cfg *config.Config) { + if cfg == nil || cfg.Options == nil { + s.enabled.Store(config.DefaultRuntimeFlags()[config.RuntimeFlagEnvoyResourceManagerEnabled]) + return + } + s.enabled.Store(cfg.Options.IsRuntimeFlagSet(config.RuntimeFlagEnvoyResourceManagerEnabled)) +} + +func (s *sharedResourceMonitor) metricFilename(group, name string) string { + return filepath.Join(s.tempDir, group, name) +} + +func memUsageScaled(scaling, saturation float64) *envoy_config_overload_v3.Trigger { + return &envoy_config_overload_v3.Trigger{ + Name: "envoy.resource_monitors.injected_resource", + TriggerOneof: &envoy_config_overload_v3.Trigger_Scaled{ + Scaled: &envoy_config_overload_v3.ScaledTrigger{ + ScalingThreshold: scaling, + SaturationThreshold: saturation, + }, + }, + } +} + +func memUsageThreshold(threshold float64) *envoy_config_overload_v3.Trigger { + return &envoy_config_overload_v3.Trigger{ + Name: "envoy.resource_monitors.injected_resource", + TriggerOneof: &envoy_config_overload_v3.Trigger_Threshold{ + Threshold: &envoy_config_overload_v3.ThresholdTrigger{ + Value: threshold, + }, + }, + } +} + +func (s *sharedResourceMonitor) ApplyBootstrapConfig(bootstrap *envoy_config_bootstrap_v3.Bootstrap) { + if bootstrap.OverloadManager == nil { + bootstrap.OverloadManager = &envoy_config_overload_v3.OverloadManager{} + } + + bootstrap.OverloadManager.ResourceMonitors = append(bootstrap.OverloadManager.ResourceMonitors, + 
&envoy_config_overload_v3.ResourceMonitor{ + Name: "envoy.resource_monitors.injected_resource", + ConfigType: &envoy_config_overload_v3.ResourceMonitor_TypedConfig{ + TypedConfig: marshalAny(&envoy_extensions_resource_monitors_injected_resource_v3.InjectedResourceConfig{ + Filename: s.metricFilename(groupMemory, metricCgroupMemorySaturation), + }), + }, + }, + ) + + for _, action := range overloadActions { + bootstrap.OverloadManager.Actions = append(bootstrap.OverloadManager.Actions, + &envoy_config_overload_v3.OverloadAction{ + Name: fmt.Sprintf("envoy.overload_actions.%s", action.ActionName), + Triggers: []*envoy_config_overload_v3.Trigger{action.Trigger}, + TypedConfig: overloadActionConfigs[action.ActionName], + }, + ) + } + + bootstrap.OverloadManager.BufferFactoryConfig = &envoy_config_overload_v3.BufferFactoryConfig{ + // https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/overload/v3/overload.proto#config-overload-v3-bufferfactoryconfig + MinimumAccountToTrackPowerOfTwo: 20, + } +} + +var ( + monitorInitialTickDelay = 1 * time.Second + monitorMaxTickInterval = 10 * time.Second + monitorMinTickInterval = 250 * time.Millisecond +) + +func (s *sharedResourceMonitor) Run(ctx context.Context, envoyPid int) error { + envoyCgroup, err := s.driver.CgroupForPid(envoyPid) + if err != nil { + return fmt.Errorf("failed to look up cgroup for envoy process: %w", err) + } + if envoyCgroup != s.cgroup { + return fmt.Errorf("envoy process is not in the expected cgroup: %s", envoyCgroup) + } + log.Info(ctx).Str("service", "envoy").Str("cgroup", s.cgroup).Msg("starting resource monitor") + + ctx, ca := context.WithCancelCause(ctx) + + limitWatcher := &memoryLimitWatcher{ + limitFilePath: filepath.Clean("/" + s.driver.Path(s.cgroup, MemoryLimitPath)), + } + + watcherExited := make(chan struct{}) + if err := limitWatcher.Watch(ctx); err != nil { + return fmt.Errorf("failed to start watch on cgroup memory limit: %w", err) + } + go func() { + limitWatcher.Wait() + ca(errors.New("memory limit watcher stopped")) + close(watcherExited) + }() + + // Set initial values for state metrics + s.updateActionStates(ctx, 0) + + // The interval at which we check memory usage is scaled based on the current + // memory saturation. When memory usage is low, we check less frequently, and + // as the saturation increases, we also increase the frequency of checks. Most + // of the thresholds at which some action is taken to reduce memory usage are + // very high (e.g. 95% of the limit). As memory usage approaches this limit, + // it becomes increasingly important to have accurate data, since memory usage + // can change rapidly; we want to avoid hitting the limit, but also delay + // taking disruptive actions for as long as possible. 
+ + // the envoy default interval for the builtin heap monitor is 1s + + tick := time.NewTimer(monitorInitialTickDelay) + var lastValue string +LOOP: + for { + select { + case <-ctx.Done(): + tick.Stop() + break LOOP + case <-tick.C: + var saturation float64 + if s.enabled.Load() { + if limit := limitWatcher.Value(); limit > 0 { + usage, err := s.driver.MemoryUsage(s.cgroup) + if err != nil { + log.Error(ctx).Err(err).Msg("failed to get memory saturation") + continue + } + saturation = max(0.0, min(1.0, float64(usage)/float64(limit))) + } + } + + saturationStr := fmt.Sprintf("%.6f", saturation) + nextInterval := computeScaledTickInterval(saturation) + + if saturationStr != lastValue { + lastValue = saturationStr + if err := s.writeMetricFile(groupMemory, metricCgroupMemorySaturation, saturationStr, 0o644); err != nil { + log.Error(ctx).Err(err).Msg("failed to write metric file") + } + s.updateActionStates(ctx, saturation) + metrics.RecordEnvoyCgroupMemorySaturation(ctx, s.cgroup, saturation) + log.Debug(ctx). + Str("service", "envoy"). + Str("metric", metricCgroupMemorySaturation). + Str("value", saturationStr). + Dur("interval_ms", nextInterval). + Msg("updated metric") + } + + tick.Reset(nextInterval) + } + } + + <-watcherExited + return context.Cause(ctx) +} + +// Returns a value between monitorMinTickInterval and monitorMaxTickInterval, based +// on the given saturation value in the range [0.0, 1.0]. +func computeScaledTickInterval(saturation float64) time.Duration { + return monitorMaxTickInterval - (time.Duration(float64(monitorMaxTickInterval-monitorMinTickInterval) * max(0.0, min(1.0, saturation)))). + Round(time.Millisecond) +} + +func (s *sharedResourceMonitor) updateActionStates(ctx context.Context, pct float64) { + for name, minThreshold := range computedActionThresholds { + var state int64 + if pct >= minThreshold { + state = 1 + } + metrics.RecordEnvoyOverloadActionState(ctx, + metrics.EnvoyOverloadActionStateTags{ + Cgroup: s.cgroup, + ActionName: name, + }, + state, + ) + } +} + +func (s *sharedResourceMonitor) writeMetricFile(group, name, data string, mode fs.FileMode) error { + // Logic here is similar to atomic.WriteFile, but because envoy watches the + // parent directory for changes to any file, we write the temp file one level + // up before moving it into the watched location, to avoid triggering inotify + // events for the temp file. + f, err := os.CreateTemp(s.tempDir, name) + if err != nil { + return err + } + tempFilename := f.Name() + defer os.Remove(tempFilename) + defer f.Close() + if _, err := f.Write([]byte(data)); err != nil { + return err + } + if err := f.Sync(); err != nil { + return err + } + if err := f.Close(); err != nil { + return err + } + if err := os.Chmod(tempFilename, mode); err != nil { + return err + } + if err := atomicfs.ReplaceFile(tempFilename, filepath.Join(s.tempDir, group, name)); err != nil { + return err + } + return nil +} + +type cgroupV2Driver struct { + fs fs.FS + root string +} + +func (d *cgroupV2Driver) Path(cgroup string, kind CgroupFilePath) string { + switch kind { + case RootPath: + return d.root + case MemoryUsagePath: + return filepath.Join(d.root, cgroup, "memory.current") + case MemoryLimitPath: + return filepath.Join(d.root, cgroup, "memory.max") + } + return "" +} + +func (d *cgroupV2Driver) CgroupForPid(pid int) (string, error) { + data, err := fs.ReadFile(d.fs, fmt.Sprintf("proc/%d/cgroup", pid)) + if err != nil { + return "", err + } + return parseCgroupName(data) +} + +// MemoryUsage implements CgroupDriver. 
+func (d *cgroupV2Driver) MemoryUsage(cgroup string) (uint64, error) { + current, err := fs.ReadFile(d.fs, d.Path(cgroup, MemoryUsagePath)) + if err != nil { + return 0, err + } + return strconv.ParseUint(strings.TrimSpace(string(current)), 10, 64) +} + +// MemoryLimit implements CgroupDriver. +func (d *cgroupV2Driver) MemoryLimit(cgroup string) (uint64, error) { + data, err := fs.ReadFile(d.fs, d.Path(cgroup, MemoryLimitPath)) + if err != nil { + return 0, err + } + max := strings.TrimSpace(string(data)) + if max == "max" { + return 0, nil + } + return strconv.ParseUint(max, 10, 64) +} + +// Validate implements CgroupDriver. +func (d *cgroupV2Driver) Validate(cgroup string) error { + if typ, err := fs.ReadFile(d.fs, filepath.Join(d.root, cgroup, "cgroup.type")); err != nil { + return err + } else if strings.TrimSpace(string(typ)) != "domain" { + return errors.New("not a domain cgroup") + } + + if controllers, err := d.enabledSubtreeControllers(cgroup); err != nil { + return err + } else if len(controllers) > 0 { + return errors.New("not a leaf cgroup") + } + + if controllers, err := d.enabledControllers(cgroup); err != nil { + return err + } else if !slices.Contains(controllers, "memory") { + return errors.New("memory controller not enabled") + } + + return nil +} + +func (d *cgroupV2Driver) enabledControllers(cgroup string) ([]string, error) { + data, err := fs.ReadFile(d.fs, filepath.Join(d.root, cgroup, "cgroup.controllers")) + if err != nil { + return nil, err + } + return strings.Fields(string(data)), nil +} + +func (d *cgroupV2Driver) enabledSubtreeControllers(cgroup string) ([]string, error) { + data, err := fs.ReadFile(d.fs, filepath.Join(d.root, cgroup, "cgroup.subtree_control")) + if err != nil { + return nil, err + } + return strings.Fields(string(data)), nil +} + +var _ CgroupDriver = (*cgroupV2Driver)(nil) + +type cgroupV1Driver struct { + fs fs.FS + root string +} + +func (d *cgroupV1Driver) Path(cgroup string, kind CgroupFilePath) string { + switch kind { + case RootPath: + return d.root + case MemoryUsagePath: + return filepath.Join(d.root, "memory", cgroup, "memory.usage_in_bytes") + case MemoryLimitPath: + return filepath.Join(d.root, "memory", cgroup, "memory.limit_in_bytes") + } + return "" +} + +func (d *cgroupV1Driver) CgroupForPid(pid int) (string, error) { + data, err := fs.ReadFile(d.fs, fmt.Sprintf("proc/%d/cgroup", pid)) + if err != nil { + return "", err + } + name, err := parseCgroupName(data) + if err != nil { + return "", err + } + + mountinfo, err := fs.ReadFile(d.fs, fmt.Sprintf("proc/%d/mountinfo", pid)) + if err != nil { + return "", err + } + scanner := bufio.NewScanner(bytes.NewReader(mountinfo)) + for scanner.Scan() { + line := strings.Fields(scanner.Text()) + if len(line) < 5 { + continue + } + + // Entries 3 and 4 contain the root path and the mountpoint, respectively. + // each resource will contain a separate mountpoint for the same path, so + // we can just pick one. + if line[4] == fmt.Sprintf("/%s/memory", d.root) { + mountpoint, err := filepath.Rel(line[3], name) + if err != nil { + return "", err + } + return filepath.Clean("/" + mountpoint), nil + } + } + return "", errors.New("cgroup not found") +} + +// MemoryUsage implements CgroupDriver. +func (d *cgroupV1Driver) MemoryUsage(cgroup string) (uint64, error) { + current, err := fs.ReadFile(d.fs, d.Path(cgroup, MemoryUsagePath)) + if err != nil { + return 0, err + } + return strconv.ParseUint(strings.TrimSpace(string(current)), 10, 64) +} + +// MemoryLimit implements CgroupDriver. 
+func (d *cgroupV1Driver) MemoryLimit(cgroup string) (uint64, error) { + data, err := fs.ReadFile(d.fs, d.Path(cgroup, MemoryLimitPath)) + if err != nil { + return 0, err + } + max := strings.TrimSpace(string(data)) + if max == "max" { + return 0, nil + } + return strconv.ParseUint(max, 10, 64) +} + +// Validate implements CgroupDriver. +func (d *cgroupV1Driver) Validate(cgroup string) error { + memoryPath := filepath.Join(d.root, "memory", cgroup) + info, err := fs.Stat(d.fs, memoryPath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return errors.New("memory controller not enabled") + } + return fmt.Errorf("failed to stat cgroup: %w", err) + } + if !info.IsDir() { + return fmt.Errorf("%s is not a directory", memoryPath) + } + return nil +} + +var _ CgroupDriver = (*cgroupV1Driver)(nil) + +func DetectCgroupDriver() (CgroupDriver, error) { + osFs := os.DirFS("/") + + // fast path: cgroup2 only + var stat unix.Statfs_t + if err := unix.Statfs("/sys/fs/cgroup", &stat); err != nil { + return nil, err + } + if stat.Type == unix.CGROUP2_SUPER_MAGIC { + return &cgroupV2Driver{root: "sys/fs/cgroup", fs: osFs}, nil + } + + // find the hybrid mountpoint, or fall back to v1 + mountpoint, isV2, err := findMountpoint(osFs) + if err != nil { + return nil, err + } + if isV2 { + return &cgroupV2Driver{root: mountpoint, fs: osFs}, nil + } + return &cgroupV1Driver{root: mountpoint, fs: osFs}, nil +} + +func parseCgroupName(contents []byte) (string, error) { + scan := bufio.NewScanner(bytes.NewReader(contents)) + for scan.Scan() { + line := scan.Text() + if strings.HasPrefix(line, "0::") { + return strings.Split(strings.TrimPrefix(strings.TrimSpace(line), "0::"), " ")[0], nil + } + } + return "", errors.New("cgroup not found") +} + +func findMountpoint(fsys fs.FS) (mountpoint string, isV2 bool, err error) { + mounts, err := fs.ReadFile(fsys, fmt.Sprintf("proc/%d/mountinfo", os.Getpid())) + if err != nil { + return "", false, err + } + scanner := bufio.NewScanner(bytes.NewReader(mounts)) + var cgv1Root string + for scanner.Scan() { + line := strings.Fields(scanner.Text()) + fsType := line[slices.Index(line, "-")+1] + switch fsType { + case "cgroup2": + return line[4][1:], true, nil + case "cgroup": + if cgv1Root == "" { + cgv1Root = filepath.Dir(line[4][1:]) + } + } + } + if cgv1Root == "" { + return "", false, errors.New("no cgroup mount found") + } + return cgv1Root, false, nil +} + +func marshalAny(msg proto.Message) *anypb.Any { + data := new(anypb.Any) + _ = anypb.MarshalFrom(data, msg, proto.MarshalOptions{ + AllowPartial: true, + Deterministic: true, + }) + return data +} + +type memoryLimitWatcher struct { + limitFilePath string + + value atomic.Uint64 + + watches sync.WaitGroup +} + +func (w *memoryLimitWatcher) Value() uint64 { + return w.value.Load() +} + +func (w *memoryLimitWatcher) readValue() (uint64, error) { + data, err := os.ReadFile(w.limitFilePath) + if err != nil { + return 0, err + } + max := strings.TrimSpace(string(data)) + if max == "max" { + return 0, nil + } + return strconv.ParseUint(max, 10, 64) +} + +func (w *memoryLimitWatcher) Watch(ctx context.Context) error { + fd, err := unix.InotifyInit1(unix.IN_CLOEXEC) + if err != nil { + return err + } + closeInotify := sync.OnceFunc(func() { + log.Debug(ctx).Msg("stopping memory limit watcher") + unix.Close(fd) + }) + log.Debug(ctx).Str("file", w.limitFilePath).Msg("starting watch") + wd, err := unix.InotifyAddWatch(fd, w.limitFilePath, unix.IN_MODIFY) + if err != nil { + closeInotify() + return fmt.Errorf("failed to watch 
%s: %w", w.limitFilePath, err) + } + w.watches.Add(1) + closeWatch := sync.OnceFunc(func() { + log.Debug(ctx).Str("file", w.limitFilePath).Msg("stopping watch") + _, _ = unix.InotifyRmWatch(fd, uint32(wd)) + closeInotify() + w.watches.Done() + }) + + // perform the initial read synchronously and only after setting up the watch + v, err := w.readValue() + if err != nil { + closeWatch() + return err + } + w.value.Store(v) + log.Debug(ctx).Uint64("bytes", v).Msg("current memory limit") + + context.AfterFunc(ctx, closeWatch) // to unblock unix.Read below + go func() { + defer closeWatch() + var buf [unix.SizeofInotifyEvent]byte + for ctx.Err() == nil { + v, err := w.readValue() + if err != nil { + log.Error(ctx).Err(err).Msg("error reading memory limit") + } else if prev := w.value.Swap(v); prev != v { + log.Debug(ctx). + Uint64("prev", prev). + Uint64("current", v). + Msg("memory limit updated") + } + // After ctx is canceled, inotify_rm_watch sends an IN_IGNORED event, + // which unblocks this read and allows the loop to exit. + n, err := unix.Read(fd, buf[:]) + if err != nil { + if errors.Is(err, unix.EINTR) { + continue + } + return + } + if n == unix.SizeofInotifyEvent { + event := (*unix.InotifyEvent)(unsafe.Pointer(&buf)) + if (event.Mask & unix.IN_IGNORED) != 0 { + // watch was removed, or the file was deleted (this can happen if + // the memory controller is removed from the parent's subtree_control) + log.Info(ctx).Str("file", w.limitFilePath).Msg("watched file removed") + return + } + } + } + }() + + return nil +} + +// Wait blocks until all watches have been closed. +// +// Example use: +// +// ctx, ca := context.WithCancel(context.Background()) +// w := &memoryLimitWatcher{...} +// w.Watch(ctx) +// ... +// ca() +// w.Wait() // blocks until the previous watch is closed +func (w *memoryLimitWatcher) Wait() { + w.watches.Wait() +} diff --git a/pkg/envoy/resource_monitor_other.go b/pkg/envoy/resource_monitor_other.go new file mode 100644 index 000000000..d82ab9a10 --- /dev/null +++ b/pkg/envoy/resource_monitor_other.go @@ -0,0 +1,14 @@ +//go:build !linux + +package envoy + +import ( + "context" + "errors" + + "github.com/pomerium/pomerium/config" +) + +func NewSharedResourceMonitor(ctx context.Context, src config.Source, tempDir string) (ResourceMonitor, error) { + return nil, errors.New("unsupported platform") +} diff --git a/pkg/envoy/resource_monitor_test.go b/pkg/envoy/resource_monitor_test.go new file mode 100644 index 000000000..907dd2755 --- /dev/null +++ b/pkg/envoy/resource_monitor_test.go @@ -0,0 +1,886 @@ +//go:build linux + +package envoy + +import ( + "context" + "fmt" + "io/fs" + "maps" + "os" + "path" + "path/filepath" + "testing" + "testing/fstest" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/pomerium/pomerium/config" + "github.com/pomerium/pomerium/config/envoyconfig" + "github.com/pomerium/pomerium/config/envoyconfig/filemgr" + "github.com/pomerium/pomerium/internal/testutil" +) + +var ( + file = func(data string, mode fs.FileMode) *fstest.MapFile { + // ensure the data always ends with a \n + if data != "" && data[len(data)-1] != '\n' { + data += "\n" + } + return &fstest.MapFile{Data: []byte(data), Mode: mode} + } + + v2Fs = fstest.MapFS{ + "sys/fs/cgroup/test/cgroup.type": file("domain", 0o644), + "sys/fs/cgroup/test/cgroup.controllers": file("memory", 0o444), + "sys/fs/cgroup/test/cgroup.subtree_control": file("", 0o644), + "sys/fs/cgroup/test/memory.current": file("100", 0o644), + 
"sys/fs/cgroup/test/memory.max": file("200", 0o644), + + "proc/1/cgroup": file("0::/test\n", 0o444), + "proc/2/cgroup": file("0::/test2 (deleted)\n", 0o444), + + "proc/1/mountinfo": file(` +24 30 0:22 / /proc rw,nosuid,nodev,noexec,relatime shared:5 - proc proc rw +25 30 0:23 / /sys rw,nosuid,nodev,noexec,relatime shared:6 - sysfs sys rw +33 25 0:28 / /sys/fs/cgroup rw,nosuid,nodev,noexec,relatime shared:9 - cgroup2 cgroup2 rw,nsdelegate,memory_recursiveprot +`[1:], 0o444), + } + + v1Fs = fstest.MapFS{ + "sys/fs/cgroup/memory/test/memory.usage_in_bytes": file("100", 0o644), + "sys/fs/cgroup/memory/test/memory.limit_in_bytes": file("200", 0o644), + + "proc/1/cgroup": file(` +1:memory:/test +0::/test +`[1:], 0o444), + "proc/1/mountinfo": file(` +26 31 0:24 / /sys rw,nosuid,nodev,noexec,relatime shared:7 - sysfs sysfs rw +27 31 0:5 / /proc rw,nosuid,nodev,noexec,relatime shared:14 - proc proc rw +31 1 252:1 / / rw,relatime shared:1 - ext4 /dev/vda1 rw,errors=remount-ro +35 26 0:29 / /sys/fs/cgroup ro,nosuid,nodev,noexec shared:9 - tmpfs tmpfs ro,mode=755 +40 35 0:34 / /sys/fs/cgroup/memory rw,nosuid,nodev,noexec,relatime shared:15 - cgroup cgroup rw,memory +`[1:], 0o444), + } + + v1ContainerFs = fstest.MapFS{ + "sys/fs/cgroup/memory/test/memory.usage_in_bytes": file("100", 0o644), + "sys/fs/cgroup/memory/test/memory.limit_in_bytes": file("200", 0o644), + + "proc/1/cgroup": file(` +1:memory:/test +0::/test +`[1:], 0o444), + "proc/1/mountinfo": file(` +1574 1573 0:138 / /proc rw,nosuid,nodev,noexec,relatime - proc proc rw +1578 1573 0:133 / /sys ro,nosuid,nodev,noexec,relatime - sysfs sysfs ro +1579 1578 0:141 / /sys/fs/cgroup rw,nosuid,nodev,noexec,relatime - tmpfs tmpfs rw,mode=755 +1586 1579 0:39 /test /sys/fs/cgroup/memory ro,nosuid,nodev,noexec,relatime master:20 - cgroup cgroup rw,memory +1311 1574 0:138 /sys /proc/sys ro,nosuid,nodev,noexec,relatime - proc proc rw +`[1:], 0o444), + } + + hybridFs = fstest.MapFS{ + "sys/fs/cgroup/memory/test/memory.usage_in_bytes": file("100", 0o644), + "sys/fs/cgroup/memory/test/memory.limit_in_bytes": file("200", 0o644), + "sys/fs/cgroup/unified/test/cgroup.type": file("domain", 0o644), + "sys/fs/cgroup/unified/test/cgroup.controllers": file("memory", 0o444), + "sys/fs/cgroup/unified/test/cgroup.subtree_control": file("", 0o644), + "sys/fs/cgroup/unified/test/memory.current": file("100", 0o644), + "sys/fs/cgroup/unified/test/memory.max": file("200", 0o644), + + "proc/1/cgroup": file(` +1:memory:/test +0::/test +`[1:], 0o444), + "proc/1/mountinfo": file(` +26 31 0:24 / /sys rw,nosuid,nodev,noexec,relatime shared:7 - sysfs sysfs rw +27 31 0:5 / /proc rw,nosuid,nodev,noexec,relatime shared:14 - proc proc rw +35 26 0:29 / /sys/fs/cgroup ro,nosuid,nodev,noexec shared:9 - tmpfs tmpfs ro,mode=755 +36 35 0:30 / /sys/fs/cgroup/unified rw,nosuid,nodev,noexec,relatime shared:10 - cgroup2 cgroup2 rw,nsdelegate +46 35 0:40 / /sys/fs/cgroup/memory rw,nosuid,nodev,noexec,relatime shared:21 - cgroup cgroup rw,memory +`[1:], 0o444), + } + + with = func(dest, src fstest.MapFS) fstest.MapFS { + dest = maps.Clone(dest) + for k, v := range src { + dest[k] = v + } + return dest + } + + without = func(fs fstest.MapFS, keys ...string) fstest.MapFS { + fs = maps.Clone(fs) + for _, k := range keys { + delete(fs, k) + } + return fs + } +) + +func TestCgroupV2Driver(t *testing.T) { + d := cgroupV2Driver{ + root: "sys/fs/cgroup", + fs: v2Fs, + } + t.Run("Path", func(t *testing.T) { + assert.Equal(t, "sys/fs/cgroup", d.Path("test", RootPath)) + assert.Equal(t, 
"sys/fs/cgroup/test/memory.current", d.Path("test", MemoryUsagePath)) + assert.Equal(t, "sys/fs/cgroup/test/memory.max", d.Path("test", MemoryLimitPath)) + assert.Equal(t, "", d.Path("test", CgroupFilePath(0xF00))) + }) + + t.Run("CgroupForPid", func(t *testing.T) { + cgroup, err := d.CgroupForPid(1) + assert.NoError(t, err) + assert.Equal(t, "/test", cgroup) + + cgroup, err = d.CgroupForPid(2) + assert.NoError(t, err) + assert.Equal(t, "/test2", cgroup) + + _, err = d.CgroupForPid(12345) + assert.Error(t, err) + }) + + t.Run("MemoryUsage", func(t *testing.T) { + cases := []struct { + fs fstest.MapFS + err string + usage uint64 + }{ + 0: { + fs: v2Fs, + usage: 100, + }, + 1: { + fs: with(v2Fs, fstest.MapFS{ + "sys/fs/cgroup/test/memory.current": file("invalid", 0o644), + }), + err: "strconv.ParseUint: parsing \"invalid\": invalid syntax", + }, + 2: { + fs: without(v2Fs, "sys/fs/cgroup/test/memory.current"), + err: "open sys/fs/cgroup/test/memory.current: file does not exist", + }, + } + + for i, c := range cases { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + driver := cgroupV2Driver{ + root: "sys/fs/cgroup", + fs: c.fs, + } + usage, err := driver.MemoryUsage("test") + if c.err == "" { + assert.NoError(t, err) + assert.Equal(t, c.usage, usage) + } else { + assert.EqualError(t, err, c.err) + } + }) + } + }) + + t.Run("MemoryLimit", func(t *testing.T) { + cases := []struct { + fs fstest.MapFS + err string + limit uint64 + }{ + 0: { + fs: v2Fs, + limit: 200, + }, + 1: { + fs: with(v2Fs, fstest.MapFS{ + "sys/fs/cgroup/test/memory.max": file("max", 0o644), + }), + limit: 0, + }, + 2: { + fs: without(v2Fs, "sys/fs/cgroup/test/memory.max"), + err: "open sys/fs/cgroup/test/memory.max: file does not exist", + }, + 3: { + fs: with(v2Fs, fstest.MapFS{ + "sys/fs/cgroup/test/memory.max": file("invalid", 0o644), + }), + err: "strconv.ParseUint: parsing \"invalid\": invalid syntax", + }, + } + + for i, c := range cases { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + driver := cgroupV2Driver{ + root: "sys/fs/cgroup", + fs: c.fs, + } + limit, err := driver.MemoryLimit("test") + if c.err == "" { + assert.NoError(t, err) + assert.Equal(t, c.limit, limit) + } else { + assert.EqualError(t, err, c.err) + } + }) + } + }) + + t.Run("Validate", func(t *testing.T) { + cases := []struct { + fs fstest.MapFS + root string // optional + err string + }{ + 0: {fs: v2Fs}, + 1: {fs: hybridFs, root: "sys/fs/cgroup/unified"}, + 2: { + fs: with(v2Fs, fstest.MapFS{ + "sys/fs/cgroup/test/cgroup.type": file("threaded", 0o644), + }), + err: "not a domain cgroup", + }, + 3: { + fs: with(v2Fs, fstest.MapFS{ + "sys/fs/cgroup/test/cgroup.subtree_control": file("cpu", 0o644), + }), + err: "not a leaf cgroup", + }, + 4: { + fs: with(v2Fs, fstest.MapFS{ + "sys/fs/cgroup/test/cgroup.controllers": file("cpu io", 0o444), + }), + err: "memory controller not enabled", + }, + 5: { + fs: without(v2Fs, "sys/fs/cgroup/test/cgroup.controllers"), + err: "open sys/fs/cgroup/test/cgroup.controllers: file does not exist", + }, + 6: { + fs: without(v2Fs, "sys/fs/cgroup/test/cgroup.type"), + err: "open sys/fs/cgroup/test/cgroup.type: file does not exist", + }, + 7: { + fs: without(v2Fs, "sys/fs/cgroup/test/cgroup.subtree_control"), + err: "open sys/fs/cgroup/test/cgroup.subtree_control: file does not exist", + }, + } + + for i, c := range cases { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + driver := cgroupV2Driver{ + root: "sys/fs/cgroup", + fs: c.fs, + } + if c.root != "" { + driver.root = c.root + } + err := 
driver.Validate("test") + if c.err == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, c.err) + } + }) + } + }) +} + +func TestCgroupV1Driver(t *testing.T) { + d := cgroupV1Driver{ + root: "sys/fs/cgroup", + fs: v1Fs, + } + t.Run("Path", func(t *testing.T) { + assert.Equal(t, "sys/fs/cgroup", d.Path("test", RootPath)) + assert.Equal(t, "sys/fs/cgroup/memory/test/memory.usage_in_bytes", d.Path("test", MemoryUsagePath)) + assert.Equal(t, "sys/fs/cgroup/memory/test/memory.limit_in_bytes", d.Path("test", MemoryLimitPath)) + assert.Equal(t, "", d.Path("test", CgroupFilePath(0xF00))) + }) + + t.Run("CgroupForPid", func(t *testing.T) { + cgroup, err := d.CgroupForPid(1) + assert.NoError(t, err) + assert.Equal(t, "/test", cgroup) + + _, err = d.CgroupForPid(12345) + assert.Error(t, err) + }) + + t.Run("MemoryUsage", func(t *testing.T) { + cases := []struct { + fs fstest.MapFS + err string + usage uint64 + }{ + 0: { + fs: v1Fs, + usage: 100, + }, + 1: { + fs: with(v1Fs, fstest.MapFS{ + "sys/fs/cgroup/memory/test/memory.usage_in_bytes": file("invalid", 0o644), + }), + err: "strconv.ParseUint: parsing \"invalid\": invalid syntax", + }, + 2: { + fs: without(v1Fs, "sys/fs/cgroup/memory/test/memory.usage_in_bytes"), + err: "open sys/fs/cgroup/memory/test/memory.usage_in_bytes: file does not exist", + }, + } + + for i, c := range cases { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + driver := cgroupV1Driver{ + root: "sys/fs/cgroup", + fs: c.fs, + } + usage, err := driver.MemoryUsage("test") + if c.err == "" { + assert.NoError(t, err) + assert.Equal(t, c.usage, usage) + } else { + assert.EqualError(t, err, c.err) + } + }) + } + }) + + t.Run("MemoryLimit", func(t *testing.T) { + cases := []struct { + fs fstest.MapFS + err string + limit uint64 + }{ + 0: { + fs: v1Fs, + limit: 200, + }, + 1: { + fs: with(v1Fs, fstest.MapFS{ + "sys/fs/cgroup/memory/test/memory.limit_in_bytes": file("max", 0o644), + }), + limit: 0, + }, + 2: { + fs: with(v1Fs, fstest.MapFS{ + "sys/fs/cgroup/memory/test/memory.limit_in_bytes": file("invalid", 0o644), + }), + err: "strconv.ParseUint: parsing \"invalid\": invalid syntax", + }, + 3: { + fs: without(v1Fs, "sys/fs/cgroup/memory/test/memory.limit_in_bytes"), + err: "open sys/fs/cgroup/memory/test/memory.limit_in_bytes: file does not exist", + }, + } + + for i, c := range cases { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + driver := cgroupV1Driver{ + root: "sys/fs/cgroup", + fs: c.fs, + } + limit, err := driver.MemoryLimit("test") + if c.err == "" { + assert.NoError(t, err) + assert.Equal(t, c.limit, limit) + } else { + assert.EqualError(t, err, c.err) + } + }) + } + }) + + t.Run("Validate", func(t *testing.T) { + cases := []struct { + fs fstest.MapFS + err string + }{ + 0: {fs: v1Fs}, + 1: {fs: v1ContainerFs}, + 2: {fs: hybridFs}, + 3: { + fs: without(v1Fs, + "sys/fs/cgroup/memory/test/memory.usage_in_bytes", + "sys/fs/cgroup/memory/test/memory.limit_in_bytes", + ), + err: "memory controller not enabled", + }, + } + + for i, c := range cases { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + driver := cgroupV1Driver{ + root: "sys/fs/cgroup", + fs: c.fs, + } + err := driver.Validate("test") + if c.err == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, c.err) + } + }) + } + }) + + t.Run("Container FS", func(t *testing.T) { + driver := cgroupV1Driver{ + root: "sys/fs/cgroup", + fs: v1ContainerFs, + } + cgroup, err := driver.CgroupForPid(1) + assert.NoError(t, err) + assert.Equal(t, "/", cgroup) + }) + + 
t.Run("Hybrid FS", func(t *testing.T) { + driver := cgroupV1Driver{ + root: "sys/fs/cgroup", + fs: hybridFs, + } + cgroup, err := driver.CgroupForPid(1) + assert.NoError(t, err) + assert.Equal(t, "/test", cgroup) + + driver2 := cgroupV2Driver{ + root: "sys/fs/cgroup/unified", + fs: hybridFs, + } + cgroup, err = driver2.CgroupForPid(1) + assert.NoError(t, err) + assert.Equal(t, "/test", cgroup) + }) +} + +func TestFindMountpoint(t *testing.T) { + withActualPid := func(fs fstest.MapFS) fstest.MapFS { + fs = maps.Clone(fs) + fs[fmt.Sprintf("proc/%d/cgroup", os.Getpid())] = fs["proc/1/cgroup"] + fs[fmt.Sprintf("proc/%d/mountinfo", os.Getpid())] = fs["proc/1/mountinfo"] + return fs + } + cases := []struct { + fsys fs.FS + + mountpoint string + isV2 bool + err string + }{ + 0: { + fsys: withActualPid(v2Fs), + mountpoint: "sys/fs/cgroup", + isV2: true, + }, + 1: { + fsys: withActualPid(v1Fs), + mountpoint: "sys/fs/cgroup", + isV2: false, + }, + 2: { + fsys: withActualPid(hybridFs), + mountpoint: "sys/fs/cgroup/unified", + isV2: true, + }, + } + + for i, c := range cases { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + mountpoint, isV2, err := findMountpoint(c.fsys) + if c.err == "" { + assert.NoError(t, err) + assert.Equal(t, c.mountpoint, mountpoint) + assert.Equal(t, c.isV2, isV2) + } else { + assert.EqualError(t, err, c.err) + } + }) + } +} + +type hybridTestFS struct { + base fstest.MapFS + tempDir string +} + +var _ fs.FS = (*hybridTestFS)(nil) + +func (fs *hybridTestFS) Open(name string) (fs.File, error) { + switch base := path.Base(name); base { + case "memory.current", "memory.max": + return os.Open(filepath.Join(fs.tempDir, ".fs", base)) + } + return fs.base.Open(name) +} + +func (fs *hybridTestFS) ReadFile(name string) ([]byte, error) { + switch base := path.Base(name); base { + case "memory.current", "memory.max": + return os.ReadFile(filepath.Join(fs.tempDir, ".fs", base)) + } + return fs.base.ReadFile(name) +} + +func (fs *hybridTestFS) Stat(name string) (fs.FileInfo, error) { + switch base := path.Base(name); base { + case "memory.current", "memory.max": + return os.Stat(filepath.Join(fs.tempDir, ".fs", base)) + } + return fs.base.Stat(name) +} + +type pathOverrideDriver struct { + CgroupDriver + overrides map[CgroupFilePath]string +} + +var _ CgroupDriver = (*pathOverrideDriver)(nil) + +func (d *pathOverrideDriver) Path(name string, path CgroupFilePath) string { + if override, ok := d.overrides[path]; ok { + return override + } + return d.CgroupDriver.Path(name, path) +} + +func TestSharedResourceMonitor(t *testing.T) { + // set shorter intervals for testing + var prevInitialDelay, prevMinInterval, prevMaxInterval time.Duration + monitorInitialTickDelay, prevInitialDelay = 0, monitorInitialTickDelay + monitorMaxTickInterval, prevMaxInterval = 100*time.Millisecond, monitorMaxTickInterval + monitorMinTickInterval, prevMinInterval = 10*time.Millisecond, monitorMinTickInterval + t.Cleanup(func() { + monitorInitialTickDelay = prevInitialDelay + monitorMaxTickInterval = prevMaxInterval + monitorMinTickInterval = prevMinInterval + }) + + testEnvoyPid := 99 + tempDir := t.TempDir() + require.NoError(t, os.Mkdir(filepath.Join(tempDir, ".fs"), 0o777)) + + testMemoryCurrentPath := filepath.Join(tempDir, ".fs/memory.current") + testMemoryMaxPath := filepath.Join(tempDir, ".fs/memory.max") + + updateMemoryCurrent := func(value string) { + t.Log("updating memory.current to", value) + f, err := os.OpenFile(testMemoryCurrentPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) + 
require.NoError(t, err) + f.WriteString(value) + require.NoError(t, f.Close()) + } + + updateMemoryMax := func(value string) { + t.Log("updating memory.max to", value) + f, err := os.OpenFile(testMemoryMaxPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) + require.NoError(t, err) + f.WriteString(value) + require.NoError(t, f.Close()) + } + + updateMemoryCurrent("100") + updateMemoryMax("200") + + driver := &pathOverrideDriver{ + CgroupDriver: &cgroupV2Driver{ + root: "sys/fs/cgroup", + fs: &hybridTestFS{ + base: with(v2Fs, fstest.MapFS{ + fmt.Sprintf("proc/%d/cgroup", os.Getpid()): v2Fs["proc/1/cgroup"], + fmt.Sprintf("proc/%d/mountinfo", os.Getpid()): v2Fs["proc/1/mountinfo"], + fmt.Sprintf("proc/%d/cgroup", testEnvoyPid): v2Fs["proc/1/cgroup"], + fmt.Sprintf("proc/%d/mountinfo", testEnvoyPid): v2Fs["proc/1/mountinfo"], + }), + tempDir: tempDir, + }, + }, + overrides: map[CgroupFilePath]string{ + MemoryUsagePath: testMemoryCurrentPath, + MemoryLimitPath: testMemoryMaxPath, + }, + } + + configSrc := config.NewStaticSource(&config.Config{}) + monitor, err := NewSharedResourceMonitor(context.Background(), configSrc, tempDir, WithCgroupDriver(driver)) + require.NoError(t, err) + + readMemorySaturation := func(t assert.TestingT) string { + f, err := os.ReadFile(filepath.Join(tempDir, "resource_monitor/memory/cgroup_memory_saturation")) + assert.NoError(t, err) + return string(f) + } + + assert.Equal(t, "0", readMemorySaturation(t)) + + ctx, ca := context.WithCancel(context.Background()) + + errC := make(chan error) + go func() { + defer close(errC) + errC <- monitor.Run(ctx, testEnvoyPid) + }() + + timeout := 1 * time.Second + interval := 10 * time.Millisecond + // 100/200 + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, "0.500000", readMemorySaturation(c)) + }, timeout, interval) + + // 150/200 + updateMemoryCurrent("150") + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, "0.750000", readMemorySaturation(c)) + }, timeout, interval) + + // 150/300 + updateMemoryMax("300") + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, "0.500000", readMemorySaturation(c)) + }, timeout, interval) + + // 150/unlimited + updateMemoryMax("max") + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, "0.000000", readMemorySaturation(c)) + }, timeout, interval) + + // 150/145 (over limit) + updateMemoryMax("145") + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, "1.000000", readMemorySaturation(c)) + }, timeout, interval) + + // 150/150 + updateMemoryMax("150") + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, "1.000000", readMemorySaturation(c)) + }, timeout, interval) + + configSrc.SetConfig(ctx, &config.Config{ + Options: &config.Options{ + RuntimeFlags: config.RuntimeFlags{ + config.RuntimeFlagEnvoyResourceManagerEnabled: false, + }, + }, + }) + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, "0.000000", readMemorySaturation(c)) + }, timeout, interval) + + configSrc.SetConfig(ctx, &config.Config{ + Options: &config.Options{ + RuntimeFlags: config.RuntimeFlags{ + config.RuntimeFlagEnvoyResourceManagerEnabled: true, + }, + }, + }) + + updateMemoryMax("150") + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, "1.000000", readMemorySaturation(c)) + }, timeout, interval) + + ca() + assert.ErrorIs(t, <-errC, context.Canceled) + + // test deletion of memory.max + updateMemoryCurrent("150") + updateMemoryMax("300") + monitor, err = 
NewSharedResourceMonitor(context.Background(), configSrc, tempDir, WithCgroupDriver(driver)) + require.NoError(t, err) + + errC = make(chan error) + go func() { + defer close(errC) + errC <- monitor.Run(context.Background(), testEnvoyPid) + }() + + // 150/300 + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.Equal(c, "0.500000", readMemorySaturation(c)) + }, timeout, interval) + + require.NoError(t, os.Remove(testMemoryMaxPath)) + + assert.EqualError(t, <-errC, "memory limit watcher stopped") +} + +func TestBootstrapConfig(t *testing.T) { + b := envoyconfig.New("localhost:1111", "localhost:2222", "localhost:3333", filemgr.NewManager(), nil) + testEnvoyPid := 99 + tempDir := t.TempDir() + monitor, err := NewSharedResourceMonitor(context.Background(), config.NewStaticSource(nil), tempDir, WithCgroupDriver(&cgroupV2Driver{ + root: "sys/fs/cgroup", + fs: &hybridTestFS{ + base: with(v2Fs, fstest.MapFS{ + fmt.Sprintf("proc/%d/cgroup", os.Getpid()): v2Fs["proc/1/cgroup"], + fmt.Sprintf("proc/%d/mountinfo", os.Getpid()): v2Fs["proc/1/mountinfo"], + fmt.Sprintf("proc/%d/cgroup", testEnvoyPid): v2Fs["proc/1/cgroup"], + fmt.Sprintf("proc/%d/mountinfo", testEnvoyPid): v2Fs["proc/1/mountinfo"], + }), + tempDir: tempDir, + }, + })) + require.NoError(t, err) + + bootstrap, err := b.BuildBootstrap(context.Background(), &config.Config{ + Options: &config.Options{ + EnvoyAdminAddress: "localhost:9901", + }, + }, false) + assert.NoError(t, err) + + monitor.ApplyBootstrapConfig(bootstrap) + + testutil.AssertProtoJSONEqual(t, fmt.Sprintf(` + { + "actions": [ + { + "name": "envoy.overload_actions.shrink_heap", + "triggers": [ + { + "name": "envoy.resource_monitors.injected_resource", + "threshold": { + "value": 0.9 + } + } + ] + }, + { + "name": "envoy.overload_actions.reduce_timeouts", + "triggers": [ + { + "name": "envoy.resource_monitors.injected_resource", + "scaled": { + "saturationThreshold": 0.95, + "scalingThreshold": 0.85 + } + } + ], + "typedConfig": { + "@type": "type.googleapis.com/envoy.config.overload.v3.ScaleTimersOverloadActionConfig", + "timerScaleFactors": [ + { + "minScale": { + "value": 50 + }, + "timer": "HTTP_DOWNSTREAM_CONNECTION_IDLE" + } + ] + } + }, + { + "name": "envoy.overload_actions.reset_high_memory_stream", + "triggers": [ + { + "name": "envoy.resource_monitors.injected_resource", + "scaled": { + "saturationThreshold": 0.98, + "scalingThreshold": 0.9 + } + } + ] + }, + { + "name": "envoy.overload_actions.stop_accepting_connections", + "triggers": [ + { + "name": "envoy.resource_monitors.injected_resource", + "threshold": { + "value": 0.95 + } + } + ] + }, + { + "name": "envoy.overload_actions.disable_http_keepalive", + "triggers": [ + { + "name": "envoy.resource_monitors.injected_resource", + "threshold": { + "value": 0.98 + } + } + ] + }, + { + "name": "envoy.overload_actions.stop_accepting_requests", + "triggers": [ + { + "name": "envoy.resource_monitors.injected_resource", + "threshold": { + "value": 0.99 + } + } + ] + } + ], + "bufferFactoryConfig": { + "minimumAccountToTrackPowerOfTwo": 20 + }, + "resourceMonitors": [ + { + "name": "envoy.resource_monitors.global_downstream_max_connections", + "typedConfig": { + "@type": "type.googleapis.com/envoy.extensions.resource_monitors.downstream_connections.v3.DownstreamConnectionsConfig", + "maxActiveDownstreamConnections": "50000" + } + }, + { + "name": "envoy.resource_monitors.injected_resource", + "typedConfig": { + "@type": 
"type.googleapis.com/envoy.extensions.resource_monitors.injected_resource.v3.InjectedResourceConfig", + "filename": "%s/resource_monitor/memory/cgroup_memory_saturation" + } + } + ] + } + `, tempDir), bootstrap.OverloadManager) +} + +func TestComputeScaledTickInterval(t *testing.T) { + cases := []struct { + saturation float64 + expected time.Duration + }{ + 0: { + saturation: 0.0, + expected: 10000 * time.Millisecond, + }, + 1: { + saturation: 1.0, + expected: 250 * time.Millisecond, + }, + 2: { + saturation: 0.5, + expected: 5125 * time.Millisecond, + }, + 3: { + // duration should round to the nearest millisecond + saturation: 0.3333, + expected: 6750 * time.Millisecond, + }, + 4: { + saturation: -1.0, + expected: 10000 * time.Millisecond, + }, + 5: { + // saturation > 1 should be clamped to 1 + saturation: 1.5, + expected: 250 * time.Millisecond, + }, + } + + for i, c := range cases { + t.Run(fmt.Sprintf("case %d", i), func(t *testing.T) { + assert.Equal(t, c.expected, computeScaledTickInterval(c.saturation)) + }) + } +} diff --git a/pkg/metrics/constants.go b/pkg/metrics/constants.go index 84a8c0741..abcab6f96 100644 --- a/pkg/metrics/constants.go +++ b/pkg/metrics/constants.go @@ -41,6 +41,14 @@ const ( // IdentityManagerLastSessionRefreshSuccess is a counter of last session refresh success IdentityManagerLastSessionRefreshSuccess = "identity_manager_last_session_refresh_success" + // EnvoyOverloadActionState tracks the current state of envoy overload actions + EnvoyOverloadActionState = "envoy_overload_action_state" + // EnvoyOverloadActionThreshold tracks container memory usage minimum thresholds for envoy overload actions + EnvoyOverloadActionThreshold = "envoy_overload_action_threshold" + // EnvoyCgroupMemorySaturation tracks the memory usage percent (0.0-1.0) of the cgroup in which envoy is running. + // This metric is computed by pomerium and used as an injected resource in envoy's overload manager. + EnvoyCgroupMemorySaturation = "envoy_cgroup_memory_saturation" + // BuildInfo is a gauge that may be used to detect whether component is live, and also has version BuildInfo = "build_info" // PolicyCountTotal is total amount of routes currently configured