From 3c4513a91e21e3ec6bd40ecb53d97ffb7e88c503 Mon Sep 17 00:00:00 2001 From: Travis Groth Date: Thu, 30 Jul 2020 18:19:23 -0400 Subject: [PATCH] telmetry: add databroker storage metrics and tracing (#1161) * telmetry: add databroker storage metrics and tracing --- config/helpers.go | 6 +++ config/options.go | 6 +-- docs/reference/readme.md | 5 ++ internal/databroker/server.go | 5 +- internal/telemetry/metrics/const.go | 5 ++ internal/telemetry/metrics/kv.go | 34 ++++++++++++ internal/telemetry/metrics/kv_test.go | 34 ++++++++++++ internal/telemetry/metrics/registry.go | 30 +++++++++++ internal/telemetry/metrics/storage.go | 61 ++++++++++++++++++++++ internal/telemetry/metrics/storage_test.go | 33 ++++++++++++ pkg/storage/inmemory/inmemory.go | 3 +- pkg/storage/redis/redis.go | 48 ++++++++++++++--- 12 files changed, 255 insertions(+), 15 deletions(-) create mode 100644 internal/telemetry/metrics/kv.go create mode 100644 internal/telemetry/metrics/kv_test.go create mode 100644 internal/telemetry/metrics/storage.go create mode 100644 internal/telemetry/metrics/storage_test.go diff --git a/config/helpers.go b/config/helpers.go index 541862116..fb6001cec 100644 --- a/config/helpers.go +++ b/config/helpers.go @@ -11,6 +11,12 @@ const ( ServiceAuthenticate = "authenticate" // ServiceCache represents running the cache service component ServiceCache = "cache" + + // RedisName is the name of the redis storage backend + StorageRedisName = "redis" + + // InMemoryName is the name of the in-memory storage backend + StorageInMemoryName = "memory" ) // IsValidService checks to see if a service is a valid service mode diff --git a/config/options.go b/config/options.go index f39019125..029f2afb6 100644 --- a/config/options.go +++ b/config/options.go @@ -25,8 +25,6 @@ import ( "github.com/pomerium/pomerium/internal/telemetry/metrics" "github.com/pomerium/pomerium/internal/urlutil" "github.com/pomerium/pomerium/pkg/cryptutil" - "github.com/pomerium/pomerium/pkg/storage/inmemory" - "github.com/pomerium/pomerium/pkg/storage/redis" ) // DisableHeaderKey is the key used to check whether to disable setting header @@ -491,8 +489,8 @@ func (o *Options) Validate() error { o.DataBrokerURLString = o.CacheURLString } switch o.DataBrokerStorageType { - case inmemory.Name: - case redis.Name: + case StorageInMemoryName: + case StorageRedisName: if o.DataBrokerStorageConnectionString == "" { return errors.New("config: missing databroker storage backend dsn") } diff --git a/docs/reference/readme.md b/docs/reference/readme.md index 5def8fc0b..c13db667f 100644 --- a/docs/reference/readme.md +++ b/docs/reference/readme.md @@ -479,6 +479,11 @@ Expose a prometheus format HTTP endpoint on the specified port. Disabled by defa | pomerium_config_checksum_int64 | Gauge | Currently loaded configuration checksum by service | | pomerium_config_last_reload_success | Gauge | Whether the last configuration reload succeeded by service | | pomerium_config_last_reload_success_timestamp | Gauge | The timestamp of the last successful configuration reload by service | +| redis_conns | Gauge | Number of total connections in the pool | +| redis_idle_conns | Gauge | Total number of times free connection was found in the pool | +| redis_wait_count_total | Counter | Total number of connections waited for | +| redis_wait_duration_ms_total | Counter | Total time spent waiting for connections | +| storage_operation_duration_ms | Histogram | Storage operation duration by operation, result, backend and service | #### Envoy Proxy Metrics diff --git a/internal/databroker/server.go b/internal/databroker/server.go index d32a39faf..7acab1c3f 100644 --- a/internal/databroker/server.go +++ b/internal/databroker/server.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" + "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/signal" "github.com/pomerium/pomerium/internal/telemetry/trace" @@ -348,9 +349,9 @@ func (srv *Server) getDB(recordType string) (storage.Backend, error) { func (srv *Server) newDB(recordType string) (db storage.Backend, err error) { switch srv.cfg.storageType { - case inmemory.Name: + case config.StorageInMemoryName: db = inmemory.NewDB(recordType, srv.cfg.btreeDegree) - case redis.Name: + case config.StorageRedisName: db, err = redis.New(srv.cfg.storageConnectionString, recordType, int64(srv.cfg.deletePermanentlyAfter.Seconds())) if err != nil { return nil, fmt.Errorf("failed to create new redis storage: %w", err) diff --git a/internal/telemetry/metrics/const.go b/internal/telemetry/metrics/const.go index cbad0556e..eda0386a4 100644 --- a/internal/telemetry/metrics/const.go +++ b/internal/telemetry/metrics/const.go @@ -14,6 +14,10 @@ var ( TagKeyGRPCMethod = tag.MustNewKey("grpc_method") TagKeyHost = tag.MustNewKey("host") TagKeyDestination = tag.MustNewKey("destination") + + TagKeyStorageOperation = tag.MustNewKey("operation") + TagKeyStorageResult = tag.MustNewKey("result") + TagKeyStorageBackend = tag.MustNewKey("backend") ) // Default distributions used by views in this package. @@ -39,5 +43,6 @@ var ( HTTPClientViews, HTTPServerViews, InfoViews, + StorageViews, } ) diff --git a/internal/telemetry/metrics/kv.go b/internal/telemetry/metrics/kv.go new file mode 100644 index 000000000..8f56f3e1d --- /dev/null +++ b/internal/telemetry/metrics/kv.go @@ -0,0 +1,34 @@ +package metrics + +import ( + "github.com/gomodule/redigo/redis" +) + +// AddRedisMetrics registers a metrics handler against a redis Client's PoolStats() method +func AddRedisMetrics(stats func() redis.PoolStats) { + gaugeMetrics := []struct { + name string + desc string + f func() int64 + }{ + {"redis_conns", "Number of total connections in the pool", func() int64 { return int64(stats().ActiveCount) }}, + {"redis_idle_conns", "Number of idle connections in the pool", func() int64 { return int64(stats().IdleCount) }}, + } + + for _, m := range gaugeMetrics { + registry.addInt64DerivedGaugeMetric(m.name, m.desc, "redis", m.f) + } + + cumulativeMetrics := []struct { + name string + desc string + f func() int64 + }{ + {"redis_wait_count_total", "Total number of connections waited for", func() int64 { return stats().WaitCount }}, + {"redis_wait_duration_ms_total", "Total time spent waiting for connections", func() int64 { return stats().WaitDuration.Milliseconds() }}, + } + + for _, m := range cumulativeMetrics { + registry.addInt64DerivedCumulativeMetric(m.name, m.desc, "redis", m.f) + } +} diff --git a/internal/telemetry/metrics/kv_test.go b/internal/telemetry/metrics/kv_test.go new file mode 100644 index 000000000..2aea734f6 --- /dev/null +++ b/internal/telemetry/metrics/kv_test.go @@ -0,0 +1,34 @@ +package metrics + +import ( + "testing" + + "github.com/gomodule/redigo/redis" + "go.opencensus.io/metric/metricdata" +) + +func Test_AddRedisMetrics(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + stat redis.PoolStats + want int64 + }{ + {"redis_conns", redis.PoolStats{ActiveCount: 7}, 7}, + {"redis_idle_conns", redis.PoolStats{IdleCount: 3}, 3}, + {"redis_wait_count_total", redis.PoolStats{WaitCount: 2}, 2}, + } + + labelValues := []metricdata.LabelValue{ + metricdata.NewLabelValue("redis"), + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + AddRedisMetrics(func() redis.PoolStats { return tt.stat }) + testMetricRetrieval(registry.registry.Read(), t, labelValues, tt.want, tt.name) + }) + } + +} diff --git a/internal/telemetry/metrics/registry.go b/internal/telemetry/metrics/registry.go index 79c2a921a..79005b504 100644 --- a/internal/telemetry/metrics/registry.go +++ b/internal/telemetry/metrics/registry.go @@ -106,3 +106,33 @@ func (r *metricRegistry) setConfigChecksum(service string, checksum uint64) { } m.Set(float64(checksum)) } + +func (r *metricRegistry) addInt64DerivedGaugeMetric(name string, desc string, service string, f func() int64) { + + m, err := r.registry.AddInt64DerivedGauge(name, metric.WithDescription(desc), metric.WithLabelKeys("service")) + if err != nil { + log.Error().Err(err).Str("service", service).Msg("telemetry/metrics: failed to register metric") + return + } + + err = m.UpsertEntry(f, metricdata.NewLabelValue(service)) + if err != nil { + log.Error().Err(err).Str("service", service).Msg("telemetry/metrics: failed to update metric") + return + } +} + +func (r *metricRegistry) addInt64DerivedCumulativeMetric(name string, desc string, service string, f func() int64) { + + m, err := r.registry.AddInt64DerivedCumulative(name, metric.WithDescription(desc), metric.WithLabelKeys("service")) + if err != nil { + log.Error().Err(err).Str("service", service).Msg("telemetry/metrics: failed to register metric") + return + } + + err = m.UpsertEntry(f, metricdata.NewLabelValue(service)) + if err != nil { + log.Error().Err(err).Str("service", service).Msg("telemetry/metrics: failed to update metric") + return + } +} diff --git a/internal/telemetry/metrics/storage.go b/internal/telemetry/metrics/storage.go new file mode 100644 index 000000000..1e21d0739 --- /dev/null +++ b/internal/telemetry/metrics/storage.go @@ -0,0 +1,61 @@ +package metrics + +import ( + "context" + "time" + + "github.com/pomerium/pomerium/internal/log" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +var ( + // StorageViews contains opencensus views for storage system metrics + StorageViews = []*view.View{StorageOperationDurationView} + + storageOperationDuration = stats.Int64( + "storage_operation_duration_ms", + "Storage operation duration in ms", + "ms") + + // StorageOperationDurationView is an OpenCensus view that tracks storage client + // latency by operation, result and backend + StorageOperationDurationView = &view.View{ + Name: storageOperationDuration.Name(), + Description: storageOperationDuration.Description(), + Measure: storageOperationDuration, + TagKeys: []tag.Key{TagKeyStorageOperation, TagKeyStorageResult, TagKeyStorageBackend, TagKeyService}, + Aggregation: DefaultMillisecondsDistribution, + } +) + +// StorageOperationTags contains tags to apply when recording a storage operation +type StorageOperationTags struct { + Operation string + Error error + Backend string +} + +// RecordStorageOperation records the duration of a storage operation with the corresponding tags +func RecordStorageOperation(ctx context.Context, tags *StorageOperationTags, duration time.Duration) { + result := "success" + if tags.Error != nil { + result = "error" + } + + err := stats.RecordWithTags(ctx, + []tag.Mutator{ + tag.Upsert(TagKeyStorageOperation, tags.Operation), + tag.Upsert(TagKeyStorageResult, result), + tag.Upsert(TagKeyStorageBackend, tags.Backend), + //TODO service tag does not consistently come in from RPCs. Requires + // follow up + tag.Upsert(TagKeyService, "databroker"), + }, + storageOperationDuration.M(duration.Milliseconds()), + ) + if err != nil { + log.Warn().Err(err).Msg("internal/telemetry/metrics: failed to record") + } +} diff --git a/internal/telemetry/metrics/storage_test.go b/internal/telemetry/metrics/storage_test.go new file mode 100644 index 000000000..6073e791b --- /dev/null +++ b/internal/telemetry/metrics/storage_test.go @@ -0,0 +1,33 @@ +package metrics + +import ( + "context" + "errors" + "testing" + "time" + + "go.opencensus.io/stats/view" +) + +func Test_RecordStorageOperation(t *testing.T) { + + tests := []struct { + name string + tags *StorageOperationTags + duration time.Duration + want string + }{ + {"success", &StorageOperationTags{Operation: "test", Backend: "testengine"}, time.Millisecond * 5, "{ { {backend testengine}{operation test}{result success}{service databroker} }&{1 5 5 5 0"}, + {"error", &StorageOperationTags{Operation: "failtest", Backend: "failengine", Error: errors.New("failure")}, time.Millisecond * 5, "{ { {backend failengine}{operation failtest}{result error}{service databroker} }&{1 5 5 5 0"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + view.Unregister(StorageViews...) + view.Register(StorageViews...) + RecordStorageOperation(context.Background(), tt.tags, tt.duration) + + testDataRetrieval(StorageOperationDurationView, t, tt.want) + }) + } +} diff --git a/pkg/storage/inmemory/inmemory.go b/pkg/storage/inmemory/inmemory.go index f4a043ff4..715fedaec 100644 --- a/pkg/storage/inmemory/inmemory.go +++ b/pkg/storage/inmemory/inmemory.go @@ -14,13 +14,14 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/signal" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/storage" ) // Name is the storage type name for inmemory backend. -const Name = "memory" +const Name = config.StorageInMemoryName var _ storage.Backend = (*DB)(nil) diff --git a/pkg/storage/redis/redis.go b/pkg/storage/redis/redis.go index d7362e5ee..a5d4c2337 100644 --- a/pkg/storage/redis/redis.go +++ b/pkg/storage/redis/redis.go @@ -13,13 +13,16 @@ import ( "github.com/gomodule/redigo/redis" "google.golang.org/protobuf/types/known/anypb" + "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/telemetry/metrics" + "github.com/pomerium/pomerium/internal/telemetry/trace" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/storage" ) // Name is the storage type name for redis backend. -const Name = "redis" +const Name = config.StorageRedisName const watchAction = "zadd" var _ storage.Backend = (*DB)(nil) @@ -63,13 +66,18 @@ func New(rawURL, recordType string, deletePermanentAfter int64) (*DB, error) { deletedSet: recordType + "_deleted_set", lastVersionKey: recordType + "_last_version", } + metrics.AddRedisMetrics(db.pool.Stats) return db, nil } // Put sets new record for given id with input data. -func (db *DB) Put(ctx context.Context, id string, data *anypb.Any) error { +func (db *DB) Put(ctx context.Context, id string, data *anypb.Any) (err error) { c := db.pool.Get() + _, span := trace.StartSpan(ctx, "databroker.redis.Put") + defer span.End() + defer recordOperation(ctx, time.Now(), "put", err) defer c.Close() + record, err := db.Get(ctx, id) if err != nil { record = new(databroker.Record) @@ -101,8 +109,11 @@ func (db *DB) Put(ctx context.Context, id string, data *anypb.Any) error { } // Get retrieves a record from redis. -func (db *DB) Get(_ context.Context, id string) (*databroker.Record, error) { +func (db *DB) Get(ctx context.Context, id string) (rec *databroker.Record, err error) { c := db.pool.Get() + _, span := trace.StartSpan(ctx, "databroker.redis.Get") + defer span.End() + defer recordOperation(ctx, time.Now(), "get", err) defer c.Close() b, err := redis.Bytes(c.Do("HGET", db.recordType, id)) @@ -114,15 +125,21 @@ func (db *DB) Get(_ context.Context, id string) (*databroker.Record, error) { } // GetAll retrieves all records from redis. -func (db *DB) GetAll(ctx context.Context) ([]*databroker.Record, error) { +func (db *DB) GetAll(ctx context.Context) (recs []*databroker.Record, err error) { + _, span := trace.StartSpan(ctx, "databroker.redis.GetAll") + defer span.End() + defer recordOperation(ctx, time.Now(), "get_all", err) return db.getAll(ctx, func(record *databroker.Record) bool { return true }) } // List retrieves all records since given version. // // "version" is in hex format, invalid version will be treated as 0. -func (db *DB) List(ctx context.Context, sinceVersion string) ([]*databroker.Record, error) { +func (db *DB) List(ctx context.Context, sinceVersion string) (rec []*databroker.Record, err error) { c := db.pool.Get() + _, span := trace.StartSpan(ctx, "databroker.redis.List") + defer span.End() + defer recordOperation(ctx, time.Now(), "list", err) defer c.Close() v, err := strconv.ParseUint(sinceVersion, 16, 64) @@ -151,8 +168,11 @@ func (db *DB) List(ctx context.Context, sinceVersion string) ([]*databroker.Reco } // Delete sets a record DeletedAt field and set its TTL. -func (db *DB) Delete(ctx context.Context, id string) error { +func (db *DB) Delete(ctx context.Context, id string) (err error) { c := db.pool.Get() + _, span := trace.StartSpan(ctx, "databroker.redis.Delete") + defer span.End() + defer recordOperation(ctx, time.Now(), "delete", err) defer c.Close() r, err := db.Get(ctx, id) @@ -184,8 +204,12 @@ func (db *DB) Delete(ctx context.Context, id string) error { } // ClearDeleted clears all the currently deleted records older than the given cutoff. -func (db *DB) ClearDeleted(_ context.Context, cutoff time.Time) { +func (db *DB) ClearDeleted(ctx context.Context, cutoff time.Time) { c := db.pool.Get() + _, span := trace.StartSpan(ctx, "databroker.redis.ClearDeleted") + defer span.End() + var opErr error + defer recordOperation(ctx, time.Now(), "clear_deleted", opErr) defer c.Close() ids, _ := redis.Strings(c.Do("SMEMBERS", db.deletedSet)) @@ -204,7 +228,7 @@ func (db *DB) ClearDeleted(_ context.Context, cutoff time.Time) { {"ZREM": {db.versionSet, id}}, {"SREM": {db.deletedSet, id}}, } - _ = db.tx(c, cmds) + opErr = db.tx(c, cmds) } } } @@ -329,3 +353,11 @@ func (db *DB) tx(c redis.Conn, commands []map[string][]interface{}) error { _, err := c.Do("EXEC") return err } + +func recordOperation(ctx context.Context, startTime time.Time, operation string, err error) { + metrics.RecordStorageOperation(ctx, &metrics.StorageOperationTags{ + Operation: operation, + Error: err, + Backend: Name, + }, time.Since(startTime)) +}