telemetry: add process collector for envoy (#1948)

* telemetry: add process collector for envoy

* add test

* maybe fix macos

* address comments
This commit is contained in:
Caleb Doxsey 2021-03-03 16:05:35 -07:00 committed by GitHub
parent f396c2a0f7
commit 92c3a4a56c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 245 additions and 0 deletions

1
go.mod
View file

@ -44,6 +44,7 @@ require (
github.com/pelletier/go-toml v1.8.1 // indirect github.com/pelletier/go-toml v1.8.1 // indirect
github.com/pomerium/csrf v1.7.0 github.com/pomerium/csrf v1.7.0
github.com/prometheus/client_golang v1.9.0 github.com/prometheus/client_golang v1.9.0
github.com/prometheus/procfs v0.2.0
github.com/rakyll/statik v0.1.7 github.com/rakyll/statik v0.1.7
github.com/rjeczalik/notify v0.9.3-0.20201210012515-e2a77dcc14cf github.com/rjeczalik/notify v0.9.3-0.20201210012515-e2a77dcc14cf
github.com/rs/cors v1.7.0 github.com/rs/cors v1.7.0

View file

@ -4,6 +4,7 @@ package envoy
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"context"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"errors" "errors"
@ -15,6 +16,7 @@ import (
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"regexp" "regexp"
"runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -31,6 +33,7 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/natefinch/atomic" "github.com/natefinch/atomic"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"go.opencensus.io/stats/view"
"google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/durationpb"
@ -38,6 +41,7 @@ import (
"github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry" "github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
"github.com/pomerium/pomerium/internal/telemetry/trace" "github.com/pomerium/pomerium/internal/telemetry/trace"
) )
@ -109,6 +113,7 @@ func NewServer(src config.Source, grpcPort, httpPort string) (*Server, error) {
httpPort: httpPort, httpPort: httpPort,
envoyPath: envoyPath, envoyPath: envoyPath,
} }
go srv.runProcessCollector()
src.OnConfigChange(srv.onConfigChange) src.OnConfigChange(srv.onConfigChange)
srv.onConfigChange(src.GetConfig()) srv.onConfigChange(src.GetConfig())
@ -531,3 +536,35 @@ func (srv *Server) handleLogs(rc io.ReadCloser) {
Msg(msg) Msg(msg)
} }
} }
func (srv *Server) runProcessCollector() {
// macos is not supported
if runtime.GOOS != "linux" {
return
}
pc := metrics.NewProcessCollector("envoy")
if err := view.Register(pc.Views()...); err != nil {
log.Error().Err(err).Msg("failed to register envoy process metric views")
}
const collectInterval = time.Second * 10
ticker := time.NewTicker(collectInterval)
defer ticker.Stop()
for range ticker.C {
var pid int
srv.mu.Lock()
if srv.cmd != nil && srv.cmd.Process != nil {
pid = srv.cmd.Process.Pid
}
srv.mu.Unlock()
if pid > 0 {
err := pc.Measure(context.Background(), pid)
if err != nil {
log.Error().Err(err).Msg("failed to measure envoy process metrics")
}
}
}
}

View file

@ -0,0 +1,151 @@
package metrics
import (
"context"
"github.com/prometheus/procfs"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
)
// A ProcessCollector collects stats about a process.
type ProcessCollector struct {
cpuTotal *stats.Float64Measure
openFDs *stats.Int64Measure
maxFDs *stats.Int64Measure
vsize *stats.Int64Measure
maxVsize *stats.Int64Measure
rss *stats.Int64Measure
startTime *stats.Float64Measure
views []*view.View
}
// NewProcessCollector creates a new ProcessCollector.
func NewProcessCollector(name string) *ProcessCollector {
pc := &ProcessCollector{
cpuTotal: stats.Float64(
name+"_process_cpu_seconds_total",
"Total user and system CPU time spent in seconds.",
stats.UnitSeconds,
),
openFDs: stats.Int64(
name+"_process_open_fds",
"Number of open file descriptors.",
"{file_descriptor}",
),
maxFDs: stats.Int64(
name+"_process_max_fds",
"Maximum number of open file descriptors.",
"{file_descriptor}",
),
vsize: stats.Int64(
name+"_process_virtual_memory_bytes",
"Virtual memory size in bytes.",
stats.UnitBytes,
),
maxVsize: stats.Int64(
name+"_process_virtual_memory_max_bytes",
"Maximum amount of virtual memory available in bytes.",
stats.UnitBytes,
),
rss: stats.Int64(
name+"_process_resident_memory_bytes",
"Resident memory size in bytes.",
stats.UnitBytes,
),
startTime: stats.Float64(
name+"_process_start_time_seconds",
"Start time of the process since unix epoch in seconds.",
stats.UnitSeconds,
),
}
pc.views = []*view.View{
{
Name: pc.cpuTotal.Name(),
Description: pc.cpuTotal.Description(),
Measure: pc.cpuTotal,
Aggregation: view.Sum(),
},
{
Name: pc.openFDs.Name(),
Description: pc.openFDs.Description(),
Measure: pc.openFDs,
Aggregation: view.LastValue(),
},
{
Name: pc.maxFDs.Name(),
Description: pc.maxFDs.Description(),
Measure: pc.maxFDs,
Aggregation: view.LastValue(),
},
{
Name: pc.vsize.Name(),
Description: pc.vsize.Description(),
Measure: pc.vsize,
Aggregation: view.LastValue(),
},
{
Name: pc.maxVsize.Name(),
Description: pc.maxVsize.Description(),
Measure: pc.maxVsize,
Aggregation: view.LastValue(),
},
{
Name: pc.rss.Name(),
Description: pc.rss.Description(),
Measure: pc.rss,
Aggregation: view.LastValue(),
},
{
Name: pc.startTime.Name(),
Description: pc.startTime.Description(),
Measure: pc.startTime,
Aggregation: view.LastValue(),
},
}
return pc
}
// Views returns the views for the process collector.
func (pc *ProcessCollector) Views() []*view.View {
return pc.views
}
// Measure measures the stats for a process.
func (pc *ProcessCollector) Measure(ctx context.Context, pid int) error {
proc, err := procfs.NewProc(pid)
if err != nil {
return err
}
procStat, err := proc.Stat()
if err != nil {
return err
}
procStartTime, err := procStat.StartTime()
if err != nil {
return err
}
procFDLen, err := proc.FileDescriptorsLen()
if err != nil {
return err
}
procLimits, err := proc.Limits()
if err != nil {
return err
}
stats.Record(ctx,
pc.cpuTotal.M(procStat.CPUTime()),
pc.openFDs.M(int64(procFDLen)),
pc.maxFDs.M(procLimits.OpenFiles),
pc.vsize.M(int64(procStat.VSize)),
pc.maxVsize.M(procLimits.AddressSpace),
pc.rss.M(int64(procStat.RSS)),
pc.startTime.M(procStartTime),
)
return nil
}

View file

@ -0,0 +1,56 @@
package metrics
import (
"context"
"net/http/httptest"
"os"
"runtime"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats/view"
)
func TestProcessCollector(t *testing.T) {
if runtime.GOOS != "linux" {
t.SkipNow()
}
exp, err := getGlobalExporter()
require.NoError(t, err)
pc := NewProcessCollector("example")
err = view.Register(pc.Views()...)
require.NoError(t, err)
defer view.Unregister(pc.Views()...)
err = pc.Measure(context.Background(), os.Getpid())
require.NoError(t, err)
expect := []string{
"pomerium_example_process_cpu_seconds_total",
"pomerium_example_process_max_fds",
"pomerium_example_process_open_fds",
"pomerium_example_process_resident_memory_bytes",
"pomerium_example_process_start_time_seconds",
"pomerium_example_process_virtual_memory_bytes",
"pomerium_example_process_virtual_memory_max_bytes",
}
assert.Eventually(t, func() bool {
req := httptest.NewRequest("GET", "http://test.local/metrics", nil)
rec := httptest.NewRecorder()
exp.ServeHTTP(rec, req)
str := rec.Body.String()
for _, nm := range expect {
if !strings.Contains(str, nm) {
return false
}
}
return true
}, time.Second*3, time.Millisecond*50,
"prometheus exporter should contain process metrics: %v",
expect)
}