From db426072b051f3a11d0062d5b2f2c2420d7a83f9 Mon Sep 17 00:00:00 2001 From: Denis Mishin Date: Tue, 14 Jun 2022 15:05:16 -0400 Subject: [PATCH] eliminate global events manager (#3422) --- Makefile | 2 +- databroker/cache.go | 6 +- databroker/cache_test.go | 3 +- internal/cmd/pomerium/pomerium.go | 15 +++- internal/controlplane/server.go | 10 ++- internal/controlplane/xdsmgr/xdsmgr.go | 12 ++- internal/controlplane/xdsmgr/xdsmgr_test.go | 3 +- internal/events/events.go | 15 ---- internal/events/manager.go | 20 +++-- internal/identity/manager/config.go | 9 +++ internal/identity/manager/manager.go | 21 +++++ internal/identity/manager/manager_test.go | 85 +++++++++++++++++++++ internal/telemetry/metrics/info.go | 13 ---- 13 files changed, 164 insertions(+), 50 deletions(-) diff --git a/Makefile b/Makefile index da88fc01e..4943426e9 100644 --- a/Makefile +++ b/Makefile @@ -98,7 +98,7 @@ lint: ## Verifies `golint` passes. .PHONY: test test: get-envoy ## Runs the go tests. @echo "==> $@" - @$(GO) test -tags "$(BUILDTAGS)" $(shell $(GO) list ./... | grep -v vendor | grep -v github.com/pomerium/pomerium/integration) + @$(GO) test -race -tags "$(BUILDTAGS)" $(shell $(GO) list ./... | grep -v vendor | grep -v github.com/pomerium/pomerium/integration) .PHONY: cover cover: get-envoy ## Runs go test with coverage diff --git a/databroker/cache.go b/databroker/cache.go index e7b31a57d..e26e5ef00 100644 --- a/databroker/cache.go +++ b/databroker/cache.go @@ -17,6 +17,7 @@ import ( "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/directory" "github.com/pomerium/pomerium/internal/envoy/files" + "github.com/pomerium/pomerium/internal/events" "github.com/pomerium/pomerium/internal/identity" "github.com/pomerium/pomerium/internal/identity/manager" "github.com/pomerium/pomerium/internal/log" @@ -33,6 +34,7 @@ import ( type DataBroker struct { dataBrokerServer *dataBrokerServer manager *manager.Manager + eventsMgr *events.Manager localListener net.Listener localGRPCServer *grpc.Server @@ -45,7 +47,7 @@ type DataBroker struct { } // New creates a new databroker service. -func New(cfg *config.Config) (*DataBroker, error) { +func New(cfg *config.Config, eventsMgr *events.Manager) (*DataBroker, error) { localListener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { return nil, err @@ -100,6 +102,7 @@ func New(cfg *config.Config) (*DataBroker, error) { localGRPCConnection: localGRPCConnection, deprecatedCacheClusterDomain: dataBrokerURLs[0].Hostname(), dataBrokerStorageType: cfg.Options.DataBrokerStorageType, + eventsMgr: eventsMgr, } c.Register(c.localGRPCServer) @@ -174,6 +177,7 @@ func (c *DataBroker) update(ctx context.Context, cfg *config.Config) error { manager.WithDataBrokerClient(dataBrokerClient), manager.WithGroupRefreshInterval(cfg.Options.RefreshDirectoryInterval), manager.WithGroupRefreshTimeout(cfg.Options.RefreshDirectoryTimeout), + manager.WithEventManager(c.eventsMgr), } authenticator, err := identity.NewAuthenticator(oauthOptions) diff --git a/databroker/cache_test.go b/databroker/cache_test.go index bd7e53193..3c9ffa3d3 100644 --- a/databroker/cache_test.go +++ b/databroker/cache_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/pomerium/pomerium/config" + "github.com/pomerium/pomerium/internal/events" "github.com/pomerium/pomerium/pkg/cryptutil" ) @@ -28,7 +29,7 @@ func TestNew(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.opts.Provider = "google" - _, err := New(&config.Config{Options: &tt.opts}) + _, err := New(&config.Config{Options: &tt.opts}, events.New()) if (err != nil) != tt.wantErr { t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/internal/cmd/pomerium/pomerium.go b/internal/cmd/pomerium/pomerium.go index 37bb8f789..239a938fa 100644 --- a/internal/cmd/pomerium/pomerium.go +++ b/internal/cmd/pomerium/pomerium.go @@ -23,6 +23,7 @@ import ( "github.com/pomerium/pomerium/internal/databroker" "github.com/pomerium/pomerium/internal/envoy" "github.com/pomerium/pomerium/internal/envoy/files" + "github.com/pomerium/pomerium/internal/events" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/registry" "github.com/pomerium/pomerium/internal/version" @@ -63,8 +64,10 @@ func Run(ctx context.Context, configFile string) error { traceMgr := config.NewTraceManager(ctx, src) defer traceMgr.Close() + eventsMgr := events.New() + // setup the control plane - controlPlane, err := controlplane.NewServer(src.GetConfig(), metricsMgr) + controlPlane, err := controlplane.NewServer(src.GetConfig(), metricsMgr, eventsMgr) if err != nil { return fmt.Errorf("error creating control plane: %w", err) } @@ -109,7 +112,7 @@ func Run(ctx context.Context, configFile string) error { } var dataBrokerServer *databroker_service.DataBroker if config.IsDataBroker(src.GetConfig().Options.Services) { - dataBrokerServer, err = setupDataBroker(ctx, src, controlPlane) + dataBrokerServer, err = setupDataBroker(ctx, src, controlPlane, eventsMgr) if err != nil { return fmt.Errorf("setting up databroker: %w", err) } @@ -189,8 +192,12 @@ func setupAuthorize(ctx context.Context, src config.Source, controlPlane *contro return svc, nil } -func setupDataBroker(ctx context.Context, src config.Source, controlPlane *controlplane.Server) (*databroker_service.DataBroker, error) { - svc, err := databroker_service.New(src.GetConfig()) +func setupDataBroker(ctx context.Context, + src config.Source, + controlPlane *controlplane.Server, + eventsMgr *events.Manager, +) (*databroker_service.DataBroker, error) { + svc, err := databroker_service.New(src.GetConfig(), eventsMgr) if err != nil { return nil, fmt.Errorf("error creating databroker service: %w", err) } diff --git a/internal/controlplane/server.go b/internal/controlplane/server.go index de5417e36..154d824e2 100644 --- a/internal/controlplane/server.go +++ b/internal/controlplane/server.go @@ -65,6 +65,7 @@ type Server struct { DebugListener net.Listener DebugRouter *mux.Router Builder *envoyconfig.Builder + EventsMgr *events.Manager currentConfig atomicVersionedConfig name string @@ -81,9 +82,10 @@ type Server struct { } // NewServer creates a new Server. Listener ports are chosen by the OS. -func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server, error) { +func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager, eventsMgr *events.Manager) (*Server, error) { srv := &Server{ metricsMgr: metricsMgr, + EventsMgr: eventsMgr, reproxy: reproxy.New(), haveSetCapacity: map[string]bool{}, } @@ -172,7 +174,7 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server, return nil, err } - srv.xdsmgr = xdsmgr.NewManager(res) + srv.xdsmgr = xdsmgr.NewManager(res, eventsMgr) envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv.GRPCServer, srv.xdsmgr) return srv, nil @@ -182,12 +184,12 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server, func (srv *Server) Run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) - handle := events.Register(func(evt events.Event) { + handle := srv.EventsMgr.Register(func(evt events.Event) { withGRPCBackoff(ctx, func() error { return srv.storeEvent(ctx, evt) }) }) - defer events.Unregister(handle) + defer srv.EventsMgr.Unregister(handle) // start the gRPC server eg.Go(func() error { diff --git a/internal/controlplane/xdsmgr/xdsmgr.go b/internal/controlplane/xdsmgr/xdsmgr.go index 81eb640fd..539f7f1cc 100644 --- a/internal/controlplane/xdsmgr/xdsmgr.go +++ b/internal/controlplane/xdsmgr/xdsmgr.go @@ -45,10 +45,12 @@ type Manager struct { nonceToConfig *lru.Cache hostname string + + events *events.Manager } // NewManager creates a new Manager. -func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Manager { +func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource, evt *events.Manager) *Manager { nonceToConfig, _ := lru.New(maxNonceCacheSize) // the only error they return is when size is negative, which never happens return &Manager{ @@ -59,6 +61,8 @@ func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Ma resources: resources, hostname: getHostname(), + + events: evt, } } @@ -268,7 +272,7 @@ func (mgr *Manager) nonceToConfigVersion(nonce string) (ver uint64) { } func (mgr *Manager) nackEvent(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) { - events.Dispatch(&events.EnvoyConfigurationEvent{ + mgr.events.Dispatch(&events.EnvoyConfigurationEvent{ Instance: mgr.hostname, Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_NACK, Time: timestamppb.Now(), @@ -294,7 +298,7 @@ func (mgr *Manager) nackEvent(ctx context.Context, req *envoy_service_discovery_ } func (mgr *Manager) ackEvent(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) { - events.Dispatch(&events.EnvoyConfigurationEvent{ + mgr.events.Dispatch(&events.EnvoyConfigurationEvent{ Instance: mgr.hostname, Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_ACK, Time: timestamppb.Now(), @@ -315,7 +319,7 @@ func (mgr *Manager) ackEvent(ctx context.Context, req *envoy_service_discovery_v } func (mgr *Manager) changeEvent(ctx context.Context, res *envoy_service_discovery_v3.DeltaDiscoveryResponse) { - events.Dispatch(&events.EnvoyConfigurationEvent{ + mgr.events.Dispatch(&events.EnvoyConfigurationEvent{ Instance: mgr.hostname, Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_RESPONSE, Time: timestamppb.Now(), diff --git a/internal/controlplane/xdsmgr/xdsmgr_test.go b/internal/controlplane/xdsmgr/xdsmgr_test.go index cd0fa8fe8..ca508f91d 100644 --- a/internal/controlplane/xdsmgr/xdsmgr_test.go +++ b/internal/controlplane/xdsmgr/xdsmgr_test.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/test/bufconn" + "github.com/pomerium/pomerium/internal/events" "github.com/pomerium/pomerium/internal/signal" ) @@ -36,7 +37,7 @@ func TestManager(t *testing.T) { typeURL: { {Name: "r1", Version: "1"}, }, - }) + }, events.New()) envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, mgr) li := bufconn.Listen(bufSize) diff --git a/internal/events/events.go b/internal/events/events.go index 4fa0ac883..424f84955 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -21,21 +21,6 @@ type EventSink func(Event) // An EventSinkHandle is a reference to a registered EventSink so that it can be unregistered. type EventSinkHandle string -// Dispatch dispatches an event to any event sinks. -func Dispatch(evt Event) { - defaultManager.Dispatch(evt) -} - -// Register registers a new sink to receive events. -func Register(sink EventSink) EventSinkHandle { - return defaultManager.Register(sink) -} - -// Unregister unregisters a sink so it will no longer receive events. -func Unregister(sinkHandle EventSinkHandle) { - defaultManager.Unregister(sinkHandle) -} - type ( // EnvoyConfigurationEvent re-exports events.EnvoyConfigurationEvent. EnvoyConfigurationEvent = events.EnvoyConfigurationEvent diff --git a/internal/events/manager.go b/internal/events/manager.go index f289fdb21..4623e54df 100644 --- a/internal/events/manager.go +++ b/internal/events/manager.go @@ -9,8 +9,6 @@ import ( "github.com/pomerium/pomerium/internal/log" ) -var defaultManager = New() - // A Manager manages the dispatching of events to event sinks. type Manager struct { mu sync.RWMutex @@ -27,21 +25,31 @@ func New() *Manager { // Dispatch dispatches an event to any registered event sinks. func (mgr *Manager) Dispatch(evt Event) { mgr.mu.RLock() + dropped := mgr.dispatchLocked(evt) + mgr.mu.RUnlock() + + if dropped { + log.Warn(context.Background()). + Interface("event", evt). + Msg("controlplane: dropping event due to full channel") + } +} + +func (mgr *Manager) dispatchLocked(evt Event) bool { sinks := make([]chan Event, 0, len(mgr.sinks)) for _, sink := range mgr.sinks { sinks = append(sinks, sink) } - mgr.mu.RUnlock() + dropped := false for _, sink := range sinks { select { case sink <- evt: default: - log.Warn(context.Background()). - Interface("event", evt). - Msg("controlplane: dropping event due to full channel") + dropped = true } } + return dropped } // Register registers an event sink to receive events. diff --git a/internal/identity/manager/config.go b/internal/identity/manager/config.go index e9ef874d6..4620a67d6 100644 --- a/internal/identity/manager/config.go +++ b/internal/identity/manager/config.go @@ -5,6 +5,7 @@ import ( "time" "github.com/pomerium/pomerium/internal/directory" + "github.com/pomerium/pomerium/internal/events" "github.com/pomerium/pomerium/pkg/grpc/databroker" ) @@ -24,6 +25,7 @@ type config struct { sessionRefreshGracePeriod time.Duration sessionRefreshCoolOffDuration time.Duration now func() time.Time + eventMgr *events.Manager } func newConfig(options ...Option) *config { @@ -98,6 +100,13 @@ func WithNow(now func() time.Time) Option { } } +// WithEventManager passes an event manager to record events +func WithEventManager(mgr *events.Manager) Option { + return func(c *config) { + c.eventMgr = mgr + } +} + type atomicConfig struct { value atomic.Value } diff --git a/internal/identity/manager/manager.go b/internal/identity/manager/manager.go index 63cdbb4cd..d651e6b9b 100644 --- a/internal/identity/manager/manager.go +++ b/internal/identity/manager/manager.go @@ -15,6 +15,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/pomerium/pomerium/internal/directory" + "github.com/pomerium/pomerium/internal/events" "github.com/pomerium/pomerium/internal/identity/identity" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/scheduler" @@ -23,6 +24,7 @@ import ( "github.com/pomerium/pomerium/pkg/grpc/session" "github.com/pomerium/pomerium/pkg/grpc/user" "github.com/pomerium/pomerium/pkg/grpcutil" + metrics_ids "github.com/pomerium/pomerium/pkg/metrics" "github.com/pomerium/pomerium/pkg/protoutil" ) @@ -208,6 +210,7 @@ func (mgr *Manager) refreshDirectoryUserGroups(ctx context.Context) (nextRefresh directoryGroups, directoryUsers, err := mgr.cfg.Load().directory.UserGroups(ctx) metrics.RecordIdentityManagerUserGroupRefresh(ctx, err) + mgr.recordLastError(metrics_ids.IdentityManagerLastUserGroupRefreshError, err) if err != nil { msg := "failed to refresh directory users and groups" if ctx.Err() != nil { @@ -356,6 +359,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string newToken, err := mgr.cfg.Load().authenticator.Refresh(ctx, FromOAuthToken(s.OauthToken), &s) metrics.RecordIdentityManagerSessionRefresh(ctx, err) + mgr.recordLastError(metrics_ids.IdentityManagerLastSessionRefreshError, err) if isTemporaryError(err) { log.Error(ctx).Err(err). Str("user_id", s.GetUserId()). @@ -374,6 +378,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string err = mgr.cfg.Load().authenticator.UpdateUserInfo(ctx, FromOAuthToken(s.OauthToken), &s) metrics.RecordIdentityManagerUserRefresh(ctx, err) + mgr.recordLastError(metrics_ids.IdentityManagerLastUserRefreshError, err) if isTemporaryError(err) { log.Error(ctx).Err(err). Str("user_id", s.GetUserId()). @@ -426,6 +431,7 @@ func (mgr *Manager) refreshUser(ctx context.Context, userID string) { err := mgr.cfg.Load().authenticator.UpdateUserInfo(ctx, FromOAuthToken(s.OauthToken), &u) metrics.RecordIdentityManagerUserRefresh(ctx, err) + mgr.recordLastError(metrics_ids.IdentityManagerLastUserRefreshError, err) if isTemporaryError(err) { log.Error(ctx).Err(err). Str("user_id", s.GetUserId()). @@ -552,6 +558,21 @@ func (mgr *Manager) reset() { mgr.users = userCollection{BTree: btree.New(8)} } +func (mgr *Manager) recordLastError(id string, err error) { + if err == nil { + return + } + evtMgr := mgr.cfg.Load().eventMgr + if evtMgr == nil { + return + } + evtMgr.Dispatch(&events.LastError{ + Time: timestamppb.Now(), + Message: err.Error(), + Id: id, + }) +} + func isTemporaryError(err error) bool { if err == nil { return false diff --git a/internal/identity/manager/manager_test.go b/internal/identity/manager/manager_test.go index 2551d3b8f..1c6bca1fe 100644 --- a/internal/identity/manager/manager_test.go +++ b/internal/identity/manager/manager_test.go @@ -2,19 +2,25 @@ package manager import ( "context" + "errors" "fmt" "testing" "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "golang.org/x/oauth2" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/pomerium/pomerium/internal/directory" + "github.com/pomerium/pomerium/internal/events" + "github.com/pomerium/pomerium/internal/identity/identity" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker/mock_databroker" "github.com/pomerium/pomerium/pkg/grpc/session" "github.com/pomerium/pomerium/pkg/grpc/user" + metrics_ids "github.com/pomerium/pomerium/pkg/metrics" "github.com/pomerium/pomerium/pkg/protoutil" ) @@ -31,6 +37,20 @@ func (mock mockProvider) UserGroups(ctx context.Context) ([]*directory.Group, [] return mock.userGroups(ctx) } +type mockAuthenticator struct{} + +func (mock mockAuthenticator) Refresh(_ context.Context, _ *oauth2.Token, _ identity.State) (*oauth2.Token, error) { + return nil, errors.New("update session") +} + +func (mock mockAuthenticator) Revoke(_ context.Context, _ *oauth2.Token) error { + return errors.New("not implemented") +} + +func (mock mockAuthenticator) UpdateUserInfo(_ context.Context, _ *oauth2.Token, _ any) error { + return errors.New("update user info") +} + func TestManager_onUpdateRecords(t *testing.T) { ctrl := gomock.NewController(t) @@ -109,6 +129,71 @@ func TestManager_refreshDirectoryUserGroups(t *testing.T) { }) } +func TestManager_reportErrors(t *testing.T) { + ctrl := gomock.NewController(t) + + ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10) + defer clearTimeout() + + evtMgr := events.New() + received := make(chan events.Event, 1) + handle := evtMgr.Register(func(evt events.Event) { + received <- evt + }) + defer evtMgr.Unregister(handle) + + expectMsg := func(id, msg string) { + t.Helper() + assert.Eventually(t, func() bool { + select { + case evt := <-received: + lastErr := evt.(*events.LastError) + return msg == lastErr.Message && id == lastErr.Id + default: + return false + } + }, time.Second, time.Millisecond*20, msg) + } + + client := mock_databroker.NewMockDataBrokerServiceClient(ctrl) + client.EXPECT().Put(gomock.Any(), gomock.Any()).AnyTimes() + mgr := New( + WithEventManager(evtMgr), + WithDataBrokerClient(client), + WithAuthenticator(mockAuthenticator{}), + WithDirectoryProvider(mockProvider{ + user: func(ctx context.Context, userID, accessToken string) (*directory.User, error) { + return nil, fmt.Errorf("user") + }, + userGroups: func(ctx context.Context) ([]*directory.Group, []*directory.User, error) { + return nil, nil, fmt.Errorf("user groups") + }, + }), + WithGroupRefreshInterval(time.Second), + ) + mgr.directoryBackoff.RandomizationFactor = 0 // disable randomization for deterministic testing + + mgr.onUpdateRecords(ctx, updateRecordsMessage{ + records: []*databroker.Record{ + mkRecord(&directory.Group{Id: "group1", Name: "group 1", Email: "group1@example.com"}), + mkRecord(&directory.User{Id: "user1", DisplayName: "user 1", Email: "user1@example.com", GroupIds: []string{"group1s"}}), + mkRecord(&session.Session{Id: "session1", UserId: "user1", OauthToken: &session.OAuthToken{ + ExpiresAt: timestamppb.New(time.Now().Add(time.Hour)), + }, ExpiresAt: timestamppb.New(time.Now().Add(time.Hour))}), + mkRecord(&user.User{Id: "user1", Name: "user 1", Email: "user1@example.com"}), + }, + }) + + _ = mgr.refreshDirectoryUserGroups(ctx) + expectMsg(metrics_ids.IdentityManagerLastUserGroupRefreshError, "user groups") + + mgr.refreshUser(ctx, "user1") + expectMsg(metrics_ids.IdentityManagerLastUserRefreshError, "update user info") + + mgr.refreshSession(ctx, "user1", "session1") + expectMsg(metrics_ids.IdentityManagerLastSessionRefreshError, "update session") +} + func mkRecord(msg recordable) *databroker.Record { any := protoutil.NewAny(msg) return &databroker.Record{ diff --git a/internal/telemetry/metrics/info.go b/internal/telemetry/metrics/info.go index db4ee2aab..471a0bab1 100644 --- a/internal/telemetry/metrics/info.go +++ b/internal/telemetry/metrics/info.go @@ -9,9 +9,7 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" - "google.golang.org/protobuf/types/known/timestamppb" - "github.com/pomerium/pomerium/internal/events" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/pkg/metrics" ) @@ -277,7 +275,6 @@ func RecordIdentityManagerUserRefresh(ctx context.Context, err error) { if err != nil { counter = identityManagerLastUserRefreshError ts = identityManagerLastUserRefreshErrorTimestamp - storeLastErrorEvent(counter.Name(), err) } stats.Record(ctx, ts.M(time.Now().Unix()), @@ -292,7 +289,6 @@ func RecordIdentityManagerUserGroupRefresh(ctx context.Context, err error) { if err != nil { counter = identityManagerLastUserGroupRefreshError ts = identityManagerLastUserGroupRefreshErrorTimestamp - storeLastErrorEvent(counter.Name(), err) } stats.Record(ctx, ts.M(time.Now().Unix()), @@ -307,7 +303,6 @@ func RecordIdentityManagerSessionRefresh(ctx context.Context, err error) { if err != nil { counter = identityManagerLastSessionRefreshError ts = identityManagerLastSessionRefreshErrorTimestamp - storeLastErrorEvent(counter.Name(), err) } stats.Record(ctx, ts.M(time.Now().Unix()), @@ -315,14 +310,6 @@ func RecordIdentityManagerSessionRefresh(ctx context.Context, err error) { ) } -func storeLastErrorEvent(id string, err error) { - events.Dispatch(&events.LastError{ - Time: timestamppb.Now(), - Message: err.Error(), - Id: id, - }) -} - // SetDBConfigInfo records status, databroker version and error count while parsing // the configuration from a databroker func SetDBConfigInfo(ctx context.Context, service, configID string, version uint64, errCount int64) {