mirror of
https://github.com/pomerium/pomerium.git
synced 2025-08-06 10:21:05 +02:00
tracing: support dynamic reloading, more aggressive envoy restart (#2262)
* tracing: support dynamic reloading, more aggressive envoy restart
* set exporter to nil
* actually register tracer
This commit is contained in:
parent
40ddc2c4b3
commit
513859665a
7 changed files with 164 additions and 83 deletions
|
@ -12,7 +12,6 @@ import (
|
||||||
"github.com/pomerium/pomerium/internal/urlutil"
|
"github.com/pomerium/pomerium/internal/urlutil"
|
||||||
|
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
octrace "go.opencensus.io/trace"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TracingOptions are the options for tracing.
|
// TracingOptions are the options for tracing.
|
||||||
|
@ -58,7 +57,7 @@ func NewTracingOptions(o *Options) (*TracingOptions, error) {
|
||||||
type TraceManager struct {
|
type TraceManager struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
traceOpts *TracingOptions
|
traceOpts *TracingOptions
|
||||||
exporter octrace.Exporter
|
provider trace.Provider
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTraceManager creates a new TraceManager.
|
// NewTraceManager creates a new TraceManager.
|
||||||
|
@ -77,10 +76,11 @@ func (mgr *TraceManager) Close() error {
|
||||||
mgr.mu.Lock()
|
mgr.mu.Lock()
|
||||||
defer mgr.mu.Unlock()
|
defer mgr.mu.Unlock()
|
||||||
|
|
||||||
if mgr.exporter != nil {
|
var err error
|
||||||
trace.UnregisterTracing(mgr.exporter)
|
if mgr.provider != nil {
|
||||||
|
err = mgr.provider.Unregister()
|
||||||
}
|
}
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnConfigChange updates the manager whenever the configuration is changed.
|
// OnConfigChange updates the manager whenever the configuration is changed.
|
||||||
|
@ -100,9 +100,9 @@ func (mgr *TraceManager) OnConfigChange(ctx context.Context, cfg *Config) {
|
||||||
}
|
}
|
||||||
mgr.traceOpts = traceOpts
|
mgr.traceOpts = traceOpts
|
||||||
|
|
||||||
if mgr.exporter != nil {
|
if mgr.provider != nil {
|
||||||
trace.UnregisterTracing(mgr.exporter)
|
_ = mgr.provider.Unregister()
|
||||||
mgr.exporter = nil
|
mgr.provider = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if !traceOpts.Enabled() {
|
if !traceOpts.Enabled() {
|
||||||
|
@ -111,7 +111,13 @@ func (mgr *TraceManager) OnConfigChange(ctx context.Context, cfg *Config) {
|
||||||
|
|
||||||
log.Info(ctx).Interface("options", traceOpts).Msg("trace: starting exporter")
|
log.Info(ctx).Interface("options", traceOpts).Msg("trace: starting exporter")
|
||||||
|
|
||||||
mgr.exporter, err = trace.RegisterTracing(traceOpts)
|
mgr.provider, err = trace.GetProvider(traceOpts)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(ctx).Err(err).Msg("trace: failed to register exporter")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = mgr.provider.Register(traceOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(ctx).Err(err).Msg("trace: failed to register exporter")
|
log.Error(ctx).Err(err).Msg("trace: failed to register exporter")
|
||||||
return
|
return
|
||||||
|
|
|
@ -76,10 +76,19 @@ func (srv *Server) prepareRunEnvoyCommand(ctx context.Context, sharedArgs []stri
|
||||||
|
|
||||||
restartEpoch.Lock()
|
restartEpoch.Lock()
|
||||||
if baseID, ok := readBaseID(); ok {
|
if baseID, ok := readBaseID(); ok {
|
||||||
args = append(args, "--base-id", strconv.Itoa(baseID), "--restart-epoch", strconv.Itoa(restartEpoch.value))
|
args = append(args,
|
||||||
|
"--base-id", strconv.Itoa(baseID),
|
||||||
|
"--restart-epoch", strconv.Itoa(restartEpoch.value),
|
||||||
|
"--drain-time-s", "60",
|
||||||
|
"--parent-shutdown-time-s", "120",
|
||||||
|
"--drain-strategy", "immediate",
|
||||||
|
)
|
||||||
restartEpoch.value++
|
restartEpoch.value++
|
||||||
} else {
|
} else {
|
||||||
args = append(args, "--use-dynamic-base-id", "--base-id-path", baseIDPath)
|
args = append(args,
|
||||||
|
"--use-dynamic-base-id",
|
||||||
|
"--base-id-path", baseIDPath,
|
||||||
|
)
|
||||||
restartEpoch.value = 1
|
restartEpoch.value = 1
|
||||||
}
|
}
|
||||||
restartEpoch.Unlock()
|
restartEpoch.Unlock()
|
||||||
|
|
34
internal/telemetry/trace/datadog.go
Normal file
34
internal/telemetry/trace/datadog.go
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
package trace
|
||||||
|
|
||||||
|
import (
|
||||||
|
datadog "github.com/DataDog/opencensus-go-exporter-datadog"
|
||||||
|
octrace "go.opencensus.io/trace"
|
||||||
|
)
|
||||||
|
|
||||||
|
type datadogProvider struct {
|
||||||
|
exporter *datadog.Exporter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *datadogProvider) Register(opts *TracingOptions) error {
|
||||||
|
dOpts := datadog.Options{
|
||||||
|
Service: opts.Service,
|
||||||
|
TraceAddr: opts.DatadogAddress,
|
||||||
|
}
|
||||||
|
dex, err := datadog.NewExporter(dOpts)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
octrace.RegisterExporter(dex)
|
||||||
|
provider.exporter = dex
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *datadogProvider) Unregister() error {
|
||||||
|
if provider.exporter == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
octrace.UnregisterExporter(provider.exporter)
|
||||||
|
provider.exporter.Stop()
|
||||||
|
provider.exporter = nil
|
||||||
|
return nil
|
||||||
|
}
|
37
internal/telemetry/trace/jaeger.go
Normal file
37
internal/telemetry/trace/jaeger.go
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
package trace
|
||||||
|
|
||||||
|
import (
|
||||||
|
"contrib.go.opencensus.io/exporter/jaeger"
|
||||||
|
octrace "go.opencensus.io/trace"
|
||||||
|
)
|
||||||
|
|
||||||
|
type jaegerProvider struct {
|
||||||
|
exporter *jaeger.Exporter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *jaegerProvider) Register(opts *TracingOptions) error {
|
||||||
|
jOpts := jaeger.Options{
|
||||||
|
ServiceName: opts.Service,
|
||||||
|
AgentEndpoint: opts.JaegerAgentEndpoint,
|
||||||
|
}
|
||||||
|
if opts.JaegerCollectorEndpoint != nil {
|
||||||
|
jOpts.CollectorEndpoint = opts.JaegerCollectorEndpoint.String()
|
||||||
|
}
|
||||||
|
jex, err := jaeger.NewExporter(jOpts)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
octrace.RegisterExporter(jex)
|
||||||
|
provider.exporter = jex
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *jaegerProvider) Unregister() error {
|
||||||
|
if provider.exporter == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
octrace.UnregisterExporter(provider.exporter)
|
||||||
|
provider.exporter.Flush()
|
||||||
|
provider.exporter = nil
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -5,12 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
|
||||||
"contrib.go.opencensus.io/exporter/jaeger"
|
octrace "go.opencensus.io/trace"
|
||||||
ocZipkin "contrib.go.opencensus.io/exporter/zipkin"
|
|
||||||
datadog "github.com/DataDog/opencensus-go-exporter-datadog"
|
|
||||||
"github.com/openzipkin/zipkin-go"
|
|
||||||
zipkinHTTP "github.com/openzipkin/zipkin-go/reporter/http"
|
|
||||||
"go.opencensus.io/trace"
|
|
||||||
|
|
||||||
"github.com/pomerium/pomerium/internal/log"
|
"github.com/pomerium/pomerium/internal/log"
|
||||||
)
|
)
|
||||||
|
@ -24,6 +19,12 @@ const (
|
||||||
ZipkinTracingProviderName = "zipkin"
|
ZipkinTracingProviderName = "zipkin"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Provider is a trace provider.
|
||||||
|
type Provider interface {
|
||||||
|
Register(options *TracingOptions) error
|
||||||
|
Unregister() error
|
||||||
|
}
|
||||||
|
|
||||||
// TracingOptions contains the configurations settings for a http server.
|
// TracingOptions contains the configurations settings for a http server.
|
||||||
type TracingOptions struct {
|
type TracingOptions struct {
|
||||||
// Shared
|
// Shared
|
||||||
|
@ -58,75 +59,23 @@ func (t *TracingOptions) Enabled() bool {
|
||||||
return t.Provider != ""
|
return t.Provider != ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterTracing creates a new trace exporter from TracingOptions.
|
// GetProvider creates a new trace provider from TracingOptions.
|
||||||
func RegisterTracing(opts *TracingOptions) (trace.Exporter, error) {
|
func GetProvider(opts *TracingOptions) (Provider, error) {
|
||||||
var exporter trace.Exporter
|
var provider Provider
|
||||||
var err error
|
|
||||||
switch opts.Provider {
|
switch opts.Provider {
|
||||||
case DatadogTracingProviderName:
|
case DatadogTracingProviderName:
|
||||||
exporter, err = registerDatadog(opts)
|
provider = new(datadogProvider)
|
||||||
case JaegerTracingProviderName:
|
case JaegerTracingProviderName:
|
||||||
exporter, err = registerJaeger(opts)
|
provider = new(jaegerProvider)
|
||||||
case ZipkinTracingProviderName:
|
case ZipkinTracingProviderName:
|
||||||
exporter, err = registerZipkin(opts)
|
provider = new(zipkinProvider)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("telemetry/trace: provider %s unknown", opts.Provider)
|
return nil, fmt.Errorf("telemetry/trace: provider %s unknown", opts.Provider)
|
||||||
}
|
}
|
||||||
if err != nil {
|
octrace.ApplyConfig(octrace.Config{DefaultSampler: octrace.ProbabilitySampler(opts.SampleRate)})
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
trace.ApplyConfig(trace.Config{DefaultSampler: trace.ProbabilitySampler(opts.SampleRate)})
|
|
||||||
|
|
||||||
log.Debug(context.TODO()).Interface("Opts", opts).Msg("telemetry/trace: exporter created")
|
log.Debug(context.TODO()).Interface("Opts", opts).Msg("telemetry/trace: provider created")
|
||||||
return exporter, nil
|
return provider, nil
|
||||||
}
|
|
||||||
|
|
||||||
// UnregisterTracing unregisters a trace exporter.
|
|
||||||
func UnregisterTracing(exporter trace.Exporter) {
|
|
||||||
trace.UnregisterExporter(exporter)
|
|
||||||
}
|
|
||||||
|
|
||||||
func registerDatadog(opts *TracingOptions) (trace.Exporter, error) {
|
|
||||||
dOpts := datadog.Options{
|
|
||||||
Service: opts.Service,
|
|
||||||
TraceAddr: opts.DatadogAddress,
|
|
||||||
}
|
|
||||||
dex, err := datadog.NewExporter(dOpts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
trace.RegisterExporter(dex)
|
|
||||||
return dex, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func registerJaeger(opts *TracingOptions) (trace.Exporter, error) {
|
|
||||||
jOpts := jaeger.Options{
|
|
||||||
ServiceName: opts.Service,
|
|
||||||
AgentEndpoint: opts.JaegerAgentEndpoint,
|
|
||||||
}
|
|
||||||
if opts.JaegerCollectorEndpoint != nil {
|
|
||||||
jOpts.CollectorEndpoint = opts.JaegerCollectorEndpoint.String()
|
|
||||||
}
|
|
||||||
jex, err := jaeger.NewExporter(jOpts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
trace.RegisterExporter(jex)
|
|
||||||
return jex, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func registerZipkin(opts *TracingOptions) (trace.Exporter, error) {
|
|
||||||
localEndpoint, err := zipkin.NewEndpoint(opts.Service, "")
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("telemetry/trace: could not create local endpoint: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
reporter := zipkinHTTP.NewReporter(opts.ZipkinEndpoint.String())
|
|
||||||
|
|
||||||
exporter := ocZipkin.NewExporter(reporter, localEndpoint)
|
|
||||||
trace.RegisterExporter(exporter)
|
|
||||||
|
|
||||||
return exporter, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartSpan starts a new child span of the current span in the context. If
|
// StartSpan starts a new child span of the current span in the context. If
|
||||||
|
@ -134,6 +83,6 @@ func registerZipkin(opts *TracingOptions) (trace.Exporter, error) {
|
||||||
//
|
//
|
||||||
// Returned context contains the newly created span. You can use it to
|
// Returned context contains the newly created span. You can use it to
|
||||||
// propagate the returned span in process.
|
// propagate the returned span in process.
|
||||||
func StartSpan(ctx context.Context, name string, o ...trace.StartOption) (context.Context, *trace.Span) {
|
func StartSpan(ctx context.Context, name string, o ...octrace.StartOption) (context.Context, *octrace.Span) {
|
||||||
return trace.StartSpan(ctx, name, o...)
|
return octrace.StartSpan(ctx, name, o...)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestRegisterTracing(t *testing.T) {
|
func TestGetProvider(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
opts *TracingOptions
|
opts *TracingOptions
|
||||||
|
@ -13,13 +13,13 @@ func TestRegisterTracing(t *testing.T) {
|
||||||
}{
|
}{
|
||||||
{"jaeger", &TracingOptions{JaegerAgentEndpoint: "localhost:6831", Service: "all", Provider: "jaeger"}, false},
|
{"jaeger", &TracingOptions{JaegerAgentEndpoint: "localhost:6831", Service: "all", Provider: "jaeger"}, false},
|
||||||
{"jaeger with debug", &TracingOptions{JaegerAgentEndpoint: "localhost:6831", Service: "all", Provider: "jaeger", Debug: true}, false},
|
{"jaeger with debug", &TracingOptions{JaegerAgentEndpoint: "localhost:6831", Service: "all", Provider: "jaeger", Debug: true}, false},
|
||||||
{"jaeger no endpoint", &TracingOptions{JaegerAgentEndpoint: "", Service: "all", Provider: "jaeger"}, true},
|
{"jaeger no endpoint", &TracingOptions{JaegerAgentEndpoint: "", Service: "all", Provider: "jaeger"}, false},
|
||||||
{"unknown provider", &TracingOptions{JaegerAgentEndpoint: "localhost:0", Service: "all", Provider: "Lucius Cornelius Sulla"}, true},
|
{"unknown provider", &TracingOptions{JaegerAgentEndpoint: "localhost:0", Service: "all", Provider: "Lucius Cornelius Sulla"}, true},
|
||||||
{"zipkin with debug", &TracingOptions{ZipkinEndpoint: &url.URL{Host: "localhost"}, Service: "all", Provider: "zipkin", Debug: true}, false},
|
{"zipkin with debug", &TracingOptions{ZipkinEndpoint: &url.URL{Host: "localhost"}, Service: "all", Provider: "zipkin", Debug: true}, false},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
if _, err := RegisterTracing(tt.opts); (err != nil) != tt.wantErr {
|
if _, err := GetProvider(tt.opts); (err != nil) != tt.wantErr {
|
||||||
t.Errorf("RegisterTracing() error = %v, wantErr %v", err, tt.wantErr)
|
t.Errorf("RegisterTracing() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
46
internal/telemetry/trace/zipkin.go
Normal file
46
internal/telemetry/trace/zipkin.go
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
package trace
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
stdlog "log"
|
||||||
|
|
||||||
|
oczipkin "contrib.go.opencensus.io/exporter/zipkin"
|
||||||
|
"github.com/openzipkin/zipkin-go"
|
||||||
|
"github.com/openzipkin/zipkin-go/reporter"
|
||||||
|
zipkinHTTP "github.com/openzipkin/zipkin-go/reporter/http"
|
||||||
|
octrace "go.opencensus.io/trace"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/internal/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type zipkinProvider struct {
|
||||||
|
reporter reporter.Reporter
|
||||||
|
exporter *oczipkin.Exporter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *zipkinProvider) Register(opts *TracingOptions) error {
|
||||||
|
localEndpoint, err := zipkin.NewEndpoint(opts.Service, "")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("telemetry/trace: could not create local endpoint: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
provider.reporter = zipkinHTTP.NewReporter(opts.ZipkinEndpoint.String(),
|
||||||
|
zipkinHTTP.Logger(stdlog.New(log.Logger(), "", 0)))
|
||||||
|
provider.exporter = oczipkin.NewExporter(provider.reporter, localEndpoint)
|
||||||
|
octrace.RegisterExporter(provider.exporter)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (provider *zipkinProvider) Unregister() error {
|
||||||
|
if provider.exporter != nil {
|
||||||
|
octrace.UnregisterExporter(provider.exporter)
|
||||||
|
provider.exporter = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
if provider.reporter != nil {
|
||||||
|
err = provider.reporter.Close()
|
||||||
|
provider.reporter = nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue