mirror of
https://github.com/pomerium/pomerium.git
synced 2025-08-06 10:21:05 +02:00
envoy: add support for hot-reloading bootstrap configuration (#1259)
* envoy: add support for hot-reloading bootstrap configuration * use passed in log level * fix unnecessary firstNonEmpty * move process release to after new command start
This commit is contained in:
parent
82b1daae50
commit
2afd7b6864
4 changed files with 171 additions and 55 deletions
|
@ -69,10 +69,11 @@ func Run(ctx context.Context, configFile string) error {
|
||||||
log.Info().Str("port", httpPort).Msg("HTTP server started")
|
log.Info().Str("port", httpPort).Msg("HTTP server started")
|
||||||
|
|
||||||
// create envoy server
|
// create envoy server
|
||||||
envoyServer, err := envoy.NewServer(cfg.Options, grpcPort, httpPort)
|
envoyServer, err := envoy.NewServer(src, grpcPort, httpPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error creating envoy server: %w", err)
|
return fmt.Errorf("error creating envoy server: %w", err)
|
||||||
}
|
}
|
||||||
|
defer envoyServer.Close()
|
||||||
|
|
||||||
// add services
|
// add services
|
||||||
if err := setupAuthenticate(src, cfg, controlPlane); err != nil {
|
if err := setupAuthenticate(src, cfg, controlPlane); err != nil {
|
||||||
|
@ -116,9 +117,6 @@ func Run(ctx context.Context, configFile string) error {
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
return controlPlane.Run(ctx)
|
return controlPlane.Run(ctx)
|
||||||
})
|
})
|
||||||
eg.Go(func() error {
|
|
||||||
return envoyServer.Run(ctx)
|
|
||||||
})
|
|
||||||
if authorizeServer != nil {
|
if authorizeServer != nil {
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
return authorizeServer.Run(ctx)
|
return authorizeServer.Run(ctx)
|
||||||
|
|
|
@ -4,7 +4,6 @@ package envoy
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
@ -14,13 +13,17 @@ import (
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/cenkalti/backoff/v4"
|
||||||
envoy_config_bootstrap_v3 "github.com/envoyproxy/go-control-plane/envoy/config/bootstrap/v3"
|
envoy_config_bootstrap_v3 "github.com/envoyproxy/go-control-plane/envoy/config/bootstrap/v3"
|
||||||
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
||||||
envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
||||||
envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
|
envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
|
||||||
envoy_config_metrics_v3 "github.com/envoyproxy/go-control-plane/envoy/config/metrics/v3"
|
envoy_config_metrics_v3 "github.com/envoyproxy/go-control-plane/envoy/config/metrics/v3"
|
||||||
envoy_config_trace_v3 "github.com/envoyproxy/go-control-plane/envoy/config/trace/v3"
|
envoy_config_trace_v3 "github.com/envoyproxy/go-control-plane/envoy/config/trace/v3"
|
||||||
|
"github.com/google/go-cmp/cmp"
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"github.com/golang/protobuf/ptypes"
|
"github.com/golang/protobuf/ptypes"
|
||||||
|
@ -40,74 +43,156 @@ const (
|
||||||
configFileName = "envoy-config.yaml"
|
configFileName = "envoy-config.yaml"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type serverOptions struct {
|
||||||
|
services string
|
||||||
|
logLevel string
|
||||||
|
tracingOptions trace.TracingOptions
|
||||||
|
}
|
||||||
|
|
||||||
// A Server is a pomerium proxy implemented via envoy.
|
// A Server is a pomerium proxy implemented via envoy.
|
||||||
type Server struct {
|
type Server struct {
|
||||||
wd string
|
wd string
|
||||||
cmd *exec.Cmd
|
cmd *exec.Cmd
|
||||||
|
|
||||||
grpcPort, httpPort string
|
grpcPort, httpPort string
|
||||||
opts *config.Options
|
envoyPath string
|
||||||
|
restartEpoch int
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
options serverOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer creates a new server with traffic routed by envoy.
|
// NewServer creates a new server with traffic routed by envoy.
|
||||||
func NewServer(opts *config.Options, grpcPort, httpPort string) (*Server, error) {
|
func NewServer(src config.Source, grpcPort, httpPort string) (*Server, error) {
|
||||||
wd := filepath.Join(os.TempDir(), workingDirectoryName)
|
wd := filepath.Join(os.TempDir(), workingDirectoryName)
|
||||||
err := os.MkdirAll(wd, 0755)
|
err := os.MkdirAll(wd, 0755)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error creating temporary working directory for envoy: %w", err)
|
return nil, fmt.Errorf("error creating temporary working directory for envoy: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
srv := &Server{
|
|
||||||
wd: wd,
|
|
||||||
grpcPort: grpcPort,
|
|
||||||
httpPort: httpPort,
|
|
||||||
opts: opts,
|
|
||||||
}
|
|
||||||
|
|
||||||
err = srv.writeConfig()
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error writing initial envoy configuration: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return srv, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run runs the server by extracting the embedded envoy and then executing it.
|
|
||||||
func (srv *Server) Run(ctx context.Context) error {
|
|
||||||
envoyPath, err := extractEmbeddedEnvoy()
|
envoyPath, err := extractEmbeddedEnvoy()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn().Err(err).Send()
|
log.Warn().Err(err).Send()
|
||||||
envoyPath = "envoy"
|
envoyPath = "envoy"
|
||||||
}
|
}
|
||||||
|
|
||||||
srv.cmd = exec.CommandContext(ctx, envoyPath,
|
srv := &Server{
|
||||||
|
wd: wd,
|
||||||
|
grpcPort: grpcPort,
|
||||||
|
httpPort: httpPort,
|
||||||
|
envoyPath: envoyPath,
|
||||||
|
}
|
||||||
|
|
||||||
|
src.OnConfigChange(srv.onConfigChange)
|
||||||
|
srv.onConfigChange(src.GetConfig())
|
||||||
|
|
||||||
|
return srv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close kills any underlying envoy process.
|
||||||
|
func (srv *Server) Close() error {
|
||||||
|
srv.mu.Lock()
|
||||||
|
defer srv.mu.Unlock()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
if srv.cmd != nil && srv.cmd.Process != nil {
|
||||||
|
err = srv.cmd.Process.Kill()
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Str("service", "envoy").Msg("envoy: failed to kill process on close")
|
||||||
|
}
|
||||||
|
srv.cmd = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (srv *Server) onConfigChange(cfg *config.Config) {
|
||||||
|
srv.update(cfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (srv *Server) update(cfg *config.Config) {
|
||||||
|
srv.mu.Lock()
|
||||||
|
defer srv.mu.Unlock()
|
||||||
|
|
||||||
|
tracingOptions, err := config.NewTracingOptions(cfg.Options)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Str("service", "envoy").Msg("invalid tracing config")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
options := serverOptions{
|
||||||
|
services: cfg.Options.Services,
|
||||||
|
logLevel: firstNonEmpty(cfg.Options.ProxyLogLevel, cfg.Options.LogLevel, "debug"),
|
||||||
|
tracingOptions: *tracingOptions,
|
||||||
|
}
|
||||||
|
|
||||||
|
if cmp.Equal(srv.options, options, cmp.AllowUnexported(serverOptions{})) {
|
||||||
|
log.Debug().Str("service", "envoy").Msg("envoy: no config changes detected")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
srv.options = options
|
||||||
|
|
||||||
|
if err := srv.writeConfig(); err != nil {
|
||||||
|
log.Error().Err(err).Str("service", "envoy").Msg("envoy: failed to write envoy config")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info().Msg("envoy: starting envoy process")
|
||||||
|
if err := srv.run(); err != nil {
|
||||||
|
log.Error().Err(err).Str("service", "envoy").Msg("envoy: failed to run envoy process")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (srv *Server) run() error {
|
||||||
|
args := []string{
|
||||||
"-c", configFileName,
|
"-c", configFileName,
|
||||||
"--log-level", "trace",
|
"--log-level", srv.options.logLevel,
|
||||||
"--log-format", "[LOG_FORMAT]%l--%n--%v",
|
"--log-format", "[LOG_FORMAT]%l--%n--%v",
|
||||||
"--log-format-escaped",
|
"--log-format-escaped",
|
||||||
"--disable-hot-restart",
|
}
|
||||||
)
|
|
||||||
srv.cmd.Dir = srv.wd
|
|
||||||
|
|
||||||
stderr, err := srv.cmd.StderrPipe()
|
if baseID, ok := readBaseID(); ok {
|
||||||
|
srv.restartEpoch++
|
||||||
|
args = append(args, "--base-id", strconv.Itoa(baseID), "--restart-epoch", strconv.Itoa(srv.restartEpoch))
|
||||||
|
} else {
|
||||||
|
args = append(args, "--use-dynamic-base-id", "--base-id-path", baseIDPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd := exec.Command(srv.envoyPath, args...) // #nosec
|
||||||
|
cmd.Dir = srv.wd
|
||||||
|
|
||||||
|
stderr, err := cmd.StderrPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error creating stderr pipe for envoy: %w", err)
|
return fmt.Errorf("error creating stderr pipe for envoy: %w", err)
|
||||||
}
|
}
|
||||||
go srv.handleLogs(stderr)
|
go srv.handleLogs(stderr)
|
||||||
|
|
||||||
stdout, err := srv.cmd.StdoutPipe()
|
stdout, err := cmd.StdoutPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error creating stderr pipe for envoy: %w", err)
|
return fmt.Errorf("error creating stderr pipe for envoy: %w", err)
|
||||||
}
|
}
|
||||||
go srv.handleLogs(stdout)
|
go srv.handleLogs(stdout)
|
||||||
|
|
||||||
// make sure envoy is killed if we're killed
|
// make sure envoy is killed if we're killed
|
||||||
srv.cmd.SysProcAttr = sysProcAttr
|
cmd.SysProcAttr = sysProcAttr
|
||||||
err = srv.cmd.Run()
|
|
||||||
if err == nil {
|
err = cmd.Start()
|
||||||
return errors.New("envoy exited without error")
|
if err != nil {
|
||||||
|
return fmt.Errorf("error starting envoy: %w", err)
|
||||||
}
|
}
|
||||||
return fmt.Errorf("envoy exited: %w", err)
|
|
||||||
|
// release the previous process so we can hot-reload
|
||||||
|
if srv.cmd != nil && srv.cmd.Process != nil {
|
||||||
|
log.Info().Msg("envoy: releasing envoy process for hot-reload")
|
||||||
|
err := srv.cmd.Process.Release()
|
||||||
|
if err != nil {
|
||||||
|
log.Warn().Err(err).Str("service", "envoy").Msg("envoy: failed to release envoy process for hot-reload")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
srv.cmd = cmd
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srv *Server) writeConfig() error {
|
func (srv *Server) writeConfig() error {
|
||||||
|
@ -117,13 +202,12 @@ func (srv *Server) writeConfig() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
cfgPath := filepath.Join(srv.wd, configFileName)
|
cfgPath := filepath.Join(srv.wd, configFileName)
|
||||||
log.WithLevel(zerolog.DebugLevel).Str("service", "envoy").Str("location", cfgPath).Msg("wrote config file to location")
|
log.Debug().Str("service", "envoy").Str("location", cfgPath).Msg("wrote config file to location")
|
||||||
|
|
||||||
return atomic.WriteFile(cfgPath, bytes.NewReader(confBytes))
|
return atomic.WriteFile(cfgPath, bytes.NewReader(confBytes))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srv *Server) buildBootstrapConfig() ([]byte, error) {
|
func (srv *Server) buildBootstrapConfig() ([]byte, error) {
|
||||||
|
|
||||||
nodeCfg := &envoy_config_core_v3.Node{
|
nodeCfg := &envoy_config_core_v3.Node{
|
||||||
Id: "proxy",
|
Id: "proxy",
|
||||||
Cluster: "proxy",
|
Cluster: "proxy",
|
||||||
|
@ -225,12 +309,7 @@ func (srv *Server) buildBootstrapConfig() ([]byte, error) {
|
||||||
StatsConfig: srv.buildStatsConfig(),
|
StatsConfig: srv.buildStatsConfig(),
|
||||||
}
|
}
|
||||||
|
|
||||||
traceOpts, err := config.NewTracingOptions(srv.opts)
|
if err := srv.addTraceConfig(cfg); err != nil {
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid tracing config: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := srv.addTraceConfig(traceOpts, cfg); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to add tracing config: %w", err)
|
return nil, fmt.Errorf("failed to add tracing config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,25 +327,24 @@ func (srv *Server) buildStatsConfig() *envoy_config_metrics_v3.StatsConfig {
|
||||||
{
|
{
|
||||||
TagName: "service",
|
TagName: "service",
|
||||||
TagValue: &envoy_config_metrics_v3.TagSpecifier_FixedValue{
|
TagValue: &envoy_config_metrics_v3.TagSpecifier_FixedValue{
|
||||||
FixedValue: telemetry.ServiceName(srv.opts.Services),
|
FixedValue: telemetry.ServiceName(srv.options.services),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
return cfg
|
return cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srv *Server) addTraceConfig(traceOpts *config.TracingOptions, bootCfg *envoy_config_bootstrap_v3.Bootstrap) error {
|
func (srv *Server) addTraceConfig(bootCfg *envoy_config_bootstrap_v3.Bootstrap) error {
|
||||||
|
if !srv.options.tracingOptions.Enabled() {
|
||||||
if !traceOpts.Enabled() {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// We only support zipkin in envoy currently
|
// We only support zipkin in envoy currently
|
||||||
if traceOpts.Provider != trace.ZipkinTracingProviderName {
|
if srv.options.tracingOptions.Provider != trace.ZipkinTracingProviderName {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if traceOpts.ZipkinEndpoint.String() == "" {
|
if srv.options.tracingOptions.ZipkinEndpoint.String() == "" {
|
||||||
return fmt.Errorf("missing zipkin url")
|
return fmt.Errorf("missing zipkin url")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -275,7 +353,7 @@ func (srv *Server) addTraceConfig(traceOpts *config.TracingOptions, bootCfg *env
|
||||||
tracingTC, _ := ptypes.MarshalAny(
|
tracingTC, _ := ptypes.MarshalAny(
|
||||||
&envoy_config_trace_v3.OpenCensusConfig{
|
&envoy_config_trace_v3.OpenCensusConfig{
|
||||||
ZipkinExporterEnabled: true,
|
ZipkinExporterEnabled: true,
|
||||||
ZipkinUrl: traceOpts.ZipkinEndpoint.String(),
|
ZipkinUrl: srv.options.tracingOptions.ZipkinEndpoint.String(),
|
||||||
IncomingTraceContext: []envoy_config_trace_v3.OpenCensusConfig_TraceContext{
|
IncomingTraceContext: []envoy_config_trace_v3.OpenCensusConfig_TraceContext{
|
||||||
envoy_config_trace_v3.OpenCensusConfig_B3,
|
envoy_config_trace_v3.OpenCensusConfig_B3,
|
||||||
envoy_config_trace_v3.OpenCensusConfig_TRACE_CONTEXT,
|
envoy_config_trace_v3.OpenCensusConfig_TRACE_CONTEXT,
|
||||||
|
@ -319,17 +397,22 @@ func (srv *Server) parseLog(line string) (name string, logLevel string, msg stri
|
||||||
|
|
||||||
func (srv *Server) handleLogs(rc io.ReadCloser) {
|
func (srv *Server) handleLogs(rc io.ReadCloser) {
|
||||||
defer rc.Close()
|
defer rc.Close()
|
||||||
|
|
||||||
|
bo := backoff.NewExponentialBackOff()
|
||||||
|
|
||||||
s := bufio.NewReader(rc)
|
s := bufio.NewReader(rc)
|
||||||
for {
|
for {
|
||||||
ln, err := s.ReadString('\n')
|
ln, err := s.ReadString('\n')
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == io.EOF {
|
if errors.Is(err, io.EOF) || errors.Is(err, os.ErrClosed) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
log.Error().Err(err).Msg("failed to read log")
|
log.Error().Err(err).Msg("failed to read log")
|
||||||
|
time.Sleep(bo.NextBackOff())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ln = strings.TrimRight(ln, "\r\n")
|
ln = strings.TrimRight(ln, "\r\n")
|
||||||
|
bo.Reset()
|
||||||
|
|
||||||
name, logLevel, msg := srv.parseLog(ln)
|
name, logLevel, msg := srv.parseLog(ln)
|
||||||
if name == "" {
|
if name == "" {
|
||||||
|
|
|
@ -61,10 +61,14 @@ func Test_addTraceConfig(t *testing.T) {
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
srv := &Server{}
|
srv := &Server{
|
||||||
|
options: serverOptions{
|
||||||
|
tracingOptions: *tt.opts,
|
||||||
|
},
|
||||||
|
}
|
||||||
baseCfg := &envoy_config_bootstrap_v3.Bootstrap{}
|
baseCfg := &envoy_config_bootstrap_v3.Bootstrap{}
|
||||||
|
|
||||||
err := srv.addTraceConfig(tt.opts, baseCfg)
|
err := srv.addTraceConfig(baseCfg)
|
||||||
|
|
||||||
assert.Equal(t, tt.wantErr, err != nil, "unexpected error state")
|
assert.Equal(t, tt.wantErr, err != nil, "unexpected error state")
|
||||||
|
|
||||||
|
@ -86,7 +90,7 @@ func Test_buildStatsConfig(t *testing.T) {
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
srv := &Server{opts: tt.opts}
|
srv := &Server{options: serverOptions{services: tt.opts.Services}}
|
||||||
|
|
||||||
statsCfg := srv.buildStatsConfig()
|
statsCfg := srv.buildStatsConfig()
|
||||||
testutil.AssertProtoJSONEqual(t, tt.want, statsCfg)
|
testutil.AssertProtoJSONEqual(t, tt.want, statsCfg)
|
||||||
|
|
31
internal/envoy/misc.go
Normal file
31
internal/envoy/misc.go
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
package envoy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
const baseIDPath = "/tmp/pomerium-envoy-base-id"
|
||||||
|
|
||||||
|
func firstNonEmpty(args ...string) string {
|
||||||
|
for _, a := range args {
|
||||||
|
if a != "" {
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func readBaseID() (int, bool) {
|
||||||
|
bs, err := ioutil.ReadFile(baseIDPath)
|
||||||
|
if err != nil {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
baseID, err := strconv.Atoi(string(bs))
|
||||||
|
if err != nil {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
return baseID, true
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue