From 77338bd4e909bb73cf3c47d0661ee7a160608663 Mon Sep 17 00:00:00 2001 From: Travis Groth Date: Fri, 21 Jun 2019 18:54:39 -0400 Subject: [PATCH] Add GRPC Client Metrics --- docs/reference/readme.md | 3 + go.sum | 2 + internal/metrics/helpers_test.go | 25 +++++ internal/metrics/interceptors.go | 148 ++++++++++++++++++++++++++ internal/metrics/interceptors_test.go | 103 ++++++++++++++++++ internal/metrics/middleware.go | 15 +-- internal/metrics/middleware_test.go | 68 ++---------- internal/metrics/tags.go | 13 +++ proxy/clients/clients.go | 3 + 9 files changed, 311 insertions(+), 69 deletions(-) create mode 100644 internal/metrics/helpers_test.go create mode 100644 internal/metrics/interceptors.go create mode 100644 internal/metrics/interceptors_test.go create mode 100644 internal/metrics/tags.go diff --git a/docs/reference/readme.md b/docs/reference/readme.md index a31eda438..8ec263db3 100644 --- a/docs/reference/readme.md +++ b/docs/reference/readme.md @@ -185,6 +185,9 @@ Expose a prometheus format HTTP endpoint on the specified port. Disabled by def |http_client_requests_total| Counter | Total HTTP client requests made by service| |http_client_response_size_bytes| Histogram | HTTP client response size by service| |http_client_request_duration_ms| Histogram | HTTP client request duration by service| +|grpc_client_requests_total| Counter | Total GRPC client requests made by service| +|grpc_client_response_size_bytes| Histogram | GRPC client response size by service| +|grpc_client_request_duration_ms| Histogram | GRPC client request duration by service| ### Policy diff --git a/go.sum b/go.sum index 15a8ce1d2..64d5da978 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,7 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -57,6 +58,7 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= diff --git a/internal/metrics/helpers_test.go b/internal/metrics/helpers_test.go new file mode 100644 index 000000000..04c06d1eb --- /dev/null +++ b/internal/metrics/helpers_test.go @@ -0,0 +1,25 @@ +package metrics + +import ( + "strings" + "testing" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" +) + +func testDataRetrieval(measure stats.Measure, t *testing.T, want string) { + name := measure.Name() + data, err := view.RetrieveData(name) + + if err != nil { + t.Fatalf("%s: failed to retrieve data line %s", name, err) + } + if len(data) != 1 { + t.Errorf("%s: received too many data rows: %d", name, len(data)) + } + + if !strings.HasPrefix(data[0].String(), want) { + t.Errorf("%s: Found unexpected data row: \nwant: %s\ngot: %s\n", name, want, data[0].String()) + } +} diff --git a/internal/metrics/interceptors.go b/internal/metrics/interceptors.go new file mode 100644 index 000000000..1da783f88 --- /dev/null +++ b/internal/metrics/interceptors.go @@ -0,0 +1,148 @@ +package metrics // import "github.com/pomerium/pomerium/internal/metrics" + +import ( + "context" + "strings" + "time" + + "github.com/golang/protobuf/proto" + "github.com/pomerium/pomerium/internal/log" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "google.golang.org/grpc" + "google.golang.org/grpc/status" +) + +var ( + grpcServerRequestCount = stats.Int64("grpc_server_requests_total", "Total grpc Requests", "1") + grpcServerResponseSize = stats.Int64("grpc_server_response_size_bytes", "grpc Server Response Size in bytes", "bytes") + grpcServerRequestDuration = stats.Int64("grpc_server_request_duration_ms", "grpc Request duration in ms", "ms") + + grpcClientRequestCount = stats.Int64("grpc_client_requests_total", "Total grpc Client Requests", "1") + grpcClientResponseSize = stats.Int64("grpc_client_response_size_bytes", "grpc Client Response Size in bytes", "bytes") + grpcClientRequestDuration = stats.Int64("grpc_client_request_duration_ms", "grpc Client Request duration in ms", "ms") + + grpcViews = []*view.View{ + //grpc Server + { + Name: grpcServerRequestCount.Name(), + Measure: grpcServerRequestCount, + Description: grpcServerRequestCount.Description(), + TagKeys: []tag.Key{keyService, keyHost, keyMethod, keyStatus, keyGRPCService}, + Aggregation: view.Count(), + }, + { + Name: grpcServerRequestDuration.Name(), + Measure: grpcServerRequestDuration, + Description: grpcServerRequestDuration.Description(), + TagKeys: []tag.Key{keyService, keyHost, keyMethod, keyStatus, keyGRPCService}, + Aggregation: view.Distribution( + 1, 2, 5, 7, 10, 25, 500, 750, + 100, 250, 500, 750, + 1000, 2500, 5000, 7500, + 10000, 25000, 50000, 75000, + 100000, + ), + }, + { + Name: grpcServerResponseSize.Name(), + Measure: grpcServerResponseSize, + Description: grpcServerResponseSize.Description(), + TagKeys: []tag.Key{keyService, keyHost, keyMethod, keyStatus, keyGRPCService}, + Aggregation: view.Distribution( + 1, 256, 512, 1024, 2048, 8192, 16384, 32768, 65536, 131072, 262144, 524288, + 1048576, 2097152, 4194304, 8388608, + ), + }, + + //grpc Client + { + Name: grpcClientRequestCount.Name(), + Measure: grpcClientRequestCount, + Description: grpcClientRequestCount.Description(), + TagKeys: []tag.Key{keyService, keyHost, keyMethod, keyStatus, keyGRPCService}, + Aggregation: view.Count(), + }, + { + Name: grpcClientRequestDuration.Name(), + Measure: grpcClientRequestDuration, + Description: grpcClientRequestDuration.Description(), + TagKeys: []tag.Key{keyService, keyHost, keyMethod, keyStatus, keyGRPCService}, + Aggregation: view.Distribution( + 1, 2, 5, 7, 10, 25, 500, 750, + 100, 250, 500, 750, + 1000, 2500, 5000, 7500, + 10000, 25000, 50000, 75000, + 100000, + ), + }, + { + Name: grpcClientResponseSize.Name(), + Measure: grpcClientResponseSize, + Description: grpcClientResponseSize.Description(), + TagKeys: []tag.Key{keyService, keyHost, keyMethod, keyStatus, keyGRPCService}, + Aggregation: view.Distribution( + 1, 256, 512, 1024, 2048, 8192, 16384, 32768, 65536, 131072, 262144, 524288, + 1048576, 2097152, 4194304, 8388608, + ), + }, + } +) + +func init() { + view.Register(grpcViews...) +} + +// GRPCClientInterceptor creates a UnaryClientInterceptor which tracks metrics of grpc client requests +func GRPCClientInterceptor(service string) grpc.UnaryClientInterceptor { + return func( + ctx context.Context, + method string, + req interface{}, + reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption) error { + + startTime := time.Now() + + // Calls the invoker to execute RPC + err := invoker(ctx, method, req, reply, cc, opts...) + + // Split the method into parts for better slicing + rpcInfo := strings.SplitN(method, "/", 3) + var rpcMethod string + var rpcService string + if len(rpcInfo) == 3 { + rpcService = rpcInfo[1] + rpcMethod = rpcInfo[2] + } + + responseStatus, _ := status.FromError(err) + ctx, tagErr := tag.New( + context.Background(), + tag.Insert(keyService, service), + tag.Insert(keyHost, cc.Target()), + tag.Insert(keyMethod, rpcMethod), + tag.Insert(keyGRPCService, rpcService), + tag.Insert(keyStatus, responseStatus.Code().String()), + ) + + if tagErr != nil { + log.Warn().Err(tagErr).Str("context", "HTTPMetricsRoundTripper").Msg("Failed to create context tag") + } else { + responseProto := reply.(proto.Message) + responseSize := proto.Size(responseProto) + + stats.Record(ctx, + grpcClientRequestCount.M(1), + grpcClientRequestDuration.M(time.Since(startTime).Nanoseconds()/int64(time.Millisecond)), + grpcClientResponseSize.M(int64(responseSize)), + ) + } + + return err + } + +} diff --git a/internal/metrics/interceptors_test.go b/internal/metrics/interceptors_test.go new file mode 100644 index 000000000..403653901 --- /dev/null +++ b/internal/metrics/interceptors_test.go @@ -0,0 +1,103 @@ +package metrics + +import ( + "context" + "testing" + + "go.opencensus.io/stats/view" + "google.golang.org/grpc" + "google.golang.org/grpc/status" +) + +type testProto struct { + message string +} + +func (t testProto) Reset() {} +func (t testProto) ProtoMessage() {} +func (t testProto) String() string { + return t.message +} + +func (t testProto) XXX_Size() int { + return len([]byte(t.message)) +} + +func (t testProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return []byte(t.message), nil +} + +type testInvoker struct { + invokeResult error +} + +func (t testInvoker) UnaryInvoke(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + r := reply.(*testProto) + r.message = "hello" + return t.invokeResult +} + +func newTestCC(t *testing.T) *grpc.ClientConn { + testCC, err := grpc.Dial("dns:localhost:9999", grpc.WithInsecure()) + if err != nil { + t.Fatalf("Failed to create testCC: %s", err) + } + return testCC +} +func Test_GRPCClientInterceptor(t *testing.T) { + + interceptor := GRPCClientInterceptor("test_service") + + tests := []struct { + name string + method string + errorCode error + wantgrpcClientResponseSize string + wantgrpcClientRequestDuration string + wantgrpcClientRequestCount string + }{ + { + name: "ok authorize", + method: "/authorize.Authorizer/Authorize", + errorCode: nil, + wantgrpcClientResponseSize: "{ { {grpc_service authorize.Authorizer}{host dns:localhost:9999}{method Authorize}{service test_service}{status OK} }&{1 5 5 5 0 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]", + wantgrpcClientRequestDuration: "{ { {grpc_service authorize.Authorizer}{host dns:localhost:9999}{method Authorize}{service test_service}{status OK} }&{1", + wantgrpcClientRequestCount: "{ { {grpc_service authorize.Authorizer}{host dns:localhost:9999}{method Authorize}{service test_service}{status OK} }&{1", + }, + { + name: "unknown validate", + method: "/authenticate.Authenticator/Validate", + errorCode: status.Error(14, ""), + wantgrpcClientResponseSize: "{ { {grpc_service authenticate.Authenticator}{host dns:localhost:9999}{method Validate}{service test_service}{status Unavailable} }&{1 5 5 5 0 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]", + wantgrpcClientRequestDuration: "{ { {grpc_service authenticate.Authenticator}{host dns:localhost:9999}{method Validate}{service test_service}{status Unavailable} }&{1", + wantgrpcClientRequestCount: "{ { {grpc_service authenticate.Authenticator}{host dns:localhost:9999}{method Validate}{service test_service}{status Unavailable} }&{1", + }, + { + name: "broken method parsing", + method: "f", + errorCode: status.Error(14, ""), + wantgrpcClientResponseSize: "{ { {host dns:localhost:9999}{service test_service}{status Unavailable} }&{1 5 5 5 0 [0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]", + wantgrpcClientRequestDuration: "{ { {host dns:localhost:9999}{service test_service}{status Unavailable} }&{1", + wantgrpcClientRequestCount: "{ { {host dns:localhost:9999}{service test_service}{status Unavailable} }&{1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + view.Unregister(grpcViews...) + view.Register(grpcViews...) + + invoker := testInvoker{ + invokeResult: tt.errorCode, + } + var reply testProto + + interceptor(context.Background(), tt.method, nil, &reply, newTestCC(t), invoker.UnaryInvoke) + + testDataRetrieval(grpcClientResponseSize, t, tt.wantgrpcClientResponseSize) + testDataRetrieval(grpcClientRequestDuration, t, tt.wantgrpcClientRequestDuration) + testDataRetrieval(grpcClientRequestCount, t, tt.wantgrpcClientRequestCount) + }) + } +} diff --git a/internal/metrics/middleware.go b/internal/metrics/middleware.go index b0eb88243..3a4ed7bc5 100644 --- a/internal/metrics/middleware.go +++ b/internal/metrics/middleware.go @@ -6,6 +6,8 @@ import ( "strconv" "time" + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/middleware/responsewriter" "github.com/pomerium/pomerium/internal/tripper" "go.opencensus.io/stats" @@ -14,11 +16,6 @@ import ( ) var ( - keyMethod, _ = tag.NewKey("method") - keyStatus, _ = tag.NewKey("status") - keyService, _ = tag.NewKey("service") - keyHost, _ = tag.NewKey("host") - httpServerRequestCount = stats.Int64("http_server_requests_total", "Total HTTP Requests", "1") httpServerResponseSize = stats.Int64("http_server_response_size_bytes", "HTTP Server Response Size in bytes", "bytes") httpServerRequestDuration = stats.Int64("http_server_request_duration_ms", "HTTP Request duration in ms", "ms") @@ -116,7 +113,9 @@ func HTTPMetricsHandler(service string) func(next http.Handler) http.Handler { tag.Insert(keyStatus, strconv.Itoa(m.Status())), ) - if tagErr == nil { + if tagErr != nil { + log.Warn().Err(tagErr).Str("context", "HTTPMetricsHandler").Msg("Failed to create metrics context tag") + } else { stats.Record(ctx, httpServerRequestCount.M(1), httpServerRequestDuration.M(time.Since(startTime).Nanoseconds()/int64(time.Millisecond)), @@ -145,7 +144,9 @@ func HTTPMetricsRoundTripper(service string) func(next http.RoundTripper) http.R tag.Insert(keyStatus, strconv.Itoa(resp.StatusCode)), ) - if tagErr == nil { + if tagErr != nil { + log.Warn().Err(tagErr).Str("context", "HTTPMetricsRoundTripper").Msg("Failed to create context tag") + } else { stats.Record(ctx, httpClientRequestCount.M(1), httpClientRequestDuration.M(time.Since(startTime).Nanoseconds()/int64(time.Millisecond)), diff --git a/internal/metrics/middleware_test.go b/internal/metrics/middleware_test.go index a93e58983..482051740 100644 --- a/internal/metrics/middleware_test.go +++ b/internal/metrics/middleware_test.go @@ -6,7 +6,6 @@ import ( "fmt" "net/http" "net/http/httptest" - "strings" "testing" "github.com/pomerium/pomerium/internal/middleware" @@ -77,36 +76,9 @@ func Test_HTTPMetricsHandler(t *testing.T) { rec := httptest.NewRecorder() chainHandler.ServeHTTP(rec, req) - // httpResponseSize - data, _ := view.RetrieveData(httpServerResponseSize.Name()) - if len(data) != 1 { - t.Errorf("httpServerResponseSize: received wrong number of data rows: %d", len(data)) - return - } - - if !strings.HasPrefix(data[0].String(), tt.wanthttpServerResponseSize) { - t.Errorf("httpServerResponseSize: Found unexpected data row: \nwant: %s\ngot: %s\n", tt.wanthttpServerResponseSize, data[0].String()) - } - - // httpRequestDuration - data, _ = view.RetrieveData(httpServerRequestDuration.Name()) - if len(data) != 1 { - t.Errorf("httpServerRequestDuration: received too many data rows: %d", len(data)) - } - - if !strings.HasPrefix(data[0].String(), tt.wanthttpServerRequestDuration) { - t.Errorf("httpServerRequestDuration: Found unexpected data row: \nwant: %s\ngot: %s\n", tt.wanthttpServerRequestDuration, data[0].String()) - } - - // httpRequestCount - data, _ = view.RetrieveData(httpServerRequestCount.Name()) - if len(data) != 1 { - t.Errorf("httpServerRequestCount: received too many data rows: %d", len(data)) - } - - if !strings.HasPrefix(data[0].String(), tt.wanthttpServerRequestCount) { - t.Errorf("httpServerRequestCount: Found unexpected data row: \nwant: %s\ngot: %s\n", tt.wanthttpServerRequestCount, data[0].String()) - } + testDataRetrieval(httpServerResponseSize, t, tt.wanthttpServerResponseSize) + testDataRetrieval(httpServerRequestDuration, t, tt.wanthttpServerRequestDuration) + testDataRetrieval(httpServerRequestCount, t, tt.wanthttpServerRequestCount) }) } } @@ -178,37 +150,9 @@ func Test_HTTPMetricsRoundTripper(t *testing.T) { resp, err := client.Do(req) t.Logf("response: %#v, %#v", resp, err) - - // httpClientResponseSize - data, _ := view.RetrieveData(httpClientResponseSize.Name()) - if len(data) != 1 { - t.Errorf("httpClientResponseSize: received wrong number of data rows: %d", len(data)) - return - } - - if !strings.HasPrefix(data[0].String(), tt.wanthttpClientResponseSize) { - t.Errorf("httpResponseSize: Found unexpected data row: \nwant: %s\ngot: %s\n", tt.wanthttpClientResponseSize, data[0].String()) - } - - // httpClientRequestDuration - data, _ = view.RetrieveData(httpClientRequestDuration.Name()) - if len(data) != 1 { - t.Errorf("httpClientRequestDuration: received too many data rows: %d", len(data)) - } - - if !strings.HasPrefix(data[0].String(), tt.wanthttpClientRequestDuration) { - t.Errorf("httpClientRequestDuration: Found unexpected data row: \nwant: %s\ngot: %s\n", tt.wanthttpClientRequestDuration, data[0].String()) - } - - // httpClientRequestCount - data, _ = view.RetrieveData(httpClientRequestCount.Name()) - if len(data) != 1 { - t.Errorf("httpRequestCount: received too many data rows: %d", len(data)) - } - - if !strings.HasPrefix(data[0].String(), tt.wanthttpClientRequestCount) { - t.Errorf("httpRequestCount: Found unexpected data row: \nwant: %s\ngot: %s\n", tt.wanthttpClientRequestCount, data[0].String()) - } + testDataRetrieval(httpClientResponseSize, t, tt.wanthttpClientResponseSize) + testDataRetrieval(httpClientRequestDuration, t, tt.wanthttpClientRequestDuration) + testDataRetrieval(httpClientRequestCount, t, tt.wanthttpClientRequestCount) }) } diff --git a/internal/metrics/tags.go b/internal/metrics/tags.go new file mode 100644 index 000000000..f610dd5bf --- /dev/null +++ b/internal/metrics/tags.go @@ -0,0 +1,13 @@ +package metrics + +import ( + "go.opencensus.io/tag" +) + +var ( + keyMethod tag.Key = tag.MustNewKey("method") + keyStatus tag.Key = tag.MustNewKey("status") + keyService tag.Key = tag.MustNewKey("service") + keyGRPCService tag.Key = tag.MustNewKey("grpc_service") + keyHost tag.Key = tag.MustNewKey("host") +) diff --git a/proxy/clients/clients.go b/proxy/clients/clients.go index 7d62976af..ec86d1c37 100644 --- a/proxy/clients/clients.go +++ b/proxy/clients/clients.go @@ -9,6 +9,8 @@ import ( "io/ioutil" "strings" + "github.com/pomerium/pomerium/internal/metrics" + "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -102,5 +104,6 @@ func NewGRPCClientConn(opts *Options) (*grpc.ClientConn, error) { connAddr, grpc.WithTransportCredentials(cert), grpc.WithPerRPCCredentials(grpcAuth), + grpc.WithUnaryInterceptor(metrics.GRPCClientInterceptor("proxy")), ) }