config: allow reloading of telemetry settings (#1255)

* metrics: support dynamic configuration settings

* add test

* trace: update configuration when settings change

* config: allow logging options to be configured when settings change

* envoy: allow changing log settings

* fix unexpected doc change

* fix tests

* pick a port at random

* update based on review
This commit is contained in:
Caleb Doxsey 2020-08-12 08:14:15 -06:00 committed by GitHub
parent 0d611c2a40
commit f822c9a5d2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 538 additions and 266 deletions

View file

@ -55,6 +55,7 @@ type Source interface {
// A StaticSource always returns the same config. Useful for testing.
type StaticSource struct {
cfg *Config
lis []ChangeListener
}
// NewStaticSource creates a new StaticSource.
@ -67,8 +68,18 @@ func (src *StaticSource) GetConfig() *Config {
return src.cfg
}
// SetConfig sets the config.
func (src *StaticSource) SetConfig(cfg *Config) {
src.cfg = cfg
for _, li := range src.lis {
li(cfg)
}
}
// OnConfigChange is ignored for the StaticSource.
func (src *StaticSource) OnConfigChange(ChangeListener) {}
func (src *StaticSource) OnConfigChange(li ChangeListener) {
src.lis = append(src.lis, li)
}
// A FileOrEnvironmentSource retrieves config options from a file or the environment.
type FileOrEnvironmentSource struct {

41
config/log.go Normal file
View file

@ -0,0 +1,41 @@
package config
import (
"sync"
"github.com/pomerium/pomerium/internal/log"
)
// The LogManager configures logging based on options.
type LogManager struct {
mu sync.Mutex
}
// NewLogManager creates a new LogManager.
func NewLogManager(src Source) *LogManager {
mgr := &LogManager{}
src.OnConfigChange(mgr.OnConfigChange)
mgr.OnConfigChange(src.GetConfig())
return mgr
}
// Close closes the log manager.
func (mgr *LogManager) Close() error {
return nil
}
// OnConfigChange is called whenever configuration changes.
func (mgr *LogManager) OnConfigChange(cfg *Config) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
if cfg.Options.Debug {
log.EnableDebug()
} else {
log.DisableDebug()
}
if cfg.Options.LogLevel != "" {
log.SetLevel(cfg.Options.LogLevel)
}
}

98
config/metrics.go Normal file
View file

@ -0,0 +1,98 @@
package config
import (
"net/http"
"sync"
"github.com/pomerium/pomerium/internal/httputil"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
)
// A MetricsManager manages metrics for a given configuration.
type MetricsManager struct {
mu sync.Mutex
serviceName string
addr string
srv *http.Server
}
// NewMetricsManager creates a new MetricsManager.
func NewMetricsManager(src Source) *MetricsManager {
mgr := &MetricsManager{}
metrics.RegisterInfoMetrics()
src.OnConfigChange(mgr.OnConfigChange)
mgr.OnConfigChange(src.GetConfig())
return mgr
}
// Close closes any underlying http server.
func (mgr *MetricsManager) Close() error {
mgr.mu.Lock()
defer mgr.mu.Unlock()
var err error
if mgr.srv != nil {
err = mgr.srv.Close()
mgr.srv = nil
}
return err
}
// OnConfigChange updates the metrics manager when configuration is changed.
func (mgr *MetricsManager) OnConfigChange(cfg *Config) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
mgr.updateInfo(cfg)
mgr.updateServer(cfg)
}
func (mgr *MetricsManager) updateInfo(cfg *Config) {
serviceName := telemetry.ServiceName(cfg.Options.Services)
if serviceName == mgr.serviceName {
return
}
metrics.SetBuildInfo(serviceName)
mgr.serviceName = serviceName
}
func (mgr *MetricsManager) updateServer(cfg *Config) {
if cfg.Options.MetricsAddr == mgr.addr {
return
}
if mgr.srv != nil {
err := mgr.srv.Close()
if err != nil {
log.Warn().Err(err).Msg("metrics: error closing http server")
}
mgr.srv = nil
}
mgr.addr = cfg.Options.MetricsAddr
if mgr.addr == "" {
log.Info().Msg("metrics: http server disabled")
return
}
log.Info().Str("addr", mgr.addr).Msg("metrics: starting http server")
handler, err := metrics.PrometheusHandler(EnvoyAdminURL)
if err != nil {
log.Error().Err(err).Msg("metrics: failed to create prometheus handler")
return
}
mgr.srv, err = httputil.NewServer(&httputil.ServerOptions{
Addr: mgr.addr,
Insecure: true,
Service: "metrics",
}, handler, new(sync.WaitGroup))
if err != nil {
log.Error().Err(err).Msg("metrics: failed to create metrics http server")
return
}
}

56
config/metrics_test.go Normal file
View file

@ -0,0 +1,56 @@
package config
import (
"fmt"
"net"
"net/http"
"testing"
"github.com/stretchr/testify/assert"
)
func TestMetricsManager(t *testing.T) {
li1, err := net.Listen("tcp", "127.0.0.1:0")
if !assert.NoError(t, err) {
return
}
addr1 := li1.Addr().String()
li2, err := net.Listen("tcp", "127.0.0.1:0")
if !assert.NoError(t, err) {
return
}
addr2 := li2.Addr().String()
li1.Close()
li2.Close()
src := NewStaticSource(&Config{
Options: &Options{
MetricsAddr: addr1,
},
})
mgr := NewMetricsManager(src)
defer mgr.Close()
getStatusCode := func(addr string) int {
res, err := http.Get(fmt.Sprintf("http://%s/metrics", addr))
if err != nil {
return 500
}
defer res.Body.Close()
return res.StatusCode
}
assert.Equal(t, 200, getStatusCode(addr1))
assert.Equal(t, 500, getStatusCode(addr2))
src.SetConfig(&Config{
Options: &Options{
MetricsAddr: addr2,
},
})
assert.Equal(t, 500, getStatusCode(addr1))
assert.Equal(t, 200, getStatusCode(addr2))
}

View file

@ -320,12 +320,6 @@ func newOptionsFromConfig(configFile string) (*Options, error) {
if err != nil {
return nil, fmt.Errorf("config: options from config file %w", err)
}
if o.Debug {
log.SetDebugMode()
}
if o.LogLevel != "" {
log.SetLevel(o.LogLevel)
}
serviceName := telemetry.ServiceName(o.Services)
metrics.AddPolicyCountCallback(serviceName, func() int64 {
return int64(len(o.Policies))

View file

@ -2,44 +2,19 @@ package config
import (
"fmt"
"net/url"
"reflect"
"sync"
octrace "go.opencensus.io/trace"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/internal/urlutil"
)
const (
// JaegerTracingProviderName is the name of the tracing provider Jaeger.
JaegerTracingProviderName = "jaeger"
// ZipkinTracingProviderName is the name of the tracing provider Zipkin.
ZipkinTracingProviderName = "zipkin"
)
// TracingOptions contains the configurations settings for a http server.
type TracingOptions struct {
// Shared
Provider string
Service string
Debug bool
// Jaeger
// CollectorEndpoint is the full url to the Jaeger HTTP Thrift collector.
// For example, http://localhost:14268/api/traces
JaegerCollectorEndpoint *url.URL
// AgentEndpoint instructs exporter to send spans to jaeger-agent at this address.
// For example, localhost:6831.
JaegerAgentEndpoint string
// Zipkin
// ZipkinEndpoint configures the zipkin collector URI
// Example: http://zipkin:9411/api/v2/spans
ZipkinEndpoint *url.URL
// SampleRate is percentage of requests which are sampled
SampleRate float64
}
// TracingOptions are the options for tracing.
type TracingOptions = trace.TracingOptions
// NewTracingOptions builds a new TracingOptions from core Options
func NewTracingOptions(o *Options) (*TracingOptions, error) {
@ -51,7 +26,7 @@ func NewTracingOptions(o *Options) (*TracingOptions, error) {
}
switch o.TracingProvider {
case JaegerTracingProviderName:
case trace.JaegerTracingProviderName:
if o.TracingJaegerCollectorEndpoint != "" {
jaegerCollectorEndpoint, err := urlutil.ParseAndValidateURL(o.TracingJaegerCollectorEndpoint)
if err != nil {
@ -60,7 +35,7 @@ func NewTracingOptions(o *Options) (*TracingOptions, error) {
tracingOpts.JaegerCollectorEndpoint = jaegerCollectorEndpoint
tracingOpts.JaegerAgentEndpoint = o.TracingJaegerAgentEndpoint
}
case ZipkinTracingProviderName:
case trace.ZipkinTracingProviderName:
zipkinEndpoint, err := urlutil.ParseAndValidateURL(o.ZipkinEndpoint)
if err != nil {
return nil, fmt.Errorf("config: invalid zipkin endpoint url: %w", err)
@ -73,10 +48,65 @@ func NewTracingOptions(o *Options) (*TracingOptions, error) {
}
return &tracingOpts, nil
}
// Enabled indicates whether tracing is enabled on a given TracingOptions
func (t *TracingOptions) Enabled() bool {
return t.Provider != ""
// A TraceManager manages setting up a trace exporter based on configuration options.
type TraceManager struct {
mu sync.Mutex
traceOpts *TracingOptions
exporter octrace.Exporter
}
// NewTraceManager creates a new TraceManager.
func NewTraceManager(src Source) *TraceManager {
mgr := &TraceManager{}
src.OnConfigChange(mgr.OnConfigChange)
mgr.OnConfigChange(src.GetConfig())
return mgr
}
// Close closes any underlying trace exporter.
func (mgr *TraceManager) Close() error {
mgr.mu.Lock()
defer mgr.mu.Unlock()
if mgr.exporter != nil {
trace.UnregisterTracing(mgr.exporter)
}
return nil
}
// OnConfigChange updates the manager whenever the configuration is changed.
func (mgr *TraceManager) OnConfigChange(cfg *Config) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
traceOpts, err := NewTracingOptions(cfg.Options)
if err != nil {
log.Error().Err(err).Msg("trace: failed to build tracing options")
return
}
if reflect.DeepEqual(traceOpts, mgr.traceOpts) {
log.Debug().Msg("no change detected in trace options")
return
}
mgr.traceOpts = traceOpts
if mgr.exporter != nil {
trace.UnregisterTracing(mgr.exporter)
mgr.exporter = nil
}
if !traceOpts.Enabled() {
return
}
log.Info().Interface("options", traceOpts).Msg("trace: starting exporter")
mgr.exporter, err = trace.RegisterTracing(traceOpts)
if err != nil {
log.Error().Err(err).Msg("trace: failed to register exporter")
return
}
}

View file

@ -1,11 +1,18 @@
package config
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/pomerium/pomerium/internal/telemetry/trace"
)
func Test_NewTracingOptions(t *testing.T) {
@ -77,3 +84,73 @@ func Test_TracingEnabled(t *testing.T) {
})
}
}
func TestTraceManager(t *testing.T) {
ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*30)
defer clearTimeout()
type Request struct {
URL string
Name string
}
incoming := make(chan Request, 100)
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var objs []struct {
Name string
}
json.NewDecoder(r.Body).Decode(&objs)
for _, obj := range objs {
incoming <- Request{Name: obj.Name, URL: r.Host}
}
})
srv1 := httptest.NewServer(h)
defer srv1.Close()
srv2 := httptest.NewServer(h)
defer srv2.Close()
src := NewStaticSource(&Config{Options: &Options{
TracingProvider: "zipkin",
ZipkinEndpoint: srv1.URL,
TracingSampleRate: 1,
}})
mgr := NewTraceManager(src)
_ = mgr
_, span := trace.StartSpan(ctx, "Example")
span.End()
src.SetConfig(&Config{Options: &Options{
TracingProvider: "zipkin",
ZipkinEndpoint: srv2.URL,
TracingSampleRate: 1,
}})
_, span = trace.StartSpan(ctx, "Example")
span.End()
expect := map[Request]struct{}{
{Name: "Example", URL: srv1.Listener.Addr().String()}: {},
{Name: "Example", URL: srv2.Listener.Addr().String()}: {},
}
for len(expect) > 0 {
var req Request
select {
case <-ctx.Done():
t.Error("timeout waiting for requests")
return
case req = <-incoming:
}
if _, ok := expect[req]; ok {
delete(expect, req)
} else {
t.Error("unexpected request", req)
return
}
}
}

View file

@ -502,7 +502,7 @@ All metrics coming from envoy will be labeled with `service="pomerium"` or `serv
- Options: `debug` `info` `warn` `error`
- Default: value of `log_level` or `debug` if both are unset
Log level sets the logging level for the pomerium proxy service. Only logs of the desired level and above will be logged.
Proxy log level sets the logging level for the pomerium proxy service access logs. Only logs of the desired level and above will be logged.
### Service Mode

View file

@ -8,7 +8,6 @@ import (
"net"
"os"
"os/signal"
"sync"
"syscall"
envoy_service_auth_v2 "github.com/envoyproxy/go-control-plane/envoy/service/auth/v2"
@ -22,11 +21,7 @@ import (
"github.com/pomerium/pomerium/internal/controlplane"
"github.com/pomerium/pomerium/internal/databroker"
"github.com/pomerium/pomerium/internal/envoy"
"github.com/pomerium/pomerium/internal/httputil"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/internal/urlutil"
"github.com/pomerium/pomerium/internal/version"
"github.com/pomerium/pomerium/proxy"
@ -50,14 +45,14 @@ func Run(ctx context.Context, configFile string) error {
src = databroker.NewConfigSource(src)
cfg := src.GetConfig()
logMgr := config.NewLogManager(src)
defer logMgr.Close()
metricsMgr := config.NewMetricsManager(src)
defer metricsMgr.Close()
traceMgr := config.NewTraceManager(src)
defer traceMgr.Close()
if err := setupMetrics(ctx, cfg.Options); err != nil {
return err
}
if err := setupTracing(ctx, cfg.Options); err != nil {
return err
}
cfg := src.GetConfig()
// setup the control plane
controlPlane, err := controlplane.NewServer(cfg.Options.Services)
@ -179,33 +174,6 @@ func setupCache(opt *config.Options, controlPlane *controlplane.Server) (*cache.
return svc, nil
}
func setupMetrics(ctx context.Context, opt *config.Options) error {
serviceName := telemetry.ServiceName(opt.Services)
if opt.MetricsAddr != "" {
handler, err := metrics.PrometheusHandler(config.EnvoyAdminURL)
if err != nil {
return err
}
metrics.SetBuildInfo(serviceName)
metrics.RegisterInfoMetrics()
serverOpts := &httputil.ServerOptions{
Addr: opt.MetricsAddr,
Insecure: true,
Service: "metrics",
}
var wg sync.WaitGroup
srv, err := httputil.NewServer(serverOpts, handler, &wg)
if err != nil {
return err
}
go func() {
<-ctx.Done()
_ = srv.Close()
}()
}
return nil
}
func setupProxy(opt *config.Options, controlPlane *controlplane.Server) error {
if !config.IsProxy(opt.Services) {
return nil
@ -218,21 +186,3 @@ func setupProxy(opt *config.Options, controlPlane *controlplane.Server) error {
controlPlane.HTTPRouter.PathPrefix("/").Handler(svc)
return nil
}
func setupTracing(ctx context.Context, opt *config.Options) error {
traceOpts, err := config.NewTracingOptions(opt)
if err != nil {
return fmt.Errorf("error setting up tracing: %w", err)
}
if traceOpts.Enabled() {
exporter, err := trace.RegisterTracing(traceOpts)
if err != nil {
return err
}
go func() {
<-ctx.Done()
trace.UnregisterTracing(exporter)
}()
}
return nil
}

View file

@ -4,61 +4,10 @@ import (
"context"
"io/ioutil"
"os"
"os/signal"
"syscall"
"testing"
"time"
"github.com/pomerium/pomerium/config"
)
func Test_setupTracing(t *testing.T) {
tests := []struct {
name string
opt *config.Options
}{
{"good jaeger", &config.Options{TracingProvider: "jaeger", TracingJaegerAgentEndpoint: "localhost:0", TracingJaegerCollectorEndpoint: "localhost:0"}},
{"dont register aything", &config.Options{}},
{"bad provider", &config.Options{TracingProvider: "bad provider"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
setupTracing(context.Background(), tt.opt)
})
}
}
func Test_setupMetrics(t *testing.T) {
tests := []struct {
name string
opt *config.Options
}{
{"dont register aything", &config.Options{}},
{"good metrics server", &config.Options{MetricsAddr: "localhost:0"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT)
defer signal.Stop(c)
setupMetrics(context.Background(), tt.opt)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
waitSig(t, c, syscall.SIGINT)
})
}
}
func waitSig(t *testing.T, c <-chan os.Signal, sig os.Signal) {
select {
case s := <-c:
if s != sig {
t.Fatalf("signal was %v, want %v", s, sig)
}
case <-time.After(1 * time.Second):
t.Fatalf("timeout waiting for %v", sig)
}
}
func Test_run(t *testing.T) {
os.Clearenv()
t.Parallel()

View file

@ -32,6 +32,7 @@ import (
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/trace"
)
const (
@ -46,7 +47,6 @@ type Server struct {
grpcPort, httpPort string
opts *config.Options
logLevel string
}
// NewServer creates a new server with traffic routed by envoy.
@ -64,12 +64,6 @@ func NewServer(opts *config.Options, grpcPort, httpPort string) (*Server, error)
opts: opts,
}
if srv.opts.ProxyLogLevel != "" {
srv.logLevel = srv.opts.ProxyLogLevel
} else {
srv.logLevel = srv.opts.LogLevel
}
err = srv.writeConfig()
if err != nil {
return nil, fmt.Errorf("error writing initial envoy configuration: %w", err)
@ -88,7 +82,7 @@ func (srv *Server) Run(ctx context.Context) error {
srv.cmd = exec.CommandContext(ctx, envoyPath,
"-c", configFileName,
"--log-level", srv.logLevel,
"--log-level", "trace",
"--log-format", "[LOG_FORMAT]%l--%n--%v",
"--log-format-escaped",
"--disable-hot-restart",
@ -268,7 +262,7 @@ func (srv *Server) addTraceConfig(traceOpts *config.TracingOptions, bootCfg *env
}
// We only support zipkin in envoy currently
if traceOpts.Provider != config.ZipkinTracingProviderName {
if traceOpts.Provider != trace.ZipkinTracingProviderName {
return nil
}
@ -354,6 +348,11 @@ func (srv *Server) handleLogs(rc io.ReadCloser) {
msg = s
}
// ignore empty messages
if msg == "" {
continue
}
log.WithLevel(lvl).
Str("service", "envoy").
Str("name", name).

View file

@ -16,6 +16,7 @@ import (
"google.golang.org/protobuf/encoding/protojson"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/internal/testutil"
)
@ -40,19 +41,19 @@ func Test_addTraceConfig(t *testing.T) {
}{
{
"good zipkin",
&config.TracingOptions{Provider: config.ZipkinTracingProviderName, ZipkinEndpoint: &url.URL{Host: "localhost:9411"}},
&config.TracingOptions{Provider: trace.ZipkinTracingProviderName, ZipkinEndpoint: &url.URL{Host: "localhost:9411"}},
`{"tracing":{"http":{"name":"envoy.tracers.opencensus","typedConfig":{"@type":"type.googleapis.com/envoy.config.trace.v3.OpenCensusConfig","zipkinExporterEnabled":true,"zipkinUrl":"//localhost:9411","incomingTraceContext":["B3","TRACE_CONTEXT","CLOUD_TRACE_CONTEXT","GRPC_TRACE_BIN"],"outgoingTraceContext":["B3","TRACE_CONTEXT","GRPC_TRACE_BIN"]}}}}`,
false,
},
{
"good jaeger",
&config.TracingOptions{Provider: config.JaegerTracingProviderName},
&config.TracingOptions{Provider: trace.JaegerTracingProviderName},
`{}`,
false,
},
{
"bad zipkin",
&config.TracingOptions{Provider: config.ZipkinTracingProviderName, ZipkinEndpoint: &url.URL{}},
&config.TracingOptions{Provider: trace.ZipkinTracingProviderName, ZipkinEndpoint: &url.URL{}},
`{}`,
true,
},

View file

@ -1,34 +0,0 @@
// Adapted from https://golang.org/src/log/example_test.go
// Copyright 2013 The Go Authors. See 3RD-PARTY file for license.
package log_test
import (
"bytes"
"fmt"
stdlog "log"
"os"
"github.com/pomerium/pomerium/internal/log"
"github.com/rs/zerolog"
)
func ExampleLogger() {
log.Logger = zerolog.New(os.Stdout).With().Str("level-logging?", "yep!").Logger()
var (
buf bytes.Buffer
logger = stdlog.New(&log.StdLogWrapper{Logger: &log.Logger}, "", 0)
)
logger.Print("Hello logger!")
log.SetDebugMode()
logger.Print("Debug")
fmt.Print(&buf)
// Output:
// {"level":"error","level-logging?":"yep!","message":"Hello logger!"}
//<nil> ERR Debug level-logging?=yep!
}

View file

@ -5,19 +5,37 @@ import (
"context"
"net/http"
"os"
"sync/atomic"
"github.com/rs/zerolog"
)
// Logger is the global logger.
var Logger = zerolog.New(os.Stdout).With().Timestamp().Logger()
var (
logger atomic.Value
)
// SetDebugMode tells the logger to use standard out and pretty print output.
func SetDebugMode() {
Logger = Logger.Output(zerolog.ConsoleWriter{Out: os.Stdout})
func init() {
DisableDebug()
}
// SetLevel sets the minimum global log level. Options are 'debu' 'info' 'warn' and 'error'.
// DisableDebug tells the logger to use stdout and json output.
func DisableDebug() {
l := zerolog.New(os.Stdout).With().Timestamp().Logger()
logger.Store(&l)
}
// EnableDebug tells the logger to use stdout and pretty print output.
func EnableDebug() {
l := zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{Out: os.Stdout})
logger.Store(&l)
}
// Logger returns the global logger.
func Logger() *zerolog.Logger {
return logger.Load().(*zerolog.Logger)
}
// SetLevel sets the minimum global log level. Options are 'debug' 'info' 'warn' and 'error'.
// Defaults to 'debug'
func SetLevel(level string) {
switch level {
@ -34,40 +52,40 @@ func SetLevel(level string) {
// With creates a child logger with the field added to its context.
func With() zerolog.Context {
return Logger.With()
return Logger().With()
}
// Level creates a child logger with the minimum accepted level set to level.
func Level(level zerolog.Level) zerolog.Logger {
return Logger.Level(level)
return Logger().Level(level)
}
// Debug starts a new message with debug level.
//
// You must call Msg on the returned event in order to send the event.
func Debug() *zerolog.Event {
return Logger.Debug()
return Logger().Debug()
}
// Info starts a new message with info level.
//
// You must call Msg on the returned event in order to send the event.
func Info() *zerolog.Event {
return Logger.Info()
return Logger().Info()
}
// Warn starts a new message with warn level.
//
// You must call Msg on the returned event in order to send the event.
func Warn() *zerolog.Event {
return Logger.Warn()
return Logger().Warn()
}
// Error starts a new message with error level.
//
// You must call Msg on the returned event in order to send the event.
func Error() *zerolog.Event {
return Logger.Error()
return Logger().Error()
}
// Fatal starts a new message with fatal level. The os.Exit(1) function
@ -75,7 +93,7 @@ func Error() *zerolog.Event {
//
// You must call Msg on the returned event in order to send the event.
func Fatal() *zerolog.Event {
return Logger.Fatal()
return Logger().Fatal()
}
// Panic starts a new message with panic level. The message is also sent
@ -83,14 +101,14 @@ func Fatal() *zerolog.Event {
//
// You must call Msg on the returned event in order to send the event.
func Panic() *zerolog.Event {
return Logger.Panic()
return Logger().Panic()
}
// WithLevel starts a new message with level.
//
// You must call Msg on the returned event in order to send the event.
func WithLevel(level zerolog.Level) *zerolog.Event {
return Logger.WithLevel(level)
return Logger().WithLevel(level)
}
// Log starts a new message with no level. Setting zerolog.GlobalLevel to
@ -98,19 +116,19 @@ func WithLevel(level zerolog.Level) *zerolog.Event {
//
// You must call Msg on the returned event in order to send the event.
func Log() *zerolog.Event {
return Logger.Log()
return Logger().Log()
}
// Print sends a log event using debug level and no extra field.
// Arguments are handled in the manner of fmt.Print.
func Print(v ...interface{}) {
Logger.Print(v...)
Logger().Print(v...)
}
// Printf sends a log event using debug level and no extra field.
// Arguments are handled in the manner of fmt.Printf.
func Printf(format string, v ...interface{}) {
Logger.Printf(format, v...)
Logger().Printf(format, v...)
}
// Ctx returns the Logger associated with the ctx. If no logger

View file

@ -3,11 +3,11 @@ package log_test
import (
"errors"
"flag"
"os"
"time"
"github.com/pomerium/pomerium/internal/log"
"github.com/rs/zerolog"
"github.com/pomerium/pomerium/internal/log"
)
// setup would normally be an init() function, however, there seems
@ -24,7 +24,7 @@ func setup() {
zerolog.TimestampFunc = func() time.Time {
return time.Date(2008, 1, 8, 17, 5, 5, 0, time.UTC)
}
log.Logger = zerolog.New(os.Stdout).With().Timestamp().Logger()
log.DisableDebug()
}
// Simple logging example using the Print function in the log package

View file

@ -14,12 +14,12 @@ import (
)
// NewHandler injects log into requests context.
func NewHandler(log zerolog.Logger) func(http.Handler) http.Handler {
func NewHandler(getLogger func() *zerolog.Logger) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Create a copy of the logger (including internal context slice)
// to prevent data race when using UpdateContext.
l := log.With().Logger()
l := getLogger().With().Logger()
r = r.WithContext(l.WithContext(r.Context()))
next.ServeHTTP(w, r)
})

View file

@ -43,7 +43,7 @@ func TestNewHandler(t *testing.T) {
log := zerolog.New(nil).With().
Str("foo", "bar").
Logger()
lh := NewHandler(log)
lh := NewHandler(func() *zerolog.Logger { return &log })
h := lh(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
l := FromRequest(r)
if !reflect.DeepEqual(*l, log) {
@ -62,7 +62,8 @@ func TestURLHandler(t *testing.T) {
l := FromRequest(r)
l.Log().Msg("")
}))
h = NewHandler(zerolog.New(out))(h)
log := zerolog.New(out)
h = NewHandler(func() *zerolog.Logger { return &log })(h)
h.ServeHTTP(nil, r)
if want, got := `{"url":"/path?foo=bar"}`+"\n", decodeIfBinary(out); want != got {
t.Errorf("Invalid log output, got: %s, want: %s", got, want)
@ -78,7 +79,8 @@ func TestMethodHandler(t *testing.T) {
l := FromRequest(r)
l.Log().Msg("")
}))
h = NewHandler(zerolog.New(out))(h)
log := zerolog.New(out)
h = NewHandler(func() *zerolog.Logger { return &log })(h)
h.ServeHTTP(nil, r)
if want, got := `{"method":"POST"}`+"\n", decodeIfBinary(out); want != got {
t.Errorf("Invalid log output, got: %s, want: %s", got, want)
@ -95,7 +97,8 @@ func TestRequestHandler(t *testing.T) {
l := FromRequest(r)
l.Log().Msg("")
}))
h = NewHandler(zerolog.New(out))(h)
log := zerolog.New(out)
h = NewHandler(func() *zerolog.Logger { return &log })(h)
h.ServeHTTP(nil, r)
if want, got := `{"request":"POST /path?foo=bar"}`+"\n", decodeIfBinary(out); want != got {
t.Errorf("Invalid log output, got: %s, want: %s", got, want)
@ -111,7 +114,8 @@ func TestRemoteAddrHandler(t *testing.T) {
l := FromRequest(r)
l.Log().Msg("")
}))
h = NewHandler(zerolog.New(out))(h)
log := zerolog.New(out)
h = NewHandler(func() *zerolog.Logger { return &log })(h)
h.ServeHTTP(nil, r)
if want, got := `{"ip":"1.2.3.4"}`+"\n", decodeIfBinary(out); want != got {
t.Errorf("Invalid log output, got: %s, want: %s", got, want)
@ -127,7 +131,8 @@ func TestRemoteAddrHandlerIPv6(t *testing.T) {
l := FromRequest(r)
l.Log().Msg("")
}))
h = NewHandler(zerolog.New(out))(h)
log := zerolog.New(out)
h = NewHandler(func() *zerolog.Logger { return &log })(h)
h.ServeHTTP(nil, r)
if want, got := `{"ip":"2001:db8:a0b:12f0::1"}`+"\n", decodeIfBinary(out); want != got {
t.Errorf("Invalid log output, got: %s, want: %s", got, want)
@ -145,7 +150,8 @@ func TestUserAgentHandler(t *testing.T) {
l := FromRequest(r)
l.Log().Msg("")
}))
h = NewHandler(zerolog.New(out))(h)
log := zerolog.New(out)
h = NewHandler(func() *zerolog.Logger { return &log })(h)
h.ServeHTTP(nil, r)
if want, got := `{"ua":"some user agent string"}`+"\n", decodeIfBinary(out); want != got {
t.Errorf("Invalid log output, got: %s, want: %s", got, want)
@ -163,7 +169,8 @@ func TestRefererHandler(t *testing.T) {
l := FromRequest(r)
l.Log().Msg("")
}))
h = NewHandler(zerolog.New(out))(h)
log := zerolog.New(out)
h = NewHandler(func() *zerolog.Logger { return &log })(h)
h.ServeHTTP(nil, r)
if want, got := `{"referer":"http://foo.com/bar"}`+"\n", decodeIfBinary(out); want != got {
t.Errorf("Invalid log output, got: %s, want: %s", got, want)
@ -185,7 +192,8 @@ func TestRequestIDHandler(t *testing.T) {
t.Errorf("Invalid log output, got: %s, want: %s", got, want)
}
}))
h = NewHandler(zerolog.New(out))(h)
log := zerolog.New(out)
h = NewHandler(func() *zerolog.Logger { return &log })(h)
h = requestid.HTTPMiddleware()(h)
h.ServeHTTP(httptest.NewRecorder(), r)
}
@ -200,7 +208,8 @@ func TestCombinedHandlers(t *testing.T) {
l := FromRequest(r)
l.Log().Msg("")
}))))
h = NewHandler(zerolog.New(out))(h)
log := zerolog.New(out)
h = NewHandler(func() *zerolog.Logger { return &log })(h)
h.ServeHTTP(nil, r)
if want, got := `{"method":"POST","request":"POST /path?foo=bar","url":"/path?foo=bar"}`+"\n", decodeIfBinary(out); want != got {
t.Errorf("Invalid log output, got: %s, want: %s", got, want)
@ -218,10 +227,22 @@ func BenchmarkHandlers(b *testing.B) {
}))
h2 := MethodHandler("method")(RequestHandler("request")(h1))
handlers := map[string]http.Handler{
"Single": NewHandler(zerolog.New(ioutil.Discard))(h1),
"Combined": NewHandler(zerolog.New(ioutil.Discard))(h2),
"SingleDisabled": NewHandler(zerolog.New(ioutil.Discard).Level(zerolog.Disabled))(h1),
"CombinedDisabled": NewHandler(zerolog.New(ioutil.Discard).Level(zerolog.Disabled))(h2),
"Single": NewHandler(func() *zerolog.Logger {
log := zerolog.New(ioutil.Discard)
return &log
})(h1),
"Combined": NewHandler((func() *zerolog.Logger {
log := zerolog.New(ioutil.Discard)
return &log
}))(h2),
"SingleDisabled": NewHandler((func() *zerolog.Logger {
log := zerolog.New(ioutil.Discard).Level(zerolog.Disabled)
return &log
}))(h1),
"CombinedDisabled": NewHandler((func() *zerolog.Logger {
log := zerolog.New(ioutil.Discard).Level(zerolog.Disabled)
return &log
}))(h2),
}
for name := range handlers {
h := handlers[name]
@ -237,7 +258,7 @@ func BenchmarkDataRace(b *testing.B) {
log := zerolog.New(nil).With().
Str("foo", "bar").
Logger()
lh := NewHandler(log)
lh := NewHandler(func() *zerolog.Logger { return &log })
h := lh(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
l := FromRequest(r)
l.UpdateContext(func(c zerolog.Context) zerolog.Context {
@ -264,7 +285,8 @@ func TestLogHeadersHandler(t *testing.T) {
l := FromRequest(r)
l.Log().Msg("")
}))
h = NewHandler(zerolog.New(out))(h)
log := zerolog.New(out)
h = NewHandler(func() *zerolog.Logger { return &log })(h)
h.ServeHTTP(nil, r)
if want, got := `{"X-Forwarded-For":["proxy1,proxy2,proxy3"]}`+"\n", decodeIfBinary(out); want != got {
t.Errorf("Invalid log output, got: %s, want: %s", got, want)
@ -284,7 +306,8 @@ func TestAccessHandler(t *testing.T) {
l.Log().Msg("some inner logging")
w.Write([]byte("Add something to the request of non-zero size"))
}))
h = NewHandler(zerolog.New(out))(h)
log := zerolog.New(out)
h = NewHandler(func() *zerolog.Logger { return &log })(h)
w := httptest.NewRecorder()
h.ServeHTTP(w, r)

View file

@ -5,6 +5,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"sync"
ocprom "contrib.go.opencensus.io/exporter/prometheus"
prom "github.com/prometheus/client_golang/prometheus"
@ -16,19 +17,11 @@ import (
// PrometheusHandler creates an exporter that exports stats to Prometheus
// and returns a handler suitable for exporting metrics.
func PrometheusHandler(envoyURL *url.URL) (http.Handler, error) {
if err := registerDefaultViews(); err != nil {
return nil, fmt.Errorf("telemetry/metrics: failed registering views")
}
reg := prom.DefaultRegisterer.(*prom.Registry)
exporter, err := ocprom.NewExporter(
ocprom.Options{
Namespace: "pomerium",
Registry: reg,
})
exporter, err := getGlobalExporter()
if err != nil {
return nil, fmt.Errorf("telemetry/metrics: prometheus exporter: %w", err)
return nil, err
}
view.RegisterExporter(exporter)
mux := http.NewServeMux()
envoyMetricsURL, err := envoyURL.Parse("/stats/prometheus")
@ -40,6 +33,36 @@ func PrometheusHandler(envoyURL *url.URL) (http.Handler, error) {
return mux, nil
}
var (
globalExporter *ocprom.Exporter
globalExporterErr error
globalExporterOnce sync.Once
)
func getGlobalExporter() (*ocprom.Exporter, error) {
globalExporterOnce.Do(func() {
globalExporterErr = registerDefaultViews()
if globalExporterErr != nil {
globalExporterErr = fmt.Errorf("telemetry/metrics: failed registering views: %w", globalExporterErr)
return
}
reg := prom.DefaultRegisterer.(*prom.Registry)
globalExporter, globalExporterErr = ocprom.NewExporter(
ocprom.Options{
Namespace: "pomerium",
Registry: reg,
})
if globalExporterErr != nil {
globalExporterErr = fmt.Errorf("telemetry/metrics: prometheus exporter: %w", globalExporterErr)
return
}
view.RegisterExporter(globalExporter)
})
return globalExporter, globalExporterErr
}
func registerDefaultViews() error {
var views []*view.View
for _, v := range DefaultViews {

View file

@ -3,6 +3,7 @@ package trace
import (
"context"
"fmt"
"net/url"
"contrib.go.opencensus.io/exporter/jaeger"
ocZipkin "contrib.go.opencensus.io/exporter/zipkin"
@ -10,18 +11,55 @@ import (
zipkinHTTP "github.com/openzipkin/zipkin-go/reporter/http"
"go.opencensus.io/trace"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log"
)
const (
// JaegerTracingProviderName is the name of the tracing provider Jaeger.
JaegerTracingProviderName = "jaeger"
// ZipkinTracingProviderName is the name of the tracing provider Zipkin.
ZipkinTracingProviderName = "zipkin"
)
// TracingOptions contains the configurations settings for a http server.
type TracingOptions struct {
// Shared
Provider string
Service string
Debug bool
// Jaeger
// CollectorEndpoint is the full url to the Jaeger HTTP Thrift collector.
// For example, http://localhost:14268/api/traces
JaegerCollectorEndpoint *url.URL
// AgentEndpoint instructs exporter to send spans to jaeger-agent at this address.
// For example, localhost:6831.
JaegerAgentEndpoint string
// Zipkin
// ZipkinEndpoint configures the zipkin collector URI
// Example: http://zipkin:9411/api/v2/spans
ZipkinEndpoint *url.URL
// SampleRate is percentage of requests which are sampled
SampleRate float64
}
// Enabled indicates whether tracing is enabled on a given TracingOptions
func (t *TracingOptions) Enabled() bool {
return t.Provider != ""
}
// RegisterTracing creates a new trace exporter from TracingOptions.
func RegisterTracing(opts *config.TracingOptions) (trace.Exporter, error) {
func RegisterTracing(opts *TracingOptions) (trace.Exporter, error) {
var exporter trace.Exporter
var err error
switch opts.Provider {
case config.JaegerTracingProviderName:
case JaegerTracingProviderName:
exporter, err = registerJaeger(opts)
case config.ZipkinTracingProviderName:
case ZipkinTracingProviderName:
exporter, err = registerZipkin(opts)
default:
return nil, fmt.Errorf("telemetry/trace: provider %s unknown", opts.Provider)
@ -40,7 +78,7 @@ func UnregisterTracing(exporter trace.Exporter) {
trace.UnregisterExporter(exporter)
}
func registerJaeger(opts *config.TracingOptions) (trace.Exporter, error) {
func registerJaeger(opts *TracingOptions) (trace.Exporter, error) {
jOpts := jaeger.Options{
ServiceName: opts.Service,
AgentEndpoint: opts.JaegerAgentEndpoint,
@ -56,7 +94,7 @@ func registerJaeger(opts *config.TracingOptions) (trace.Exporter, error) {
return jex, nil
}
func registerZipkin(opts *config.TracingOptions) (trace.Exporter, error) {
func registerZipkin(opts *TracingOptions) (trace.Exporter, error) {
localEndpoint, err := zipkin.NewEndpoint(opts.Service, "")
if err != nil {
return nil, fmt.Errorf("telemetry/trace: could not create local endpoint: %w", err)

View file

@ -3,21 +3,19 @@ package trace
import (
"net/url"
"testing"
"github.com/pomerium/pomerium/config"
)
func TestRegisterTracing(t *testing.T) {
tests := []struct {
name string
opts *config.TracingOptions
opts *TracingOptions
wantErr bool
}{
{"jaeger", &config.TracingOptions{JaegerAgentEndpoint: "localhost:6831", Service: "all", Provider: "jaeger"}, false},
{"jaeger with debug", &config.TracingOptions{JaegerAgentEndpoint: "localhost:6831", Service: "all", Provider: "jaeger", Debug: true}, false},
{"jaeger no endpoint", &config.TracingOptions{JaegerAgentEndpoint: "", Service: "all", Provider: "jaeger"}, true},
{"unknown provider", &config.TracingOptions{JaegerAgentEndpoint: "localhost:0", Service: "all", Provider: "Lucius Cornelius Sulla"}, true},
{"zipkin with debug", &config.TracingOptions{ZipkinEndpoint: &url.URL{Host: "localhost"}, Service: "all", Provider: "zipkin", Debug: true}, false},
{"jaeger", &TracingOptions{JaegerAgentEndpoint: "localhost:6831", Service: "all", Provider: "jaeger"}, false},
{"jaeger with debug", &TracingOptions{JaegerAgentEndpoint: "localhost:6831", Service: "all", Provider: "jaeger", Debug: true}, false},
{"jaeger no endpoint", &TracingOptions{JaegerAgentEndpoint: "", Service: "all", Provider: "jaeger"}, true},
{"unknown provider", &TracingOptions{JaegerAgentEndpoint: "localhost:0", Service: "all", Provider: "Lucius Cornelius Sulla"}, true},
{"zipkin with debug", &TracingOptions{ZipkinEndpoint: &url.URL{Host: "localhost"}, Service: "all", Provider: "zipkin", Debug: true}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {