mirror of
https://github.com/pomerium/pomerium.git
synced 2025-06-04 03:42:49 +02:00
telemetry: add tracing
- telemetry/trace: add traces throughout code - telemetry/metrics: nest metrics and trace under telemetry - telemetry/trace: add service name span to HTTPMetricsHandler. - telemetry/metrics: removed chain dependency middleware_tests. - telemetry/metrics: wrap and encapsulate variadic view registration. - telemetry/trace: add jaeger support for tracing. - cmd/pomerium: move `parseOptions` to internal/config. - cmd/pomerium: offload server handling to httputil and sub pkgs. - httputil: standardize creation/shutdown of http listeners. - httputil: prefer curve X25519 to P256 when negotiating TLS. - fileutil: use standardized Getw Signed-off-by: Bobby DeSimone <bobbydesimone@gmail.com>
This commit is contained in:
parent
6b61a48fce
commit
5edfa7b03f
49 changed files with 1524 additions and 758 deletions
41
internal/telemetry/metrics/const.go
Normal file
41
internal/telemetry/metrics/const.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
package metrics // import "github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||
|
||||
import (
|
||||
"go.opencensus.io/plugin/ocgrpc"
|
||||
"go.opencensus.io/stats/view"
|
||||
"go.opencensus.io/tag"
|
||||
)
|
||||
|
||||
// The following tags are applied to stats recorded by this package.
|
||||
var (
|
||||
TagKeyHTTPMethod tag.Key = tag.MustNewKey("http_method")
|
||||
TagKeyService tag.Key = tag.MustNewKey("service")
|
||||
TagKeyGRPCService tag.Key = tag.MustNewKey("grpc_service")
|
||||
TagKeyGRPCMethod tag.Key = tag.MustNewKey("grpc_method")
|
||||
TagKeyHost tag.Key = tag.MustNewKey("host")
|
||||
TagKeyDestination tag.Key = tag.MustNewKey("destination")
|
||||
)
|
||||
|
||||
// Default distributions used by views in this package.
|
||||
var (
|
||||
DefaulHTTPSizeDistribution = view.Distribution(
|
||||
1, 256, 512, 1024, 2048, 8192, 16384, 32768, 65536, 131072, 262144,
|
||||
524288, 1048576, 2097152, 4194304, 8388608)
|
||||
DefaultHTTPLatencyDistrubtion = view.Distribution(
|
||||
1, 2, 5, 7, 10, 25, 500, 750, 100, 250, 500, 750, 1000, 2500, 5000,
|
||||
7500, 10000, 25000, 50000, 75000, 100000)
|
||||
grpcSizeDistribution = view.Distribution(
|
||||
1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024,
|
||||
2048, 4096, 8192, 16384,
|
||||
)
|
||||
DefaultMillisecondsDistribution = ocgrpc.DefaultMillisecondsDistribution
|
||||
)
|
||||
|
||||
// DefaultViews are a set of default views to view HTTP and GRPC metrics.
|
||||
var (
|
||||
DefaultViews = [][]*view.View{
|
||||
GRPCServerViews,
|
||||
HTTPServerViews,
|
||||
GRPCClientViews,
|
||||
GRPCServerViews}
|
||||
)
|
189
internal/telemetry/metrics/grpc.go
Normal file
189
internal/telemetry/metrics/grpc.go
Normal file
|
@ -0,0 +1,189 @@
|
|||
package metrics // import "github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
"go.opencensus.io/plugin/ocgrpc"
|
||||
"go.opencensus.io/stats/view"
|
||||
"go.opencensus.io/tag"
|
||||
"google.golang.org/grpc"
|
||||
grpcstats "google.golang.org/grpc/stats"
|
||||
)
|
||||
|
||||
// GRPC Views
|
||||
var (
|
||||
// GRPCClientViews contains opencensus views for GRPC Client metrics.
|
||||
GRPCClientViews = []*view.View{
|
||||
GRPCClientRequestCountView,
|
||||
GRPCClientRequestDurationView,
|
||||
GRPCClientResponseSizeView,
|
||||
GRPCClientRequestSizeView}
|
||||
// GRPCServerViews contains opencensus views for GRPC Server metrics.
|
||||
GRPCServerViews = []*view.View{
|
||||
GRPCServerRequestCountView,
|
||||
GRPCServerRequestDurationView,
|
||||
GRPCServerResponseSizeView,
|
||||
GRPCServerRequestSizeView}
|
||||
|
||||
// GRPCServerRequestCountView is an OpenCensus view which counts GRPC Server
|
||||
// requests by pomerium service, grpc service, grpc method, and status
|
||||
GRPCServerRequestCountView = &view.View{
|
||||
Name: "grpc/server/requests_total",
|
||||
Measure: ocgrpc.ServerLatency,
|
||||
Description: "Total grpc Requests",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyGRPCMethod, ocgrpc.KeyServerStatus, TagKeyGRPCService},
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
|
||||
// GRPCServerRequestDurationView is an OpenCensus view which tracks GRPC Server
|
||||
// request duration by pomerium service, grpc service, grpc method, and status
|
||||
GRPCServerRequestDurationView = &view.View{
|
||||
Name: "grpc/server/request_duration_ms",
|
||||
Measure: ocgrpc.ServerLatency,
|
||||
Description: "grpc Request duration in ms",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyGRPCMethod, ocgrpc.KeyServerStatus, TagKeyGRPCService},
|
||||
Aggregation: DefaultMillisecondsDistribution,
|
||||
}
|
||||
|
||||
// GRPCServerResponseSizeView is an OpenCensus view which tracks GRPC Server
|
||||
// response size by pomerium service, grpc service, grpc method, and status
|
||||
GRPCServerResponseSizeView = &view.View{
|
||||
Name: "grpc/server/response_size_bytes",
|
||||
Measure: ocgrpc.ServerSentBytesPerRPC,
|
||||
Description: "grpc Server Response Size in bytes",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyGRPCMethod, ocgrpc.KeyServerStatus, TagKeyGRPCService},
|
||||
Aggregation: grpcSizeDistribution,
|
||||
}
|
||||
|
||||
// GRPCServerRequestSizeView is an OpenCensus view which tracks GRPC Server
|
||||
// request size by pomerium service, grpc service, grpc method, and status
|
||||
GRPCServerRequestSizeView = &view.View{
|
||||
Name: "grpc/server/request_size_bytes",
|
||||
Measure: ocgrpc.ServerReceivedBytesPerRPC,
|
||||
Description: "grpc Server Request Size in bytes",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyGRPCMethod, ocgrpc.KeyServerStatus, TagKeyGRPCService},
|
||||
Aggregation: grpcSizeDistribution,
|
||||
}
|
||||
|
||||
// GRPCClientRequestCountView is an OpenCensus view which tracks GRPC Client
|
||||
// requests by pomerium service, target host, grpc service, grpc method, and status
|
||||
GRPCClientRequestCountView = &view.View{
|
||||
Name: "grpc/client/requests_total",
|
||||
Measure: ocgrpc.ClientRoundtripLatency,
|
||||
Description: "Total grpc Client Requests",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyHost, TagKeyGRPCMethod, TagKeyGRPCService, ocgrpc.KeyClientStatus},
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
|
||||
// GRPCClientRequestDurationView is an OpenCensus view which tracks GRPC Client
|
||||
// request duration by pomerium service, target host, grpc service, grpc method, and status
|
||||
GRPCClientRequestDurationView = &view.View{
|
||||
Name: "grpc/client/request_duration_ms",
|
||||
Measure: ocgrpc.ClientRoundtripLatency,
|
||||
Description: "grpc Client Request duration in ms",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyHost, TagKeyGRPCMethod, TagKeyGRPCService, ocgrpc.KeyClientStatus},
|
||||
Aggregation: DefaultMillisecondsDistribution,
|
||||
}
|
||||
|
||||
// GRPCClientResponseSizeView is an OpenCensus view which tracks GRPC Client
|
||||
// response size by pomerium service, target host, grpc service, grpc method, and status
|
||||
GRPCClientResponseSizeView = &view.View{
|
||||
Name: "grpc/client/response_size_bytes",
|
||||
Measure: ocgrpc.ClientReceivedBytesPerRPC,
|
||||
Description: "grpc Client Response Size in bytes",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyHost, TagKeyGRPCMethod, TagKeyGRPCService, ocgrpc.KeyClientStatus},
|
||||
Aggregation: grpcSizeDistribution,
|
||||
}
|
||||
|
||||
// GRPCClientRequestSizeView is an OpenCensus view which tracks GRPC Client
|
||||
// request size by pomerium service, target host, grpc service, grpc method, and status
|
||||
GRPCClientRequestSizeView = &view.View{
|
||||
Name: "grpc/client/request_size_bytes",
|
||||
Measure: ocgrpc.ClientSentBytesPerRPC,
|
||||
Description: "grpc Client Request Size in bytes",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyHost, TagKeyGRPCMethod, TagKeyGRPCService, ocgrpc.KeyClientStatus},
|
||||
Aggregation: grpcSizeDistribution,
|
||||
}
|
||||
)
|
||||
|
||||
// GRPCClientInterceptor creates a UnaryClientInterceptor which updates the RPC
|
||||
// context with metric tag metadata
|
||||
func GRPCClientInterceptor(service string) grpc.UnaryClientInterceptor {
|
||||
return func(
|
||||
ctx context.Context,
|
||||
method string,
|
||||
req interface{},
|
||||
reply interface{},
|
||||
cc *grpc.ClientConn,
|
||||
invoker grpc.UnaryInvoker,
|
||||
opts ...grpc.CallOption) error {
|
||||
|
||||
// Split the method into parts for better slicing
|
||||
rpcInfo := strings.SplitN(method, "/", 3)
|
||||
var rpcMethod string
|
||||
var rpcService string
|
||||
if len(rpcInfo) == 3 {
|
||||
rpcService = rpcInfo[1]
|
||||
rpcMethod = rpcInfo[2]
|
||||
}
|
||||
|
||||
taggedCtx, tagErr := tag.New(
|
||||
ctx,
|
||||
tag.Insert(TagKeyService, service),
|
||||
tag.Insert(TagKeyHost, cc.Target()),
|
||||
tag.Insert(TagKeyGRPCMethod, rpcMethod),
|
||||
tag.Insert(TagKeyGRPCService, rpcService),
|
||||
)
|
||||
if tagErr != nil {
|
||||
log.Warn().Err(tagErr).Str("context", "GRPCClientInterceptor").Msg("internal/telemetry: Failed to create context")
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
|
||||
// Calls the invoker to execute RPC
|
||||
return invoker(taggedCtx, method, req, reply, cc, opts...)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// GRPCServerStatsHandler provides a grpc stats.Handler for a pomerium service to add tags and track
|
||||
// metrics to server side calls
|
||||
type GRPCServerStatsHandler struct {
|
||||
service string
|
||||
grpcstats.Handler
|
||||
}
|
||||
|
||||
// TagRPC implements grpc.stats.Handler and adds tags to the context of a given RPC
|
||||
func (h *GRPCServerStatsHandler) TagRPC(ctx context.Context, tagInfo *grpcstats.RPCTagInfo) context.Context {
|
||||
|
||||
handledCtx := h.Handler.TagRPC(ctx, tagInfo)
|
||||
|
||||
// Split the method into parts for better slicing
|
||||
rpcInfo := strings.SplitN(tagInfo.FullMethodName, "/", 3)
|
||||
var rpcMethod string
|
||||
var rpcService string
|
||||
if len(rpcInfo) == 3 {
|
||||
rpcService = rpcInfo[1]
|
||||
rpcMethod = rpcInfo[2]
|
||||
}
|
||||
|
||||
taggedCtx, tagErr := tag.New(
|
||||
handledCtx,
|
||||
tag.Insert(TagKeyService, h.service),
|
||||
tag.Insert(TagKeyGRPCMethod, rpcMethod),
|
||||
tag.Insert(TagKeyGRPCService, rpcService),
|
||||
)
|
||||
if tagErr != nil {
|
||||
log.Warn().Err(tagErr).Str("context", "GRPCServerStatsHandler").Msg("internal/telemetry: Failed to create context")
|
||||
return handledCtx
|
||||
|
||||
}
|
||||
|
||||
return taggedCtx
|
||||
}
|
||||
|
||||
// NewGRPCServerStatsHandler creates a new GRPCServerStatsHandler for a pomerium service
|
||||
func NewGRPCServerStatsHandler(service string) grpcstats.Handler {
|
||||
return &GRPCServerStatsHandler{service: service, Handler: &ocgrpc.ServerHandler{}}
|
||||
}
|
184
internal/telemetry/metrics/grpc_test.go
Normal file
184
internal/telemetry/metrics/grpc_test.go
Normal file
|
@ -0,0 +1,184 @@
|
|||
package metrics // import "github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"go.opencensus.io/plugin/ocgrpc"
|
||||
"go.opencensus.io/stats/view"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// testProto is a minimal message type used as the RPC reply in these tests.
// Its method set looks like the golang/protobuf message interface plus the
// XXX_ sizing/marshalling hooks (presumably so gRPC can treat it as a proto
// message; confirm against the protobuf version in use).
type testProto struct {
	message string
}

// Reset is a no-op; it operates on a copy because of the value receiver.
func (t testProto) Reset() {}

// ProtoMessage is a marker method with no behavior.
func (t testProto) ProtoMessage() {}

// String returns the wrapped message text.
func (t testProto) String() string {
	return t.message
}

// XXX_Size returns the length of the message in bytes.
func (t testProto) XXX_Size() int {
	// len on a string already counts bytes; no []byte conversion is needed.
	return len(t.message)
}

// XXX_Marshal returns the message bytes. Both arguments are ignored.
func (t testProto) XXX_Marshal(_ []byte, _ bool) ([]byte, error) {
	return []byte(t.message), nil
}
|
||||
|
||||
type testInvoker struct {
|
||||
invokeResult error
|
||||
statsHandler stats.Handler
|
||||
}
|
||||
|
||||
func (t testInvoker) UnaryInvoke(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
|
||||
r := reply.(*testProto)
|
||||
r.message = "hello"
|
||||
|
||||
ctx = t.statsHandler.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
|
||||
t.statsHandler.HandleRPC(ctx, &stats.InPayload{Client: true, Length: len(r.message)})
|
||||
t.statsHandler.HandleRPC(ctx, &stats.OutPayload{Client: true, Length: len(r.message)})
|
||||
t.statsHandler.HandleRPC(ctx, &stats.End{Client: true, Error: t.invokeResult})
|
||||
|
||||
return t.invokeResult
|
||||
}
|
||||
|
||||
func newTestCC(t *testing.T) *grpc.ClientConn {
|
||||
testCC, err := grpc.Dial("dns:localhost:9999", grpc.WithInsecure())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create testCC: %s", err)
|
||||
}
|
||||
return testCC
|
||||
}
|
||||
func Test_GRPCClientInterceptor(t *testing.T) {
|
||||
|
||||
interceptor := GRPCClientInterceptor("test_service")
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
method string
|
||||
errorCode error
|
||||
wantgrpcClientResponseSize string
|
||||
wantgrpcClientRequestDuration string
|
||||
wantgrpcClientRequestCount string
|
||||
wantgrpcClientRequestSize string
|
||||
}{
|
||||
{
|
||||
name: "ok authorize",
|
||||
method: "/authorize.Authorizer/Authorize",
|
||||
errorCode: nil,
|
||||
wantgrpcClientResponseSize: "{ { {grpc_client_status OK}{grpc_method Authorize}{grpc_service authorize.Authorizer}{host dns:localhost:9999}{service test_service} }&{1 5 5 5 0 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wantgrpcClientRequestDuration: "{ { {grpc_client_status OK}{grpc_method Authorize}{grpc_service authorize.Authorizer}{host dns:localhost:9999}{service test_service} }&{1",
|
||||
wantgrpcClientRequestCount: "{ { {grpc_client_status OK}{grpc_method Authorize}{grpc_service authorize.Authorizer}{host dns:localhost:9999}{service test_service} }&{1",
|
||||
wantgrpcClientRequestSize: "{ { {grpc_client_status OK}{grpc_method Authorize}{grpc_service authorize.Authorizer}{host dns:localhost:9999}{service test_service} }&{1 5 5 5 0 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
},
|
||||
{
|
||||
name: "unknown validate",
|
||||
method: "/authenticate.Authenticator/Validate",
|
||||
errorCode: status.Error(14, ""),
|
||||
wantgrpcClientResponseSize: "{ { {grpc_client_status UNAVAILABLE}{grpc_method Validate}{grpc_service authenticate.Authenticator}{host dns:localhost:9999}{service test_service} }&{1 5 5 5 0 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wantgrpcClientRequestDuration: "{ { {grpc_client_status UNAVAILABLE}{grpc_method Validate}{grpc_service authenticate.Authenticator}{host dns:localhost:9999}{service test_service} }&{1",
|
||||
wantgrpcClientRequestCount: "{ { {grpc_client_status UNAVAILABLE}{grpc_method Validate}{grpc_service authenticate.Authenticator}{host dns:localhost:9999}{service test_service} }&{1",
|
||||
wantgrpcClientRequestSize: "{ { {grpc_client_status UNAVAILABLE}{grpc_method Validate}{grpc_service authenticate.Authenticator}{host dns:localhost:9999}{service test_service} }&{1 5 5 5 0 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
},
|
||||
{
|
||||
name: "broken method parsing",
|
||||
method: "f",
|
||||
errorCode: status.Error(14, ""),
|
||||
wantgrpcClientResponseSize: "{ { {grpc_client_status UNAVAILABLE}{host dns:localhost:9999}{service test_service} }&{1 5 5 5 0 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wantgrpcClientRequestDuration: "{ { {grpc_client_status UNAVAILABLE}{host dns:localhost:9999}{service test_service} }&{1",
|
||||
wantgrpcClientRequestCount: "{ { {grpc_client_status UNAVAILABLE}{host dns:localhost:9999}{service test_service} }&{1",
|
||||
wantgrpcClientRequestSize: "{ { {grpc_client_status UNAVAILABLE}{host dns:localhost:9999}{service test_service} }&{1 5 5 5 0 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
||||
view.Unregister(GRPCClientViews...)
|
||||
view.Register(GRPCClientViews...)
|
||||
|
||||
invoker := testInvoker{
|
||||
invokeResult: tt.errorCode,
|
||||
statsHandler: &ocgrpc.ClientHandler{},
|
||||
}
|
||||
var reply testProto
|
||||
|
||||
interceptor(context.Background(), tt.method, nil, &reply, newTestCC(t), invoker.UnaryInvoke)
|
||||
|
||||
testDataRetrieval(GRPCClientResponseSizeView, t, tt.wantgrpcClientResponseSize)
|
||||
testDataRetrieval(GRPCClientRequestDurationView, t, tt.wantgrpcClientRequestDuration)
|
||||
testDataRetrieval(GRPCClientRequestCountView, t, tt.wantgrpcClientRequestCount)
|
||||
testDataRetrieval(GRPCClientRequestSizeView, t, tt.wantgrpcClientRequestSize)
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func mockServerRPCHandle(statsHandler stats.Handler, method string, errorCode error) {
|
||||
message := "hello"
|
||||
ctx := statsHandler.TagRPC(context.Background(), &stats.RPCTagInfo{FullMethodName: method})
|
||||
statsHandler.HandleRPC(ctx, &stats.InPayload{Client: false, Length: len(message)})
|
||||
statsHandler.HandleRPC(ctx, &stats.OutPayload{Client: false, Length: len(message)})
|
||||
statsHandler.HandleRPC(ctx, &stats.End{Client: false, Error: errorCode})
|
||||
|
||||
}
|
||||
func Test_GRPCServerStatsHandler(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
method string
|
||||
errorCode error
|
||||
wantgrpcServerResponseSize string
|
||||
wantgrpcServerRequestDuration string
|
||||
wantgrpcServerRequestCount string
|
||||
wantgrpcServerRequestSizeView string
|
||||
}{
|
||||
{
|
||||
name: "ok authorize",
|
||||
method: "/authorize.Authorizer/Authorize",
|
||||
errorCode: nil,
|
||||
wantgrpcServerResponseSize: "{ { {grpc_method Authorize}{grpc_server_status OK}{grpc_service authorize.Authorizer}{service test_service} }&{1 5 5 5 0 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wantgrpcServerRequestDuration: "{ { {grpc_method Authorize}{grpc_server_status OK}{grpc_service authorize.Authorizer}{service test_service} }&{1",
|
||||
wantgrpcServerRequestCount: "{ { {grpc_method Authorize}{grpc_server_status OK}{grpc_service authorize.Authorizer}{service test_service} }&{1",
|
||||
wantgrpcServerRequestSizeView: "{ { {grpc_method Authorize}{grpc_server_status OK}{grpc_service authorize.Authorizer}{service test_service} }&{1 5 5 5 0 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
},
|
||||
{
|
||||
name: "unknown validate",
|
||||
method: "/authenticate.Authenticator/Validate",
|
||||
errorCode: status.Error(14, ""),
|
||||
wantgrpcServerResponseSize: "{ { {grpc_method Validate}{grpc_server_status UNAVAILABLE}{grpc_service authenticate.Authenticator}{service test_service} }&{1 5 5 5 0 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wantgrpcServerRequestDuration: "{ { {grpc_method Validate}{grpc_server_status UNAVAILABLE}{grpc_service authenticate.Authenticator}{service test_service} }&{1",
|
||||
wantgrpcServerRequestCount: "{ { {grpc_method Validate}{grpc_server_status UNAVAILABLE}{grpc_service authenticate.Authenticator}{service test_service} }&{1",
|
||||
wantgrpcServerRequestSizeView: "{ { {grpc_method Validate}{grpc_server_status UNAVAILABLE}{grpc_service authenticate.Authenticator}{service test_service} }&{1 5 5 5 0 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
},
|
||||
{
|
||||
name: "broken method parsing",
|
||||
method: "f",
|
||||
errorCode: status.Error(14, ""),
|
||||
wantgrpcServerResponseSize: "{ { {grpc_server_status UNAVAILABLE}{service test_service} }&{1 5 5 5 0 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wantgrpcServerRequestDuration: "{ { {grpc_server_status UNAVAILABLE}{service test_service} }&{1",
|
||||
wantgrpcServerRequestCount: "{ { {grpc_server_status UNAVAILABLE}{service test_service} }&{1",
|
||||
wantgrpcServerRequestSizeView: "{ { {grpc_server_status UNAVAILABLE}{service test_service} }&{1 5 5 5 0 [0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
|
||||
view.Unregister(GRPCServerViews...)
|
||||
view.Register(GRPCServerViews...)
|
||||
|
||||
statsHandler := NewGRPCServerStatsHandler("test_service")
|
||||
mockServerRPCHandle(statsHandler, tt.method, tt.errorCode)
|
||||
|
||||
testDataRetrieval(GRPCServerResponseSizeView, t, tt.wantgrpcServerResponseSize)
|
||||
testDataRetrieval(GRPCServerRequestDurationView, t, tt.wantgrpcServerRequestDuration)
|
||||
testDataRetrieval(GRPCServerRequestCountView, t, tt.wantgrpcServerRequestCount)
|
||||
testDataRetrieval(GRPCServerRequestSizeView, t, tt.wantgrpcServerRequestSizeView)
|
||||
|
||||
})
|
||||
}
|
||||
}
|
38
internal/telemetry/metrics/helpers_test.go
Normal file
38
internal/telemetry/metrics/helpers_test.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
package metrics // import "github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"go.opencensus.io/metric/metricdata"
|
||||
)
|
||||
|
||||
func testMetricRetrieval(metrics []*metricdata.Metric, t *testing.T, labels []metricdata.LabelValue, value interface{}, name string) {
|
||||
switch value.(type) {
|
||||
case int64:
|
||||
case float64:
|
||||
case uint64:
|
||||
default:
|
||||
t.Errorf("Got an unexpected type for value: %T", value)
|
||||
}
|
||||
|
||||
found := false
|
||||
for _, metric := range metrics {
|
||||
if metric.Descriptor.Name != name {
|
||||
found = true
|
||||
continue
|
||||
}
|
||||
gotLabels := metric.TimeSeries[0].LabelValues
|
||||
gotValue := metric.TimeSeries[0].Points[0].Value
|
||||
|
||||
if diff := cmp.Diff(gotLabels, labels); diff != "" {
|
||||
t.Errorf("Failed to find metric labels:\n%s", diff)
|
||||
}
|
||||
if diff := cmp.Diff(gotValue, value); diff != "" {
|
||||
t.Errorf("Failed to find metric value:\n%s", diff)
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("Could not find metric %s", name)
|
||||
}
|
||||
}
|
157
internal/telemetry/metrics/http.go
Normal file
157
internal/telemetry/metrics/http.go
Normal file
|
@ -0,0 +1,157 @@
|
|||
package metrics // import "github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
"github.com/pomerium/pomerium/internal/tripper"
|
||||
"go.opencensus.io/plugin/ochttp"
|
||||
"go.opencensus.io/stats/view"
|
||||
"go.opencensus.io/tag"
|
||||
)
|
||||
|
||||
// HTTP Views
|
||||
var (
|
||||
// HTTPClientViews contains opencensus views for HTTP Client metrics.
|
||||
HTTPClientViews = []*view.View{
|
||||
HTTPClientRequestCountView,
|
||||
HTTPClientRequestDurationView,
|
||||
HTTPClientResponseSizeView}
|
||||
// HTTPServerViews contains opencensus views for HTTP Server metrics.
|
||||
HTTPServerViews = []*view.View{
|
||||
HTTPServerRequestCountView,
|
||||
HTTPServerRequestDurationView,
|
||||
HTTPServerRequestSizeView,
|
||||
HTTPServerResponseSizeView}
|
||||
|
||||
// HTTPServerRequestCountView is an OpenCensus View that tracks HTTP server
|
||||
// requests by pomerium service, host, method and status
|
||||
HTTPServerRequestCountView = &view.View{
|
||||
Name: "http/server/requests_total",
|
||||
Measure: ochttp.ServerLatency,
|
||||
Description: "Total HTTP Requests",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyHost, TagKeyHTTPMethod, ochttp.StatusCode},
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
|
||||
// HTTPServerRequestDurationView is an OpenCensus view that tracks HTTP
|
||||
// server request duration by pomerium service, host, method and status
|
||||
HTTPServerRequestDurationView = &view.View{
|
||||
Name: "http/server/request_duration_ms",
|
||||
Measure: ochttp.ServerLatency,
|
||||
Description: "HTTP Request duration in ms",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyHost, TagKeyHTTPMethod, ochttp.StatusCode},
|
||||
Aggregation: DefaultHTTPLatencyDistrubtion,
|
||||
}
|
||||
|
||||
// HTTPServerRequestSizeView is an OpenCensus view that tracks HTTP server
|
||||
// request size by pomerium service, host and method
|
||||
HTTPServerRequestSizeView = &view.View{
|
||||
Name: "http/server/request_size_bytes",
|
||||
Measure: ochttp.ServerRequestBytes,
|
||||
Description: "HTTP Server Request Size in bytes",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyHost, TagKeyHTTPMethod},
|
||||
Aggregation: DefaulHTTPSizeDistribution,
|
||||
}
|
||||
|
||||
// HTTPServerResponseSizeView is an OpenCensus view that tracks HTTP server
|
||||
// response size by pomerium service, host, method and status
|
||||
HTTPServerResponseSizeView = &view.View{
|
||||
Name: "http/server/response_size_bytes",
|
||||
Measure: ochttp.ServerResponseBytes,
|
||||
Description: "HTTP Server Response Size in bytes",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyHost, TagKeyHTTPMethod, ochttp.StatusCode},
|
||||
Aggregation: DefaulHTTPSizeDistribution,
|
||||
}
|
||||
|
||||
// HTTPClientRequestCountView is an OpenCensus View that tracks HTTP client
|
||||
// requests by pomerium service, destination, host, method and status
|
||||
HTTPClientRequestCountView = &view.View{
|
||||
Name: "http/client/requests_total",
|
||||
Measure: ochttp.ClientRoundtripLatency,
|
||||
Description: "Total HTTP Client Requests",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyHost, TagKeyHTTPMethod, ochttp.StatusCode, TagKeyDestination},
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
|
||||
// HTTPClientRequestDurationView is an OpenCensus view that tracks HTTP
|
||||
// client request duration by pomerium service, destination, host, method and status
|
||||
HTTPClientRequestDurationView = &view.View{
|
||||
Name: "http/client/request_duration_ms",
|
||||
Measure: ochttp.ClientRoundtripLatency,
|
||||
Description: "HTTP Client Request duration in ms",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyHost, TagKeyHTTPMethod, ochttp.StatusCode, TagKeyDestination},
|
||||
Aggregation: DefaultHTTPLatencyDistrubtion,
|
||||
}
|
||||
|
||||
// HTTPClientResponseSizeView is an OpenCensus view that tracks HTTP client
|
||||
// esponse size by pomerium service, destination, host, method and status
|
||||
HTTPClientResponseSizeView = &view.View{
|
||||
Name: "http/client/response_size_bytes",
|
||||
Measure: ochttp.ClientReceivedBytes,
|
||||
Description: "HTTP Client Response Size in bytes",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyHost, TagKeyHTTPMethod, ochttp.StatusCode, TagKeyDestination},
|
||||
Aggregation: DefaulHTTPSizeDistribution,
|
||||
}
|
||||
|
||||
// HTTPClientRequestSizeView is an OpenCensus view that tracks HTTP client
|
||||
//request size by pomerium service, destination, host and method
|
||||
HTTPClientRequestSizeView = &view.View{
|
||||
Name: "http/client/response_size_bytes",
|
||||
Measure: ochttp.ClientSentBytes,
|
||||
Description: "HTTP Client Response Size in bytes",
|
||||
TagKeys: []tag.Key{TagKeyService, TagKeyHost, TagKeyHTTPMethod, TagKeyDestination},
|
||||
Aggregation: DefaulHTTPSizeDistribution,
|
||||
}
|
||||
)
|
||||
|
||||
// HTTPMetricsHandler creates a metrics middleware for incoming HTTP requests
|
||||
func HTTPMetricsHandler(service string) func(next http.Handler) http.Handler {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, tagErr := tag.New(
|
||||
r.Context(),
|
||||
tag.Insert(TagKeyService, service),
|
||||
tag.Insert(TagKeyHost, r.Host),
|
||||
tag.Insert(TagKeyHTTPMethod, r.Method),
|
||||
)
|
||||
if tagErr != nil {
|
||||
log.Warn().Err(tagErr).Str("context", "HTTPMetricsHandler").
|
||||
Msg("telemetry/metrics: failed to create metrics tag")
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
ocHandler := ochttp.Handler{
|
||||
Handler: next,
|
||||
FormatSpanName: func(r *http.Request) string {
|
||||
return fmt.Sprintf("%s%s", r.Host, r.URL.Path)
|
||||
},
|
||||
}
|
||||
ocHandler.ServeHTTP(w, r.WithContext(ctx))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// HTTPMetricsRoundTripper creates a metrics tracking tripper for outbound HTTP Requests
|
||||
func HTTPMetricsRoundTripper(service string, destination string) func(next http.RoundTripper) http.RoundTripper {
|
||||
return func(next http.RoundTripper) http.RoundTripper {
|
||||
return tripper.RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
|
||||
ctx, tagErr := tag.New(
|
||||
r.Context(),
|
||||
tag.Insert(TagKeyService, service),
|
||||
tag.Insert(TagKeyHost, r.Host),
|
||||
tag.Insert(TagKeyHTTPMethod, r.Method),
|
||||
tag.Insert(TagKeyDestination, destination),
|
||||
)
|
||||
if tagErr != nil {
|
||||
log.Warn().Err(tagErr).Str("context", "HTTPMetricsRoundTripper").Msg("telemetry/metrics: failed to create metrics tag")
|
||||
return next.RoundTrip(r)
|
||||
}
|
||||
|
||||
ocTransport := ochttp.Transport{Base: next}
|
||||
return ocTransport.RoundTrip(r.WithContext(ctx))
|
||||
})
|
||||
}
|
||||
}
|
198
internal/telemetry/metrics/http_test.go
Normal file
198
internal/telemetry/metrics/http_test.go
Normal file
|
@ -0,0 +1,198 @@
|
|||
package metrics // import "github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/tripper"
|
||||
"go.opencensus.io/stats/view"
|
||||
)
|
||||
|
||||
func testDataRetrieval(v *view.View, t *testing.T, want string) {
|
||||
if v == nil {
|
||||
t.Fatalf("%s: nil view passed", t.Name())
|
||||
}
|
||||
name := v.Name
|
||||
data, err := view.RetrieveData(name)
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("%s: failed to retrieve data line %s", name, err)
|
||||
}
|
||||
|
||||
if want != "" && len(data) != 1 {
|
||||
t.Fatalf("%s: received incorrect number of data rows: %d", name, len(data))
|
||||
}
|
||||
if want == "" && len(data) > 0 {
|
||||
t.Fatalf("%s: received incorrect number of data rows: %d", name, len(data))
|
||||
} else if want == "" {
|
||||
return
|
||||
}
|
||||
|
||||
dataString := data[0].String()
|
||||
|
||||
if want != "" && !strings.HasPrefix(dataString, want) {
|
||||
t.Errorf("%s: Found unexpected data row: \nwant: %s\ngot: %s\n", name, want, dataString)
|
||||
}
|
||||
}
|
||||
|
||||
// newTestMux returns a handler with a single route, "/good", which writes
// "Hello" to the response.
func newTestMux() http.Handler {
	good := func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprintf(w, "Hello")
	}

	router := http.NewServeMux()
	router.Handle("/good", http.HandlerFunc(good))
	return router
}
|
||||
|
||||
func Test_HTTPMetricsHandler(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
url string
|
||||
verb string
|
||||
wanthttpServerRequestSize string
|
||||
wanthttpServerResponseSize string
|
||||
wanthttpServerRequestDuration string
|
||||
wanthttpServerRequestCount string
|
||||
}{
|
||||
{
|
||||
name: "good get",
|
||||
url: "http://test.local/good",
|
||||
verb: "GET",
|
||||
wanthttpServerRequestSize: "{ { {host test.local}{http_method GET}{service test_service} }&{1 0 5e-324 0 0 [1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wanthttpServerResponseSize: "{ { {host test.local}{http.status 200}{http_method GET}{service test_service} }&{1 5 5 5 0 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wanthttpServerRequestDuration: "{ { {host test.local}{http.status 200}{http_method GET}{service test_service} }&{1",
|
||||
wanthttpServerRequestCount: "{ { {host test.local}{http.status 200}{http_method GET}{service test_service} }&{1",
|
||||
},
|
||||
{
|
||||
name: "good post",
|
||||
url: "http://test.local/good",
|
||||
verb: "POST",
|
||||
wanthttpServerRequestSize: "{ { {host test.local}{http_method POST}{service test_service} }&{1 0 5e-324 0 0 [1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wanthttpServerResponseSize: "{ { {host test.local}{http.status 200}{http_method POST}{service test_service} }&{1 5 5 5 0 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wanthttpServerRequestDuration: "{ { {host test.local}{http.status 200}{http_method POST}{service test_service} }&{1",
|
||||
wanthttpServerRequestCount: "{ { {host test.local}{http.status 200}{http_method POST}{service test_service} }&{1",
|
||||
},
|
||||
{
|
||||
name: "bad post",
|
||||
url: "http://test.local/bad",
|
||||
verb: "POST",
|
||||
wanthttpServerRequestSize: "{ { {host test.local}{http_method POST}{service test_service} }&{1 0 5e-324 0 0 [1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wanthttpServerResponseSize: "{ { {host test.local}{http.status 404}{http_method POST}{service test_service} }&{1 19 19 19 0 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wanthttpServerRequestDuration: "{ { {host test.local}{http.status 404}{http_method POST}{service test_service} }&{1",
|
||||
wanthttpServerRequestCount: "{ { {host test.local}{http.status 404}{http_method POST}{service test_service} }&{1",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
view.Unregister(HTTPServerRequestCountView, HTTPServerRequestDurationView, HTTPServerRequestSizeView, HTTPServerResponseSizeView)
|
||||
view.Register(HTTPServerRequestCountView, HTTPServerRequestDurationView, HTTPServerRequestSizeView, HTTPServerResponseSizeView)
|
||||
|
||||
req := httptest.NewRequest(tt.verb, tt.url, new(bytes.Buffer))
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
h := HTTPMetricsHandler("test_service")(newTestMux())
|
||||
h.ServeHTTP(rec, req)
|
||||
|
||||
testDataRetrieval(HTTPServerRequestSizeView, t, tt.wanthttpServerRequestSize)
|
||||
testDataRetrieval(HTTPServerResponseSizeView, t, tt.wanthttpServerResponseSize)
|
||||
testDataRetrieval(HTTPServerRequestDurationView, t, tt.wanthttpServerRequestDuration)
|
||||
testDataRetrieval(HTTPServerRequestCountView, t, tt.wanthttpServerRequestCount)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newTestTransport() http.RoundTripper {
|
||||
return tripper.RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
|
||||
resp := httptest.NewRecorder()
|
||||
newTestMux().ServeHTTP(resp, r)
|
||||
resp.Flush()
|
||||
result := resp.Result()
|
||||
|
||||
// This really looks like a regression / bug?
|
||||
// https://github.com/golang/go/issues/16952
|
||||
result.ContentLength = int64(len(resp.Body.Bytes()))
|
||||
return result, nil
|
||||
})
|
||||
}
|
||||
|
||||
func newFailingTestTransport() http.RoundTripper {
|
||||
return tripper.RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
|
||||
return nil, errors.New("failure")
|
||||
})
|
||||
}
|
||||
|
||||
func Test_HTTPMetricsRoundTripper(t *testing.T) {
|
||||
chain := tripper.NewChain(HTTPMetricsRoundTripper("test_service", "test_destination"))
|
||||
rt := chain.Then(newTestTransport())
|
||||
client := http.Client{Transport: rt}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
url string
|
||||
verb string
|
||||
wanthttpClientRequestSize string
|
||||
wanthttpClientResponseSize string
|
||||
wanthttpClientRequestDuration string
|
||||
wanthttpClientRequestCount string
|
||||
}{
|
||||
{
|
||||
name: "good get",
|
||||
url: "http://test.local/good",
|
||||
verb: "GET",
|
||||
wanthttpClientRequestSize: "{ { {destination test_destination}{host test.local}{http.status 200}{http_method GET}{service test_service} }&{1 5 5 5 0 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wanthttpClientResponseSize: "{ { {destination test_destination}{host test.local}{http.status 200}{http_method GET}{service test_service} }&{1 5 5 5 0 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wanthttpClientRequestDuration: "{ { {destination test_destination}{host test.local}{http.status 200}{http_method GET}{service test_service} }&{1",
|
||||
wanthttpClientRequestCount: "{ { {destination test_destination}{host test.local}{http.status 200}{http_method GET}{service test_service} }&{1",
|
||||
},
|
||||
{
|
||||
name: "good post",
|
||||
url: "http://test.local/good",
|
||||
verb: "POST",
|
||||
wanthttpClientRequestSize: "{ { {destination test_destination}{host test.local}{http.status 200}{http_method POST}{service test_service} }&{1 5 5 5 0 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wanthttpClientResponseSize: "{ { {destination test_destination}{host test.local}{http.status 200}{http_method POST}{service test_service} }&{1 5 5 5 0 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wanthttpClientRequestDuration: "{ { {destination test_destination}{host test.local}{http.status 200}{http_method POST}{service test_service} }&{1",
|
||||
wanthttpClientRequestCount: "{ { {destination test_destination}{host test.local}{http.status 200}{http_method POST}{service test_service} }&{1",
|
||||
},
|
||||
{
|
||||
name: "bad post",
|
||||
url: "http://test.local/bad",
|
||||
verb: "POST",
|
||||
wanthttpClientRequestSize: "{ { {destination test_destination}{host test.local}{http.status 404}{http_method POST}{service test_service} }&{1 19 19 19 0 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wanthttpClientResponseSize: "{ { {destination test_destination}{host test.local}{http.status 404}{http_method POST}{service test_service} }&{1 19 19 19 0 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]",
|
||||
wanthttpClientRequestDuration: "{ { {destination test_destination}{host test.local}{http.status 404}{http_method POST}{service test_service} }&{1",
|
||||
wanthttpClientRequestCount: "{ { {destination test_destination}{host test.local}{http.status 404}{http_method POST}{service test_service} }&{1",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
view.Unregister(HTTPClientRequestCountView, HTTPClientRequestDurationView, HTTPClientResponseSizeView, HTTPClientRequestSizeView)
|
||||
view.Register(HTTPClientRequestCountView, HTTPClientRequestDurationView, HTTPClientResponseSizeView, HTTPClientRequestSizeView)
|
||||
|
||||
req, _ := http.NewRequest(tt.verb, tt.url, new(bytes.Buffer))
|
||||
resp, err := client.Do(req)
|
||||
// must be done to record()
|
||||
ioutil.ReadAll(resp.Body)
|
||||
|
||||
t.Logf("response: %#v, %#v\n\n", resp, err)
|
||||
testDataRetrieval(HTTPClientRequestSizeView, t, tt.wanthttpClientRequestSize)
|
||||
testDataRetrieval(HTTPClientResponseSizeView, t, tt.wanthttpClientResponseSize)
|
||||
testDataRetrieval(HTTPClientRequestDurationView, t, tt.wanthttpClientRequestDuration)
|
||||
testDataRetrieval(HTTPClientRequestCountView, t, tt.wanthttpClientRequestCount)
|
||||
})
|
||||
}
|
||||
|
||||
// Check for transport Errors
|
||||
client = http.Client{Transport: chain.Then(newFailingTestTransport())}
|
||||
req, _ := http.NewRequest("GET", "http://test.local", new(bytes.Buffer))
|
||||
resp, err := client.Do(req)
|
||||
if err == nil || resp != nil {
|
||||
t.Error("Transport error not surfaced properly")
|
||||
}
|
||||
}
|
194
internal/telemetry/metrics/info.go
Normal file
194
internal/telemetry/metrics/info.go
Normal file
|
@ -0,0 +1,194 @@
|
|||
package metrics // import "github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
"github.com/pomerium/pomerium/internal/version"
|
||||
|
||||
"go.opencensus.io/metric"
|
||||
"go.opencensus.io/metric/metricdata"
|
||||
"go.opencensus.io/metric/metricproducer"
|
||||
"go.opencensus.io/stats"
|
||||
"go.opencensus.io/stats/view"
|
||||
"go.opencensus.io/tag"
|
||||
)
|
||||
|
||||
var (
|
||||
// InfoViews contains opencensus views for informational metrics about
|
||||
// pomerium itself.
|
||||
InfoViews = []*view.View{ConfigLastReloadView, ConfigLastReloadSuccessView}
|
||||
|
||||
configLastReload = stats.Int64(
|
||||
"config_last_reload_success_timestamp",
|
||||
"Timestamp of last successful config reload",
|
||||
"seconds")
|
||||
configLastReloadSuccess = stats.Int64(
|
||||
"config_last_reload_success",
|
||||
"Returns 1 if last reload was successful",
|
||||
"1")
|
||||
registry = newMetricRegistry()
|
||||
|
||||
// ConfigLastReloadView contains the timestamp the configuration was last
|
||||
// reloaded, labeled by service.
|
||||
ConfigLastReloadView = &view.View{
|
||||
Name: configLastReload.Name(),
|
||||
Description: configLastReload.Description(),
|
||||
Measure: configLastReload,
|
||||
TagKeys: []tag.Key{TagKeyService},
|
||||
Aggregation: view.LastValue(),
|
||||
}
|
||||
|
||||
// ConfigLastReloadSuccessView contains the result of the last configuration
|
||||
// reload, labeled by service.
|
||||
ConfigLastReloadSuccessView = &view.View{
|
||||
Name: configLastReloadSuccess.Name(),
|
||||
Description: configLastReloadSuccess.Description(),
|
||||
Measure: configLastReloadSuccess,
|
||||
TagKeys: []tag.Key{TagKeyService},
|
||||
Aggregation: view.LastValue(),
|
||||
}
|
||||
)
|
||||
|
||||
// SetConfigInfo records the status, checksum and timestamp of a configuration
|
||||
// reload. You must register InfoViews or the related config views before calling
|
||||
func SetConfigInfo(service string, success bool, checksum string) {
|
||||
|
||||
if success {
|
||||
serviceTag := tag.Insert(TagKeyService, service)
|
||||
if err := stats.RecordWithTags(
|
||||
context.Background(),
|
||||
[]tag.Mutator{serviceTag},
|
||||
configLastReload.M(time.Now().Unix()),
|
||||
); err != nil {
|
||||
log.Error().Err(err).Msg("internal/telemetry: failed to record config checksum timestamp")
|
||||
}
|
||||
|
||||
if err := stats.RecordWithTags(
|
||||
context.Background(),
|
||||
[]tag.Mutator{serviceTag},
|
||||
configLastReloadSuccess.M(1),
|
||||
); err != nil {
|
||||
log.Error().Err(err).Msg("internal/telemetry: failed to record config reload")
|
||||
}
|
||||
} else {
|
||||
stats.Record(context.Background(), configLastReloadSuccess.M(0))
|
||||
}
|
||||
}
|
||||
|
||||
// metricRegistry holds the non-view metrics and handles safe
// initialization and updates. Behavior without using newMetricRegistry()
// is undefined.
type metricRegistry struct {
	// registry is the opencensus metric registry that owns the gauges below.
	registry *metric.Registry
	// buildInfo is the "build_info" gauge, set by setBuildInfo.
	buildInfo *metric.Int64Gauge
	// policyCount is the "policy_count_total" derived gauge; its value comes
	// from callbacks added through addPolicyCountCallback.
	policyCount *metric.Int64DerivedGauge
	// configChecksum is the "config_checksum_decimal" gauge, set by
	// setConfigChecksum.
	configChecksum *metric.Float64Gauge
	// Once guards init so the gauges are registered a single time.
	sync.Once
}
|
||||
|
||||
func newMetricRegistry() *metricRegistry {
|
||||
r := new(metricRegistry)
|
||||
r.init()
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *metricRegistry) init() {
|
||||
r.Do(
|
||||
func() {
|
||||
r.registry = metric.NewRegistry()
|
||||
var err error
|
||||
r.buildInfo, err = r.registry.AddInt64Gauge("build_info",
|
||||
metric.WithDescription("Build Metadata"),
|
||||
metric.WithLabelKeys("service", "version", "revision", "goversion"),
|
||||
)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("internal/telemetry: failed to register build info metric")
|
||||
}
|
||||
|
||||
r.configChecksum, err = r.registry.AddFloat64Gauge("config_checksum_decimal",
|
||||
metric.WithDescription("Config checksum represented in decimal notation"),
|
||||
metric.WithLabelKeys("service"),
|
||||
)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("internal/telemetry: failed to register config checksum metric")
|
||||
}
|
||||
|
||||
r.policyCount, err = r.registry.AddInt64DerivedGauge("policy_count_total",
|
||||
metric.WithDescription("Total number of policies loaded"),
|
||||
metric.WithLabelKeys("service"),
|
||||
)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("internal/telemetry: failed to register policy count metric")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// SetBuildInfo records the pomerium build info. You must call RegisterInfoMetrics to
|
||||
// have this exported
|
||||
func (r *metricRegistry) setBuildInfo(service string) {
|
||||
if registry.buildInfo == nil {
|
||||
return
|
||||
}
|
||||
m, err := registry.buildInfo.GetEntry(
|
||||
metricdata.NewLabelValue(service),
|
||||
metricdata.NewLabelValue(version.FullVersion()),
|
||||
metricdata.NewLabelValue(version.GitCommit),
|
||||
metricdata.NewLabelValue((runtime.Version())),
|
||||
)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("internal/telemetry: failed to get build info metric")
|
||||
}
|
||||
|
||||
// This sets our build_info metric to a constant 1 per
|
||||
// https://www.robustperception.io/exposing-the-software-version-to-prometheus
|
||||
m.Set(1)
|
||||
}
|
||||
|
||||
// SetBuildInfo records the pomerium build info for service on the
// package-level metric registry. You must call RegisterInfoMetrics to
// have this exported.
func SetBuildInfo(service string) {
	registry.setBuildInfo(service)
}
|
||||
|
||||
// RegisterInfoMetrics adds the package's non-view based metrics registry to
// the global opencensus metric producer manager so its metrics are exported.
func RegisterInfoMetrics() {
	metricproducer.GlobalManager().AddProducer(registry.registry)
}
|
||||
|
||||
func (r *metricRegistry) setConfigChecksum(service string, checksum uint64) {
|
||||
if r.configChecksum == nil {
|
||||
return
|
||||
}
|
||||
m, err := r.configChecksum.GetEntry(metricdata.NewLabelValue(service))
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("internal/telemetry: failed to get config checksum metric")
|
||||
}
|
||||
m.Set(float64(checksum))
|
||||
}
|
||||
|
||||
// SetConfigChecksum sets the configuration checksum metric for service on the
// package-level metric registry. You must call RegisterInfoMetrics to
// have this exported.
func SetConfigChecksum(service string, checksum uint64) {
	registry.setConfigChecksum(service, checksum)
}
|
||||
|
||||
func (r *metricRegistry) addPolicyCountCallback(service string, f func() int64) {
|
||||
if r.policyCount == nil {
|
||||
return
|
||||
}
|
||||
err := r.policyCount.UpsertEntry(f, metricdata.NewLabelValue(service))
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("internal/telemetry: failed to get policy count metric")
|
||||
}
|
||||
}
|
||||
|
||||
// AddPolicyCountCallback sets the function to call when exporting the
// policy count metric for service. You must call RegisterInfoMetrics to
// have this exported.
func AddPolicyCountCallback(service string, f func() int64) {
	registry.addPolicyCountCallback(service, f)
}
|
85
internal/telemetry/metrics/info_test.go
Normal file
85
internal/telemetry/metrics/info_test.go
Normal file
|
@ -0,0 +1,85 @@
|
|||
package metrics // import "github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/version"
|
||||
|
||||
"go.opencensus.io/metric/metricdata"
|
||||
"go.opencensus.io/metric/metricproducer"
|
||||
"go.opencensus.io/stats/view"
|
||||
)
|
||||
|
||||
func Test_SetConfigInfo(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
success bool
|
||||
checksum string
|
||||
wantLastReload string
|
||||
wantLastReloadSuccess string
|
||||
}{
|
||||
{"success", true, "abcde", "{ { {service test_service} }&{1.", "{ { {service test_service} }&{1} }"},
|
||||
{"failed", false, "abcde", "", "{ { }&{0} }"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
view.Unregister(InfoViews...)
|
||||
view.Register(InfoViews...)
|
||||
SetConfigInfo("test_service", tt.success, tt.checksum)
|
||||
|
||||
testDataRetrieval(ConfigLastReloadView, t, tt.wantLastReload)
|
||||
testDataRetrieval(ConfigLastReloadSuccessView, t, tt.wantLastReloadSuccess)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Test_SetBuildInfo checks that SetBuildInfo produces a build_info metric of
// value 1 carrying the service, version, revision and Go version labels.
func Test_SetBuildInfo(t *testing.T) {
	// Replace the package registry so metrics from other tests are not seen.
	registry = newMetricRegistry()

	// Pin the build metadata so the expected labels are deterministic.
	version.Version = "v0.0.1"
	version.GitCommit = "deadbeef"

	wantLabels := []metricdata.LabelValue{
		{Value: "test_service", Present: true},
		{Value: version.FullVersion(), Present: true},
		{Value: version.GitCommit, Present: true},
		{Value: runtime.Version(), Present: true},
	}

	SetBuildInfo("test_service")
	testMetricRetrieval(registry.registry.Read(), t, wantLabels, int64(1), "build_info")
}
|
||||
|
||||
// Test_AddPolicyCountCallback checks that the value returned by a registered
// callback is reported as the policy_count_total metric for the service.
func Test_AddPolicyCountCallback(t *testing.T) {
	// Replace the package registry so metrics from other tests are not seen.
	registry = newMetricRegistry()

	wantValue := int64(42)
	wantLabels := []metricdata.LabelValue{{Value: "test_service", Present: true}}
	AddPolicyCountCallback("test_service", func() int64 { return wantValue })

	testMetricRetrieval(registry.registry.Read(), t, wantLabels, wantValue, "policy_count_total")
}
|
||||
|
||||
// Test_SetConfigChecksum checks that SetConfigChecksum reports the checksum,
// as a float64, on the config_checksum_decimal metric for the service.
func Test_SetConfigChecksum(t *testing.T) {
	// Replace the package registry so metrics from other tests are not seen.
	registry = newMetricRegistry()

	wantValue := uint64(42)
	wantLabels := []metricdata.LabelValue{{Value: "test_service", Present: true}}
	SetConfigChecksum("test_service", wantValue)

	testMetricRetrieval(registry.registry.Read(), t, wantLabels, float64(wantValue), "config_checksum_decimal")
}
|
||||
|
||||
func Test_RegisterInfoMetrics(t *testing.T) {
|
||||
metricproducer.GlobalManager().DeleteProducer(registry.registry)
|
||||
RegisterInfoMetrics()
|
||||
// Make sure registration de-dupes on multiple calls
|
||||
RegisterInfoMetrics()
|
||||
|
||||
r := metricproducer.GlobalManager().GetAll()
|
||||
if len(r) != 2 {
|
||||
t.Error("Did not find enough registries")
|
||||
}
|
||||
}
|
39
internal/telemetry/metrics/providers.go
Normal file
39
internal/telemetry/metrics/providers.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package metrics // import "github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
ocprom "contrib.go.opencensus.io/exporter/prometheus"
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
"go.opencensus.io/stats/view"
|
||||
)
|
||||
|
||||
// PrometheusHandler creates an exporter that exports stats to Prometheus
|
||||
// and returns a handler suitable for exporting metrics.
|
||||
func PrometheusHandler() (http.Handler, error) {
|
||||
if err := registerDefaultViews(); err != nil {
|
||||
return nil, fmt.Errorf("internal/telemetry: failed registering views")
|
||||
}
|
||||
reg := prom.DefaultRegisterer.(*prom.Registry)
|
||||
exporter, err := ocprom.NewExporter(
|
||||
ocprom.Options{
|
||||
Namespace: "pomerium",
|
||||
Registry: reg,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("internal/telemetry: prometheus exporter: %v", err)
|
||||
}
|
||||
view.RegisterExporter(exporter)
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/metrics", exporter)
|
||||
return mux, nil
|
||||
}
|
||||
|
||||
// registerDefaultViews flattens every group of views in DefaultViews into a
// single list and registers them all with opencensus in one call, returning
// the registration error, if any.
func registerDefaultViews() error {
	var views []*view.View
	// Each element of DefaultViews is itself a collection of views.
	for _, v := range DefaultViews {
		views = append(views, v...)
	}
	return view.Register(views...)
}
|
29
internal/telemetry/metrics/providers_test.go
Normal file
29
internal/telemetry/metrics/providers_test.go
Normal file
|
@ -0,0 +1,29 @@
|
|||
package metrics // import "github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"net/http/httptest"
|
||||
"regexp"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func Test_PrometheusHandler(t *testing.T) {
|
||||
h, err := PrometheusHandler()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req := httptest.NewRequest("GET", "http://test.local/metrics", new(bytes.Buffer))
|
||||
rec := httptest.NewRecorder()
|
||||
h.ServeHTTP(rec, req)
|
||||
resp := rec.Result()
|
||||
b, _ := ioutil.ReadAll(resp.Body)
|
||||
|
||||
if resp == nil || resp.StatusCode != 200 {
|
||||
t.Errorf("Metrics endpoint failed to respond: %s", b)
|
||||
}
|
||||
|
||||
if m, _ := regexp.Match("^# HELP .*", b); !m {
|
||||
t.Errorf("Metrics endpoint did not contain any help messages: %s", b)
|
||||
}
|
||||
}
|
74
internal/telemetry/trace/trace.go
Normal file
74
internal/telemetry/trace/trace.go
Normal file
|
@ -0,0 +1,74 @@
|
|||
package trace // import "github.com/pomerium/pomerium/internal/telemetry/trace"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
|
||||
"contrib.go.opencensus.io/exporter/jaeger"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
const (
	// JaegerTracingProviderName is the TracingOptions.Provider value that
	// selects the Jaeger exporter in RegisterTracing.
	JaegerTracingProviderName = "jaeger"
)
|
||||
|
||||
// TracingOptions contains the configuration settings for registering a
// tracing exporter.
type TracingOptions struct {
	// Shared

	// Provider names the tracing backend; see RegisterTracing for the
	// accepted values.
	Provider string
	// Service is passed to the exporter as the service name.
	Service string
	// Debug, when true, makes RegisterTracing sample every trace.
	Debug bool

	// Jaeger

	// CollectorEndpoint is the full url to the Jaeger HTTP Thrift collector.
	// For example, http://localhost:14268/api/traces
	JaegerCollectorEndpoint string `mapstructure:"tracing_jaeger_collector_endpoint"`
	// AgentEndpoint instructs exporter to send spans to jaeger-agent at this address.
	// For example, localhost:6831.
	JaegerAgentEndpoint string `mapstructure:"tracing_jaeger_agent_endpoint"`
}
|
||||
|
||||
func RegisterTracing(opts *TracingOptions) error {
|
||||
var err error
|
||||
switch opts.Provider {
|
||||
case JaegerTracingProviderName:
|
||||
err = registerJaeger(opts)
|
||||
default:
|
||||
return fmt.Errorf("telemetry/trace: provider %s unknown", opts.Provider)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if opts.Debug {
|
||||
log.Debug().Msg("telemetry/trace: debug on, sample everything")
|
||||
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
|
||||
}
|
||||
log.Debug().Interface("Opts", opts).Msg("telemetry/trace: exporter created")
|
||||
return nil
|
||||
}
|
||||
|
||||
func registerJaeger(opts *TracingOptions) error {
|
||||
jex, err := jaeger.NewExporter(
|
||||
jaeger.Options{
|
||||
AgentEndpoint: opts.JaegerAgentEndpoint,
|
||||
CollectorEndpoint: opts.JaegerCollectorEndpoint,
|
||||
ServiceName: opts.Service,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
trace.RegisterExporter(jex)
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartSpan starts a new child span of the current span in the context. If
// there is no span in the context, creates a new trace and span.
//
// Returned context contains the newly created span. You can use it to
// propagate the returned span in process.
//
// It forwards its arguments unchanged to the opencensus trace.StartSpan.
func StartSpan(ctx context.Context, name string, o ...trace.StartOption) (context.Context, *trace.Span) {
	return trace.StartSpan(ctx, name, o...)
}
|
23
internal/telemetry/trace/trace_test.go
Normal file
23
internal/telemetry/trace/trace_test.go
Normal file
|
@ -0,0 +1,23 @@
|
|||
package trace // import "github.com/pomerium/pomerium/internal/telemetry/trace"
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestRegisterTracing(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
opts *TracingOptions
|
||||
wantErr bool
|
||||
}{
|
||||
{"jaeger", &TracingOptions{JaegerAgentEndpoint: "localhost:6831", Service: "all", Provider: "jaeger"}, false},
|
||||
{"jaeger with debug", &TracingOptions{JaegerAgentEndpoint: "localhost:6831", Service: "all", Provider: "jaeger", Debug: true}, false},
|
||||
{"jaeger no endpoint", &TracingOptions{JaegerAgentEndpoint: "", Service: "all", Provider: "jaeger"}, true},
|
||||
{"unknown provider", &TracingOptions{JaegerAgentEndpoint: "localhost:0", Service: "all", Provider: "Lucius Cornelius Sulla"}, true},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if err := RegisterTracing(tt.opts); (err != nil) != tt.wantErr {
|
||||
t.Errorf("RegisterTracing() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue