diff --git a/.vscode/launch.json b/.vscode/launch.json index b520e2b45..3259ed702 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -7,10 +7,7 @@ "request": "launch", "mode": "debug", "program": "${workspaceRoot}/cmd/pomerium", - "args": [ - "-config", - "${workspaceRoot}/.config.yaml" - ] + "args": ["-config", "${workspaceRoot}/.config.yaml"] }, { "name": "Connect to server", @@ -19,7 +16,7 @@ "mode": "remote", "remotePath": "/go/src/github.com/pomerium/pomerium/", "port": 9999, - "host": "127.0.0.1", + "host": "127.0.0.1" } ] } diff --git a/authorize/sync.go b/authorize/sync.go index dd5b98269..7d231997b 100644 --- a/authorize/sync.go +++ b/authorize/sync.go @@ -38,7 +38,7 @@ func newDataBrokerSyncer(authorize *Authorize) *dataBrokerSyncer { syncer := &dataBrokerSyncer{ authorize: authorize, } - syncer.Syncer = databroker.NewSyncer(syncer) + syncer.Syncer = databroker.NewSyncer("authorize", syncer) return syncer } diff --git a/internal/databroker/config_source.go b/internal/databroker/config_source.go index b29cbbe72..6f11f44cc 100644 --- a/internal/databroker/config_source.go +++ b/internal/databroker/config_source.go @@ -22,17 +22,22 @@ type ConfigSource struct { mu sync.RWMutex computedConfig *config.Config underlyingConfig *config.Config - dbConfigs map[string]*configpb.Config + dbConfigs map[string]dbConfig updaterHash uint64 cancel func() config.ChangeDispatcher } +type dbConfig struct { + *configpb.Config + version uint64 +} + // NewConfigSource creates a new ConfigSource. func NewConfigSource(underlying config.Source, listeners ...config.ChangeListener) *ConfigSource { src := &ConfigSource{ - dbConfigs: map[string]*configpb.Config{}, + dbConfigs: map[string]dbConfig{}, } for _, li := range listeners { src.OnConfigChange(li) @@ -84,25 +89,31 @@ func (src *ConfigSource) rebuild(firstTime bool) { var additionalPolicies []config.Policy // add all the config policies to the list - for _, cfgpb := range src.dbConfigs { + for id, cfgpb := range src.dbConfigs { cfg.Options.ApplySettings(cfgpb.Settings) + var errCount uint64 err := cfg.Options.Validate() if err != nil { - log.Warn().Err(err).Msg("databroker: invalid config detected, ignoring") + metrics.SetDBConfigRejected(cfg.Options.Services, id, cfgpb.version, err) return } for _, routepb := range cfgpb.GetRoutes() { policy, err := config.NewPolicyFromProto(routepb) if err != nil { - log.Warn().Err(err).Msg("databroker: error converting protobuf into policy") + errCount++ + log.Warn().Err(err). + Str("db_config_id", id). + Msg("databroker: error converting protobuf into policy") continue } err = policy.Validate() if err != nil { + errCount++ log.Warn().Err(err). + Str("db_config_id", id). Str("policy", policy.String()). Msg("databroker: invalid policy, ignoring") continue @@ -110,14 +121,18 @@ func (src *ConfigSource) rebuild(firstTime bool) { routeID, err := policy.RouteID() if err != nil { + errCount++ log.Warn().Err(err). + Str("db_config_id", id). Str("policy", policy.String()). Msg("databroker: cannot establish policy route ID, ignoring") continue } if _, ok := seen[routeID]; ok { + errCount++ log.Warn().Err(err). + Str("db_config_id", id). Str("policy", policy.String()). Msg("databroker: duplicate policy detected, ignoring") continue @@ -126,6 +141,7 @@ func (src *ConfigSource) rebuild(firstTime bool) { additionalPolicies = append(additionalPolicies, *policy) } + metrics.SetDBConfigInfo(cfg.Options.Services, id, cfgpb.version, int64(errCount)) } // add the additional policies here since calling `Validate` will reset them. @@ -184,7 +200,7 @@ func (src *ConfigSource) runUpdater(cfg *config.Config) { ctx := context.Background() ctx, src.cancel = context.WithCancel(ctx) - syncer := databroker.NewSyncer(&syncerHandler{ + syncer := databroker.NewSyncer("databroker", &syncerHandler{ client: client, src: src, }, databroker.WithTypeURL(grpcutil.GetTypeURL(new(configpb.Config)))) @@ -202,7 +218,7 @@ func (s *syncerHandler) GetDataBrokerServiceClient() databroker.DataBrokerServic func (s *syncerHandler) ClearRecords(ctx context.Context) { s.src.mu.Lock() - s.src.dbConfigs = map[string]*configpb.Config{} + s.src.dbConfigs = map[string]dbConfig{} s.src.mu.Unlock() } @@ -226,7 +242,7 @@ func (s *syncerHandler) UpdateRecords(ctx context.Context, serverVersion uint64, continue } - s.src.dbConfigs[record.GetId()] = &cfgpb + s.src.dbConfigs[record.GetId()] = dbConfig{&cfgpb, record.Version} } s.src.mu.Unlock() diff --git a/internal/identity/manager/sync.go b/internal/identity/manager/sync.go index 81582a8ef..682cf7243 100644 --- a/internal/identity/manager/sync.go +++ b/internal/identity/manager/sync.go @@ -31,7 +31,7 @@ func newDataBrokerSyncer( update: update, clear: clear, } - syncer.syncer = databroker.NewSyncer(syncer) + syncer.syncer = databroker.NewSyncer("identity_manager", syncer) return syncer } diff --git a/internal/telemetry/metrics/const.go b/internal/telemetry/metrics/const.go index eda0386a4..a697329cc 100644 --- a/internal/telemetry/metrics/const.go +++ b/internal/telemetry/metrics/const.go @@ -10,6 +10,7 @@ import ( var ( TagKeyHTTPMethod = tag.MustNewKey("http_method") TagKeyService = tag.MustNewKey("service") + TagConfigID = tag.MustNewKey("config_id") TagKeyGRPCService = tag.MustNewKey("grpc_service") TagKeyGRPCMethod = tag.MustNewKey("grpc_method") TagKeyHost = tag.MustNewKey("host") diff --git a/internal/telemetry/metrics/info.go b/internal/telemetry/metrics/info.go index b9e4b357f..c4ea3c2ac 100644 --- a/internal/telemetry/metrics/info.go +++ b/internal/telemetry/metrics/info.go @@ -21,22 +21,52 @@ var ( ConfigLastReloadView, ConfigLastReloadSuccessView, IdentityManagerLastRefreshView, + ConfigDBVersionView, + ConfigDBErrorsView, } configLastReload = stats.Int64( metrics.ConfigLastReloadTimestampSeconds, "Timestamp of last successful config reload", - "seconds") + stats.UnitSeconds) + configDBVersion = stats.Int64( + metrics.ConfigDBVersion, + metrics.ConfigDBVersionHelp, + stats.UnitDimensionless, + ) + configDBErrors = stats.Int64( + metrics.ConfigDBErrors, + metrics.ConfigDBErrorsHelp, + stats.UnitDimensionless, + ) configLastReloadSuccess = stats.Int64( metrics.ConfigLastReloadSuccess, "Returns 1 if last reload was successful", - "1") + stats.UnitDimensionless) identityManagerLastRefresh = stats.Int64( metrics.IdentityManagerLastRefreshTimestamp, "Timestamp of last directory refresh", "seconds", ) + // ConfigDBVersionView contains last databroker config version that was processed + ConfigDBVersionView = &view.View{ + Name: configDBVersion.Name(), + Description: configDBVersion.Description(), + Measure: configDBVersion, + TagKeys: []tag.Key{TagKeyService, TagConfigID}, + Aggregation: view.LastValue(), + } + + // ConfigDBErrorsView contains list of errors encountered while parsing this databroker config + ConfigDBErrorsView = &view.View{ + Name: configDBErrors.Name(), + Description: configDBErrors.Description(), + Measure: configDBErrors, + TagKeys: []tag.Key{TagKeyService, TagConfigID}, + Aggregation: view.LastValue(), + } + // ConfigLastReloadView contains the timestamp the configuration was last // reloaded, labeled by service. ConfigLastReloadView = &view.View{ @@ -72,6 +102,46 @@ func RecordIdentityManagerLastRefresh() { stats.Record(context.Background(), identityManagerLastRefresh.M(time.Now().Unix())) } +// SetDBConfigInfo records status, databroker version and error count while parsing +// the configuration from a databroker +func SetDBConfigInfo(service, configID string, version uint64, errCount int64) { + log.Info(). + Str("service", service). + Str("config_id", configID). + Uint64("version", version). + Int64("err_count", errCount). + Msg("set db config info") + + if err := stats.RecordWithTags( + context.Background(), + []tag.Mutator{ + tag.Insert(TagKeyService, service), + tag.Insert(TagConfigID, configID), + }, + configDBVersion.M(int64(version)), + ); err != nil { + log.Error().Err(err).Msg("telemetry/metrics: failed to record config version number") + } + + if err := stats.RecordWithTags( + context.Background(), + []tag.Mutator{ + tag.Insert(TagKeyService, service), + tag.Insert(TagConfigID, configID), + }, + configDBErrors.M(errCount), + ); err != nil { + log.Error().Err(err).Msg("telemetry/metrics: failed to record config error count") + } + +} + +// SetDBConfigRejected records that a certain databroker config version has been rejected +func SetDBConfigRejected(service, configID string, version uint64, err error) { + log.Warn().Err(err).Msg("databroker: invalid config detected, ignoring") + SetDBConfigInfo(service, configID, version, -1) +} + // SetConfigInfo records the status, checksum and timestamp of a configuration // reload. You must register InfoViews or the related config views before calling func SetConfigInfo(service, configName string, checksum uint64, success bool) { diff --git a/internal/telemetry/metrics/info_test.go b/internal/telemetry/metrics/info_test.go index 243b5f5a8..fc520e222 100644 --- a/internal/telemetry/metrics/info_test.go +++ b/internal/telemetry/metrics/info_test.go @@ -1,6 +1,7 @@ package metrics import ( + "fmt" "runtime" "testing" @@ -35,6 +36,32 @@ func Test_SetConfigInfo(t *testing.T) { } } +func Test_SetDBConfigInfo(t *testing.T) { + tests := []struct { + version uint64 + errCount int64 + wantVersion string + wantErrors string + }{ + { + 1, + 2, + "{ { {config_id test_config}{service test_service} }&{1} }", + "{ { {config_id test_config}{service test_service} }&{2} }", + }, + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("version=%d errors=%d", tt.version, tt.errCount), func(t *testing.T) { + view.Unregister(InfoViews...) + view.Register(InfoViews...) + SetDBConfigInfo("test_service", "test_config", tt.version, tt.errCount) + + testDataRetrieval(ConfigDBVersionView, t, tt.wantVersion) + testDataRetrieval(ConfigDBErrorsView, t, tt.wantErrors) + }) + } +} func Test_SetBuildInfo(t *testing.T) { registry = newMetricRegistry() diff --git a/pkg/grpc/databroker/syncer.go b/pkg/grpc/databroker/syncer.go index c5bd1cff7..0f3af6627 100644 --- a/pkg/grpc/databroker/syncer.go +++ b/pkg/grpc/databroker/syncer.go @@ -56,10 +56,12 @@ type Syncer struct { closeCtx context.Context closeCtxCancel func() + + id string } // NewSyncer creates a new Syncer. -func NewSyncer(handler SyncerHandler, options ...SyncerOption) *Syncer { +func NewSyncer(id string, handler SyncerHandler, options ...SyncerOption) *Syncer { closeCtx, closeCtxCancel := context.WithCancel(context.Background()) bo := backoff.NewExponentialBackOff() @@ -71,6 +73,8 @@ func NewSyncer(handler SyncerHandler, options ...SyncerOption) *Syncer { closeCtx: closeCtx, closeCtxCancel: closeCtxCancel, + + id: id, } } @@ -97,6 +101,7 @@ func (syncer *Syncer) Run(ctx context.Context) error { } if err != nil { + syncer.log().Error().Err(err).Msg("sync") select { case <-ctx.Done(): return ctx.Err() @@ -107,7 +112,7 @@ func (syncer *Syncer) Run(ctx context.Context) error { } func (syncer *Syncer) init(ctx context.Context) error { - syncer.log().Info().Msg("syncing latest records") + syncer.log().Info().Msg("initial sync") records, recordVersion, serverVersion, err := InitialSync(ctx, syncer.handler.GetDataBrokerServiceClient(), &SyncLatestRequest{ Type: syncer.cfg.typeURL, }) @@ -137,6 +142,8 @@ func (syncer *Syncer) sync(ctx context.Context) error { return err } + syncer.log().Info().Msg("listening for updates") + for { res, err := stream.Recv() if status.Code(err) == codes.Aborted { @@ -148,6 +155,12 @@ func (syncer *Syncer) sync(ctx context.Context) error { return err } + syncer.log().Debug(). + Uint("version", uint(res.Record.GetVersion())). + Str("type", res.Record.Type). + Str("id", res.Record.Id). + Msg("syncer got record") + if syncer.recordVersion != res.GetRecord().GetVersion()-1 { syncer.log().Error().Err(err). Uint64("received", res.GetRecord().GetVersion()). @@ -163,7 +176,7 @@ func (syncer *Syncer) sync(ctx context.Context) error { } func (syncer *Syncer) log() *zerolog.Logger { - l := log.With().Str("service", "syncer"). + l := log.With().Str("syncer_id", syncer.id). Str("type", syncer.cfg.typeURL). Uint64("server_version", syncer.serverVersion). Uint64("record_version", syncer.recordVersion).Logger() diff --git a/pkg/grpc/databroker/syncer_test.go b/pkg/grpc/databroker/syncer_test.go index af8652473..e6ce522ad 100644 --- a/pkg/grpc/databroker/syncer_test.go +++ b/pkg/grpc/databroker/syncer_test.go @@ -159,7 +159,7 @@ func TestSyncer(t *testing.T) { clearCh := make(chan struct{}) updateCh := make(chan []*Record) - syncer := NewSyncer(testSyncerHandler{ + syncer := NewSyncer("test", testSyncerHandler{ getDataBrokerServiceClient: func() DataBrokerServiceClient { return NewDataBrokerServiceClient(gc) }, diff --git a/pkg/metrics/constants.go b/pkg/metrics/constants.go index 59633964d..29cbb3f53 100644 --- a/pkg/metrics/constants.go +++ b/pkg/metrics/constants.go @@ -19,6 +19,14 @@ const ( PolicyCountTotal = "policy_count_total" // ConfigChecksumDecimal should only be used to compare config on a single node, it will be different in multi-node environment ConfigChecksumDecimal = "config_checksum_decimal" + // ConfigDBVersion sets currently loaded databroker config version config_db_version{service="service",id="config_id"} + ConfigDBVersion = "config_db_version" + // ConfigDBVersionHelp is the help text for ConfigDBVersion. + ConfigDBVersionHelp = "databroker current config record version" + // ConfigDBErrors sets number of errors while parsing current config that were tolerated + ConfigDBErrors = "config_db_errors" + // ConfigDBErrorsHelp is the help text for ConfigDBErrors. + ConfigDBErrorsHelp = "amount of errors observed while applying databroker config; -1 if validation failed and was rejected altogether" ) // labels