envoyconfig: use zipkin tracer (#2265)

This commit is contained in:
Caleb Doxsey 2021-06-03 09:28:00 -06:00 committed by GitHub
parent 9dc90d02d0
commit c3286aa355
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 258 additions and 130 deletions

View file

@ -12,7 +12,6 @@ import (
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/trace"
)
// BuildBootstrapAdmin builds the admin config for the envoy bootstrap.
@ -29,8 +28,8 @@ func (b *Builder) BuildBootstrapAdmin(cfg *config.Config) (*envoy_config_bootstr
}
// BuildBootstrapStaticResources builds the static resources for the envoy bootstrap. It includes the control plane
// cluster as well as a datadog-apm cluster (if datadog is used).
func (b *Builder) BuildBootstrapStaticResources(cfg *config.Config) (*envoy_config_bootstrap_v3.Bootstrap_StaticResources, error) {
// cluster.
func (b *Builder) BuildBootstrapStaticResources() (*envoy_config_bootstrap_v3.Bootstrap_StaticResources, error) {
grpcAddr, err := parseAddress(b.localGRPCAddress)
if err != nil {
return nil, fmt.Errorf("envoyconfig: invalid local gRPC address: %w", err)
@ -72,44 +71,6 @@ func (b *Builder) BuildBootstrapStaticResources(cfg *config.Config) (*envoy_conf
},
}
if cfg.Options.TracingProvider == trace.DatadogTracingProviderName {
addr, _ := parseAddress("127.0.0.1:8126")
if cfg.Options.TracingDatadogAddress != "" {
addr, err = parseAddress(cfg.Options.TracingDatadogAddress)
if err != nil {
return nil, fmt.Errorf("envoyconfig: invalid tracing datadog address: %w", err)
}
}
staticCfg.Clusters = append(staticCfg.Clusters, &envoy_config_cluster_v3.Cluster{
Name: "datadog-apm",
ConnectTimeout: &durationpb.Duration{
Seconds: 5,
},
ClusterDiscoveryType: &envoy_config_cluster_v3.Cluster_Type{
Type: envoy_config_cluster_v3.Cluster_STATIC,
},
LbPolicy: envoy_config_cluster_v3.Cluster_ROUND_ROBIN,
LoadAssignment: &envoy_config_endpoint_v3.ClusterLoadAssignment{
ClusterName: "datadog-apm",
Endpoints: []*envoy_config_endpoint_v3.LocalityLbEndpoints{
{
LbEndpoints: []*envoy_config_endpoint_v3.LbEndpoint{
{
HostIdentifier: &envoy_config_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: &envoy_config_endpoint_v3.Endpoint{
Address: addr,
},
},
},
},
},
},
},
})
}
return staticCfg, nil
}

View file

@ -7,7 +7,6 @@ import (
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/config/envoyconfig/filemgr"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/internal/testutil"
)
@ -44,11 +43,7 @@ func TestBuilder_BuildBootstrapAdmin(t *testing.T) {
func TestBuilder_BuildBootstrapStaticResources(t *testing.T) {
t.Run("valid", func(t *testing.T) {
b := New("localhost:1111", "localhost:2222", filemgr.NewManager(), nil)
staticCfg, err := b.BuildBootstrapStaticResources(&config.Config{
Options: &config.Options{
TracingProvider: trace.DatadogTracingProviderName,
},
})
staticCfg, err := b.BuildBootstrapStaticResources()
assert.NoError(t, err)
testutil.AssertProtoJSONEqual(t, `
{
@ -73,26 +68,6 @@ func TestBuilder_BuildBootstrapStaticResources(t *testing.T) {
}]
}]
}
},
{
"name": "datadog-apm",
"type": "STATIC",
"connectTimeout": "5s",
"loadAssignment": {
"clusterName": "datadog-apm",
"endpoints": [{
"lbEndpoints": [{
"endpoint": {
"address": {
"socketAddress":{
"address": "127.0.0.1",
"portValue": 8126
}
}
}
}]
}]
}
}
]
}
@ -100,19 +75,7 @@ func TestBuilder_BuildBootstrapStaticResources(t *testing.T) {
})
t.Run("bad gRPC address", func(t *testing.T) {
b := New("xyz:zyx", "localhost:2222", filemgr.NewManager(), nil)
_, err := b.BuildBootstrapStaticResources(&config.Config{
Options: &config.Options{},
})
assert.Error(t, err)
})
t.Run("bad datadog address", func(t *testing.T) {
b := New("localhost:1111", "localhost:2222", filemgr.NewManager(), nil)
_, err := b.BuildBootstrapStaticResources(&config.Config{
Options: &config.Options{
TracingProvider: trace.DatadogTracingProviderName,
TracingDatadogAddress: "not-valid:zyx",
},
})
_, err := b.BuildBootstrapStaticResources()
assert.Error(t, err)
})
}

View file

@ -64,6 +64,13 @@ func (b *Builder) BuildClusters(ctx context.Context, cfg *config.Config) ([]*env
authZ,
}
tracingCluster, err := buildTracingCluster(cfg.Options)
if err != nil {
return nil, err
} else if tracingCluster != nil {
clusters = append(clusters, tracingCluster)
}
if config.IsProxy(cfg.Options.Services) {
for i, p := range cfg.Options.GetAllPolicies() {
policy := p
@ -335,18 +342,7 @@ func (b *Builder) buildCluster(
"envoy.extensions.upstreams.http.v3.HttpProtocolOptions": marshalAny(buildUpstreamProtocolOptions(endpoints, forceHTTP2)),
}
// for IPs we use a static discovery type, otherwise we use DNS
allIP := true
for _, lbe := range lbEndpoints {
if net.ParseIP(urlutil.StripPort(lbe.GetEndpoint().GetAddress().GetSocketAddress().GetAddress())) == nil {
allIP = false
}
}
if allIP {
cluster.ClusterDiscoveryType = &envoy_config_cluster_v3.Cluster_Type{Type: envoy_config_cluster_v3.Cluster_STATIC}
} else {
cluster.ClusterDiscoveryType = &envoy_config_cluster_v3.Cluster_Type{Type: envoy_config_cluster_v3.Cluster_STRICT_DNS}
}
cluster.ClusterDiscoveryType = getClusterDiscoveryType(lbEndpoints)
return cluster.Validate()
}
@ -478,3 +474,17 @@ func validateClusterNamesUnique(clusters []*envoy_config_cluster_v3.Cluster) err
return nil
}
func getClusterDiscoveryType(lbEndpoints []*envoy_config_endpoint_v3.LbEndpoint) *envoy_config_cluster_v3.Cluster_Type {
// for IPs we use a static discovery type, otherwise we use DNS
allIP := true
for _, lbe := range lbEndpoints {
if net.ParseIP(urlutil.StripPort(lbe.GetEndpoint().GetAddress().GetSocketAddress().GetAddress())) == nil {
allIP = false
}
}
if allIP {
return &envoy_config_cluster_v3.Cluster_Type{Type: envoy_config_cluster_v3.Cluster_STATIC}
}
return &envoy_config_cluster_v3.Cluster_Type{Type: envoy_config_cluster_v3.Cluster_STRICT_DNS}
}

View file

@ -432,7 +432,7 @@ func (b *Builder) buildMainHTTPConnectionManagerFilter(
if err != nil {
return nil, err
}
tracingProvider, err := b.buildTracingProvider(options)
tracingProvider, err := buildTracingHTTP(options)
if err != nil {
return nil, err
}

View file

@ -2,15 +2,99 @@ package envoyconfig
import (
"fmt"
"net"
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
envoy_config_trace_v3 "github.com/envoyproxy/go-control-plane/envoy/config/trace/v3"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/telemetry/trace"
)
func (b *Builder) buildTracingProvider(options *config.Options) (*envoy_config_trace_v3.Tracing_Http, error) {
func buildTracingCluster(options *config.Options) (*envoy_config_cluster_v3.Cluster, error) {
tracingOptions, err := config.NewTracingOptions(options)
if err != nil {
return nil, fmt.Errorf("envoyconfig: invalid tracing config: %w", err)
}
switch tracingOptions.Provider {
case trace.DatadogTracingProviderName:
addr, _ := parseAddress("127.0.0.1:8126")
if options.TracingDatadogAddress != "" {
addr, err = parseAddress(options.TracingDatadogAddress)
if err != nil {
return nil, fmt.Errorf("envoyconfig: invalid tracing datadog address: %w", err)
}
}
endpoints := []*envoy_config_endpoint_v3.LbEndpoint{{
HostIdentifier: &envoy_config_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: &envoy_config_endpoint_v3.Endpoint{
Address: addr,
},
},
}}
return &envoy_config_cluster_v3.Cluster{
Name: "datadog-apm",
ConnectTimeout: &durationpb.Duration{
Seconds: 5,
},
ClusterDiscoveryType: getClusterDiscoveryType(endpoints),
LbPolicy: envoy_config_cluster_v3.Cluster_ROUND_ROBIN,
LoadAssignment: &envoy_config_endpoint_v3.ClusterLoadAssignment{
ClusterName: "datadog-apm",
Endpoints: []*envoy_config_endpoint_v3.LocalityLbEndpoints{{
LbEndpoints: endpoints,
}},
},
}, nil
case trace.ZipkinTracingProviderName:
host := tracingOptions.ZipkinEndpoint.Host
if _, port, _ := net.SplitHostPort(host); port == "" {
if tracingOptions.ZipkinEndpoint.Scheme == "https" {
host = net.JoinHostPort(host, "443")
} else {
host = net.JoinHostPort(host, "80")
}
}
addr, err := parseAddress(host)
if err != nil {
return nil, fmt.Errorf("envoyconfig: invalid tracing zipkin address: %w", err)
}
endpoints := []*envoy_config_endpoint_v3.LbEndpoint{{
HostIdentifier: &envoy_config_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: &envoy_config_endpoint_v3.Endpoint{
Address: addr,
},
},
}}
return &envoy_config_cluster_v3.Cluster{
Name: "zipkin",
ConnectTimeout: &durationpb.Duration{
Seconds: 5,
},
ClusterDiscoveryType: getClusterDiscoveryType(endpoints),
LbPolicy: envoy_config_cluster_v3.Cluster_ROUND_ROBIN,
LoadAssignment: &envoy_config_endpoint_v3.ClusterLoadAssignment{
ClusterName: "zipkin",
Endpoints: []*envoy_config_endpoint_v3.LocalityLbEndpoints{{
LbEndpoints: endpoints,
}},
},
}, nil
default:
return nil, nil
}
}
func buildTracingHTTP(options *config.Options) (*envoy_config_trace_v3.Tracing_Http, error) {
tracingOptions, err := config.NewTracingOptions(options)
if err != nil {
return nil, fmt.Errorf("invalid tracing config: %w", err)
@ -29,29 +113,13 @@ func (b *Builder) buildTracingProvider(options *config.Options) (*envoy_config_t
},
}, nil
case trace.ZipkinTracingProviderName:
if tracingOptions.ZipkinEndpoint.String() == "" {
return nil, fmt.Errorf("missing zipkin url")
}
tracingTC, _ := anypb.New(
&envoy_config_trace_v3.OpenCensusConfig{
ZipkinExporterEnabled: true,
ZipkinUrl: tracingOptions.ZipkinEndpoint.String(),
IncomingTraceContext: []envoy_config_trace_v3.OpenCensusConfig_TraceContext{
envoy_config_trace_v3.OpenCensusConfig_B3,
envoy_config_trace_v3.OpenCensusConfig_TRACE_CONTEXT,
envoy_config_trace_v3.OpenCensusConfig_CLOUD_TRACE_CONTEXT,
envoy_config_trace_v3.OpenCensusConfig_GRPC_TRACE_BIN,
},
OutgoingTraceContext: []envoy_config_trace_v3.OpenCensusConfig_TraceContext{
envoy_config_trace_v3.OpenCensusConfig_B3,
envoy_config_trace_v3.OpenCensusConfig_TRACE_CONTEXT,
envoy_config_trace_v3.OpenCensusConfig_GRPC_TRACE_BIN,
},
},
)
tracingTC, _ := anypb.New(&envoy_config_trace_v3.ZipkinConfig{
CollectorCluster: "zipkin",
CollectorEndpoint: tracingOptions.ZipkinEndpoint.Path,
CollectorEndpointVersion: envoy_config_trace_v3.ZipkinConfig_HTTP_JSON,
})
return &envoy_config_trace_v3.Tracing_Http{
Name: "envoy.tracers.opencensus",
Name: "envoy.tracers.zipkin",
ConfigType: &envoy_config_trace_v3.Tracing_Http_TypedConfig{
TypedConfig: tracingTC,
},

View file

@ -0,0 +1,135 @@
package envoyconfig
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/testutil"
)
func TestBuildTracingCluster(t *testing.T) {
t.Run("datadog", func(t *testing.T) {
c, err := buildTracingCluster(&config.Options{
TracingProvider: "datadog",
})
require.NoError(t, err)
testutil.AssertProtoJSONEqual(t, `
{
"name": "datadog-apm",
"type": "STATIC",
"connectTimeout": "5s",
"loadAssignment": {
"clusterName": "datadog-apm",
"endpoints": [{
"lbEndpoints": [{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 8126
}
}
}
}]
}]
}
}
`, c)
c, err = buildTracingCluster(&config.Options{
TracingProvider: "datadog",
TracingDatadogAddress: "example.com:8126",
})
require.NoError(t, err)
testutil.AssertProtoJSONEqual(t, `
{
"name": "datadog-apm",
"type": "STRICT_DNS",
"connectTimeout": "5s",
"loadAssignment": {
"clusterName": "datadog-apm",
"endpoints": [{
"lbEndpoints": [{
"endpoint": {
"address": {
"socketAddress": {
"address": "example.com",
"portValue": 8126
}
}
}
}]
}]
}
}
`, c)
})
t.Run("zipkin", func(t *testing.T) {
c, err := buildTracingCluster(&config.Options{
TracingProvider: "zipkin",
ZipkinEndpoint: "https://example.com/api/v2/spans",
})
require.NoError(t, err)
testutil.AssertProtoJSONEqual(t, `
{
"name": "zipkin",
"type": "STRICT_DNS",
"connectTimeout": "5s",
"loadAssignment": {
"clusterName": "zipkin",
"endpoints": [{
"lbEndpoints": [{
"endpoint": {
"address": {
"socketAddress": {
"address": "example.com",
"portValue": 443
}
}
}
}]
}]
}
}
`, c)
})
}
func TestBuildTracingHTTP(t *testing.T) {
t.Run("datadog", func(t *testing.T) {
h, err := buildTracingHTTP(&config.Options{
TracingProvider: "datadog",
})
require.NoError(t, err)
testutil.AssertProtoJSONEqual(t, `
{
"name": "envoy.tracers.datadog",
"typedConfig": {
"@type": "type.googleapis.com/envoy.config.trace.v3.DatadogConfig",
"collectorCluster": "datadog-apm",
"serviceName": "pomerium"
}
}
`, h)
})
t.Run("zipkin", func(t *testing.T) {
h, err := buildTracingHTTP(&config.Options{
TracingProvider: "zipkin",
ZipkinEndpoint: "https://example.com/api/v2/spans",
})
require.NoError(t, err)
testutil.AssertProtoJSONEqual(t, `
{
"name": "envoy.tracers.zipkin",
"typedConfig": {
"@type": "type.googleapis.com/envoy.config.trace.v3.ZipkinConfig",
"collectorCluster": "zipkin",
"collectorEndpoint": "/api/v2/spans",
"collectorEndpointVersion": "HTTP_JSON"
}
}
`, h)
})
}

View file

@ -33,7 +33,6 @@ import (
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/config/envoyconfig"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry/trace"
)
const (
@ -45,9 +44,8 @@ const (
var Checksum string
type serverOptions struct {
services string
logLevel string
tracingOptions trace.TracingOptions
services string
logLevel string
}
// A Server is a pomerium proxy implemented via envoy.
@ -147,16 +145,9 @@ func (srv *Server) update(ctx context.Context, cfg *config.Config) {
srv.mu.Lock()
defer srv.mu.Unlock()
tracingOptions, err := config.NewTracingOptions(cfg.Options)
if err != nil {
log.Error(ctx).Err(err).Str("service", "envoy").Msg("invalid tracing config")
return
}
options := serverOptions{
services: cfg.Options.Services,
logLevel: firstNonEmpty(cfg.Options.ProxyLogLevel, cfg.Options.LogLevel, "debug"),
tracingOptions: *tracingOptions,
services: cfg.Options.Services,
logLevel: firstNonEmpty(cfg.Options.ProxyLogLevel, cfg.Options.LogLevel, "debug"),
}
if cmp.Equal(srv.options, options, cmp.AllowUnexported(serverOptions{})) {
@ -271,7 +262,7 @@ func (srv *Server) buildBootstrapConfig(cfg *config.Config) ([]byte, error) {
},
}
staticCfg, err := srv.builder.BuildBootstrapStaticResources(cfg)
staticCfg, err := srv.builder.BuildBootstrapStaticResources()
if err != nil {
return nil, err
}