mirror of
https://github.com/pomerium/pomerium.git
synced 2025-04-29 02:16:28 +02:00
core/config: add config version, additional telemetry (#4645)
* core/config: add config version, additional telemetry * typo
This commit is contained in:
parent
dd7e3b993c
commit
ae420f01c6
13 changed files with 857 additions and 766 deletions
|
@ -27,6 +27,7 @@ type Config struct {
|
|||
Options *Options
|
||||
AutoCertificates []tls.Certificate
|
||||
EnvoyVersion string
|
||||
Version int64
|
||||
|
||||
// DerivedCertificates are TLS certificates derived from the shared secret
|
||||
DerivedCertificates []tls.Certificate
|
||||
|
@ -62,6 +63,7 @@ func (cfg *Config) Clone() *Config {
|
|||
_ = copy(endpoints, cfg.MetricsScrapeEndpoints)
|
||||
|
||||
return &Config{
|
||||
Version: cfg.Version,
|
||||
Options: newOptions,
|
||||
AutoCertificates: cfg.AutoCertificates,
|
||||
EnvoyVersion: cfg.EnvoyVersion,
|
||||
|
|
|
@ -114,6 +114,7 @@ func NewFileOrEnvironmentSource(
|
|||
cfg := &Config{
|
||||
Options: options,
|
||||
EnvoyVersion: envoyVersion,
|
||||
Version: 1,
|
||||
}
|
||||
|
||||
ports, err := netutil.AllocatePorts(6)
|
||||
|
@ -151,6 +152,7 @@ func (src *FileOrEnvironmentSource) check(ctx context.Context) {
|
|||
options, err := newOptionsFromConfig(src.configFile)
|
||||
if err == nil {
|
||||
cfg = cfg.Clone()
|
||||
cfg.Version++
|
||||
cfg.Options = options
|
||||
metrics.SetConfigInfo(ctx, cfg.Options.Services, "local", cfg.Checksum(), true)
|
||||
} else {
|
||||
|
@ -160,6 +162,8 @@ func (src *FileOrEnvironmentSource) check(ctx context.Context) {
|
|||
src.config = cfg
|
||||
src.mu.Unlock()
|
||||
|
||||
log.Info(ctx).Int64("config-version", cfg.Version).Msg("config: loaded configuration")
|
||||
|
||||
src.Trigger(ctx, cfg)
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
|
||||
"github.com/pomerium/pomerium/config"
|
||||
"github.com/pomerium/pomerium/internal/telemetry"
|
||||
"github.com/pomerium/pomerium/internal/telemetry/trace"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -32,6 +33,9 @@ func (b *Builder) BuildBootstrap(
|
|||
cfg *config.Config,
|
||||
fullyStatic bool,
|
||||
) (bootstrap *envoy_config_bootstrap_v3.Bootstrap, err error) {
|
||||
ctx, span := trace.StartSpan(ctx, "envoyconfig.Builder.BuildBootstrap")
|
||||
defer span.End()
|
||||
|
||||
bootstrap = new(envoy_config_bootstrap_v3.Bootstrap)
|
||||
|
||||
bootstrap.Admin, err = b.BuildBootstrapAdmin(cfg)
|
||||
|
@ -164,6 +168,9 @@ func (b *Builder) BuildBootstrapStaticResources(
|
|||
cfg *config.Config,
|
||||
fullyStatic bool,
|
||||
) (staticResources *envoy_config_bootstrap_v3.Bootstrap_StaticResources, err error) {
|
||||
ctx, span := trace.StartSpan(ctx, "envoyconfig.Builder.BuildBootstrapStaticResources")
|
||||
defer span.End()
|
||||
|
||||
staticResources = new(envoy_config_bootstrap_v3.Bootstrap_StaticResources)
|
||||
|
||||
if fullyStatic {
|
||||
|
|
|
@ -20,11 +20,15 @@ import (
|
|||
|
||||
"github.com/pomerium/pomerium/config"
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
"github.com/pomerium/pomerium/internal/telemetry/trace"
|
||||
"github.com/pomerium/pomerium/internal/urlutil"
|
||||
)
|
||||
|
||||
// BuildClusters builds envoy clusters from the given config.
|
||||
func (b *Builder) BuildClusters(ctx context.Context, cfg *config.Config) ([]*envoy_config_cluster_v3.Cluster, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "envoyconfig.Builder.BuildClusters")
|
||||
defer span.End()
|
||||
|
||||
grpcURLs := []*url.URL{{
|
||||
Scheme: "http",
|
||||
Host: b.localGRPCAddress,
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/pomerium/pomerium/internal/log"
|
||||
"github.com/pomerium/pomerium/internal/sets"
|
||||
"github.com/pomerium/pomerium/internal/telemetry/metrics"
|
||||
"github.com/pomerium/pomerium/internal/telemetry/trace"
|
||||
"github.com/pomerium/pomerium/internal/urlutil"
|
||||
)
|
||||
|
||||
|
@ -48,6 +49,9 @@ func (b *Builder) BuildListeners(
|
|||
cfg *config.Config,
|
||||
fullyStatic bool,
|
||||
) ([]*envoy_config_listener_v3.Listener, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "envoyconfig.Builder.BuildListeners")
|
||||
defer span.End()
|
||||
|
||||
var listeners []*envoy_config_listener_v3.Listener
|
||||
|
||||
if config.IsAuthenticate(cfg.Options.Services) || config.IsProxy(cfg.Options.Services) {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
|
||||
|
||||
"github.com/pomerium/pomerium/config"
|
||||
"github.com/pomerium/pomerium/internal/telemetry/trace"
|
||||
)
|
||||
|
||||
// BuildRouteConfigurations builds the route configurations for the RDS service.
|
||||
|
@ -13,6 +14,9 @@ func (b *Builder) BuildRouteConfigurations(
|
|||
ctx context.Context,
|
||||
cfg *config.Config,
|
||||
) ([]*envoy_config_route_v3.RouteConfiguration, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "envoyconfig.Builder.BuildRouteConfigurations")
|
||||
defer span.End()
|
||||
|
||||
var routeConfigurations []*envoy_config_route_v3.RouteConfiguration
|
||||
|
||||
if config.IsAuthenticate(cfg.Options.Services) || config.IsProxy(cfg.Options.Services) {
|
||||
|
|
|
@ -267,7 +267,7 @@ func (srv *Server) OnConfigChange(ctx context.Context, cfg *config.Config) error
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
srv.xdsmgr.Update(ctx, res)
|
||||
srv.xdsmgr.Update(ctx, cfg.Version, res)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
|
||||
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
"github.com/pomerium/pomerium/internal/telemetry/trace"
|
||||
"github.com/pomerium/pomerium/pkg/cryptutil"
|
||||
"github.com/pomerium/pomerium/pkg/protoutil"
|
||||
)
|
||||
|
@ -17,14 +19,22 @@ const (
|
|||
)
|
||||
|
||||
func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*envoy_service_discovery_v3.Resource, error) {
|
||||
resources := map[string][]*envoy_service_discovery_v3.Resource{}
|
||||
ctx, span := trace.StartSpan(ctx, "controlplane.Server.buildDiscoveryResources")
|
||||
defer span.End()
|
||||
|
||||
cfg := srv.currentConfig.Load()
|
||||
|
||||
log.Info(ctx).Int64("config-version", cfg.Version).Msg("controlplane: building discovery resources")
|
||||
|
||||
resources := map[string][]*envoy_service_discovery_v3.Resource{}
|
||||
var clusterCount, listenerCount, routeConfigurationCount int
|
||||
|
||||
clusters, err := srv.Builder.BuildClusters(ctx, cfg.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, cluster := range clusters {
|
||||
clusterCount++
|
||||
resources[clusterTypeURL] = append(resources[clusterTypeURL], &envoy_service_discovery_v3.Resource{
|
||||
Name: cluster.Name,
|
||||
Version: hex.EncodeToString(cryptutil.HashProto(cluster)),
|
||||
|
@ -37,6 +47,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
|
|||
return nil, err
|
||||
}
|
||||
for _, listener := range listeners {
|
||||
listenerCount++
|
||||
resources[listenerTypeURL] = append(resources[listenerTypeURL], &envoy_service_discovery_v3.Resource{
|
||||
Name: listener.Name,
|
||||
Version: hex.EncodeToString(cryptutil.HashProto(listener)),
|
||||
|
@ -49,6 +60,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
|
|||
return nil, err
|
||||
}
|
||||
for _, routeConfiguration := range routeConfigurations {
|
||||
routeConfigurationCount++
|
||||
resources[routeConfigurationTypeURL] = append(resources[routeConfigurationTypeURL], &envoy_service_discovery_v3.Resource{
|
||||
Name: routeConfiguration.Name,
|
||||
Version: hex.EncodeToString(cryptutil.HashProto(routeConfiguration)),
|
||||
|
@ -56,5 +68,12 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
|
|||
})
|
||||
}
|
||||
|
||||
log.Info(ctx).
|
||||
Int64("config-version", cfg.Version).
|
||||
Int("cluster-count", clusterCount).
|
||||
Int("listener-count", listenerCount).
|
||||
Int("route-configuration-count", routeConfigurationCount).
|
||||
Msg("controlplane: built discovery resources")
|
||||
|
||||
return resources, nil
|
||||
}
|
||||
|
|
|
@ -3,6 +3,9 @@ package xdsmgr
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
||||
|
@ -11,6 +14,7 @@ import (
|
|||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
"github.com/pomerium/pomerium/internal/signal"
|
||||
)
|
||||
|
||||
|
@ -36,7 +40,7 @@ func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Ma
|
|||
return &Manager{
|
||||
signal: signal.New(),
|
||||
|
||||
nonce: uuid.NewString(),
|
||||
nonce: toNonce(0),
|
||||
resources: resources,
|
||||
}
|
||||
}
|
||||
|
@ -106,6 +110,10 @@ func (mgr *Manager) DeltaAggregatedResources(
|
|||
case req.GetResponseNonce() == "":
|
||||
// neither an ACK or a NACK
|
||||
case req.GetErrorDetail() != nil:
|
||||
log.Info(ctx).
|
||||
Any("error-detail", req.GetErrorDetail()).
|
||||
Int64("config-version", versionFromNonce(req.GetResponseNonce())).
|
||||
Msg("xdsmgr: nack")
|
||||
// a NACK
|
||||
// - set the client resource versions to the current resource versions
|
||||
state.clientResourceVersions = make(map[string]string)
|
||||
|
@ -113,6 +121,9 @@ func (mgr *Manager) DeltaAggregatedResources(
|
|||
state.clientResourceVersions[resource.Name] = resource.Version
|
||||
}
|
||||
case req.GetResponseNonce() == mgr.nonce:
|
||||
log.Info(ctx).
|
||||
Int64("config-version", versionFromNonce(req.GetResponseNonce())).
|
||||
Msg("xdsmgr: ack")
|
||||
// an ACK for the last response
|
||||
// - set the client resource versions to the current resource versions
|
||||
state.clientResourceVersions = make(map[string]string)
|
||||
|
@ -121,6 +132,9 @@ func (mgr *Manager) DeltaAggregatedResources(
|
|||
}
|
||||
default:
|
||||
// an ACK for a response that's not the last response
|
||||
log.Info(ctx).
|
||||
Int64("config-version", versionFromNonce(req.GetResponseNonce())).
|
||||
Msg("xdsmgr: ack")
|
||||
}
|
||||
|
||||
// update subscriptions
|
||||
|
@ -200,6 +214,11 @@ func (mgr *Manager) DeltaAggregatedResources(
|
|||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case res := <-outgoing:
|
||||
log.Info(ctx).
|
||||
Int64("config-version", versionFromNonce(res.GetNonce())).
|
||||
Int("resource-count", len(res.GetResources())).
|
||||
Int("removed-resource-count", len(res.GetRemovedResources())).
|
||||
Msg("xdsmgr: sending resources")
|
||||
err := stream.Send(res)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -219,8 +238,8 @@ func (mgr *Manager) StreamAggregatedResources(
|
|||
|
||||
// Update updates the state of resources. If any changes are made they will be pushed to any listening
|
||||
// streams. For each TypeURL the list of resources should be the complete list of resources.
|
||||
func (mgr *Manager) Update(ctx context.Context, resources map[string][]*envoy_service_discovery_v3.Resource) {
|
||||
nonce := uuid.New().String()
|
||||
func (mgr *Manager) Update(ctx context.Context, version int64, resources map[string][]*envoy_service_discovery_v3.Resource) {
|
||||
nonce := toNonce(version)
|
||||
|
||||
mgr.mu.Lock()
|
||||
mgr.nonce = nonce
|
||||
|
@ -229,3 +248,15 @@ func (mgr *Manager) Update(ctx context.Context, resources map[string][]*envoy_se
|
|||
|
||||
mgr.signal.Broadcast(ctx)
|
||||
}
|
||||
|
||||
func toNonce(version int64) string {
|
||||
return fmt.Sprintf("%d/%s", version, uuid.New().String())
|
||||
}
|
||||
|
||||
// versionFromNonce parses the version out of the nonce. A missing or invalid version will be returned as 0.
|
||||
func versionFromNonce(nonce string) (version int64) {
|
||||
if idx := strings.Index(nonce, "/"); idx > 0 {
|
||||
version, _ = strconv.ParseInt(nonce[:idx], 10, 64)
|
||||
}
|
||||
return version
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ func TestManager(t *testing.T) {
|
|||
}, msg.GetResources())
|
||||
ack(msg.Nonce)
|
||||
|
||||
mgr.Update(ctx, map[string][]*envoy_service_discovery_v3.Resource{
|
||||
mgr.Update(ctx, 1, map[string][]*envoy_service_discovery_v3.Resource{
|
||||
typeURL: {{Name: "r1", Version: "2"}},
|
||||
})
|
||||
|
||||
|
@ -105,7 +105,7 @@ func TestManager(t *testing.T) {
|
|||
}, msg.GetResources())
|
||||
ack(msg.Nonce)
|
||||
|
||||
mgr.Update(ctx, map[string][]*envoy_service_discovery_v3.Resource{
|
||||
mgr.Update(ctx, 1, map[string][]*envoy_service_discovery_v3.Resource{
|
||||
typeURL: nil,
|
||||
})
|
||||
|
||||
|
|
|
@ -74,6 +74,8 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
|
|||
_, span := trace.StartSpan(ctx, "databroker.config_source.rebuild")
|
||||
defer span.End()
|
||||
|
||||
log.Info(ctx).Msg("databroker: rebuilding configuration")
|
||||
|
||||
src.mu.Lock()
|
||||
defer src.mu.Unlock()
|
||||
|
||||
|
@ -107,6 +109,9 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
|
|||
// add all the config policies to the list
|
||||
for _, id := range ids {
|
||||
cfgpb := src.dbConfigs[id]
|
||||
if cfgpb.GetVersion() > 0 {
|
||||
cfg.Version = cfgpb.GetVersion()
|
||||
}
|
||||
|
||||
cfg.Options.ApplySettings(ctx, certsIndex, cfgpb.Settings)
|
||||
var errCount uint64
|
||||
|
@ -166,6 +171,8 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
|
|||
// add the additional policies here since calling `Validate` will reset them.
|
||||
cfg.Options.AdditionalPolicies = append(cfg.Options.AdditionalPolicies, additionalPolicies...)
|
||||
|
||||
log.Info(ctx).Int64("config-version", cfg.Version).Msg("databroker: built new config")
|
||||
|
||||
src.computedConfig = cfg
|
||||
if !firstTime {
|
||||
src.Trigger(ctx, cfg)
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -12,6 +12,7 @@ import "crypt/crypt.proto";
|
|||
|
||||
message Config {
|
||||
string name = 1;
|
||||
int64 version = 4;
|
||||
repeated Route routes = 2;
|
||||
Settings settings = 3;
|
||||
}
|
||||
|
@ -132,9 +133,7 @@ message Settings {
|
|||
bytes cert_bytes = 3;
|
||||
bytes key_bytes = 4;
|
||||
}
|
||||
message StringList {
|
||||
repeated string values = 1;
|
||||
}
|
||||
message StringList { repeated string values = 1; }
|
||||
|
||||
optional string installation_id = 71;
|
||||
optional bool debug = 2;
|
||||
|
|
Loading…
Add table
Reference in a new issue