eliminate global events manager (#3422)

This commit is contained in:
Denis Mishin 2022-06-14 15:05:16 -04:00 committed by GitHub
parent 6b386f2a00
commit db426072b0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 164 additions and 50 deletions

View file

@ -98,7 +98,7 @@ lint: ## Verifies `golint` passes.
.PHONY: test
test: get-envoy ## Runs the go tests.
@echo "==> $@"
@$(GO) test -tags "$(BUILDTAGS)" $(shell $(GO) list ./... | grep -v vendor | grep -v github.com/pomerium/pomerium/integration)
@$(GO) test -race -tags "$(BUILDTAGS)" $(shell $(GO) list ./... | grep -v vendor | grep -v github.com/pomerium/pomerium/integration)
.PHONY: cover
cover: get-envoy ## Runs go test with coverage

View file

@ -17,6 +17,7 @@ import (
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/internal/envoy/files"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/identity"
"github.com/pomerium/pomerium/internal/identity/manager"
"github.com/pomerium/pomerium/internal/log"
@ -33,6 +34,7 @@ import (
type DataBroker struct {
dataBrokerServer *dataBrokerServer
manager *manager.Manager
eventsMgr *events.Manager
localListener net.Listener
localGRPCServer *grpc.Server
@ -45,7 +47,7 @@ type DataBroker struct {
}
// New creates a new databroker service.
func New(cfg *config.Config) (*DataBroker, error) {
func New(cfg *config.Config, eventsMgr *events.Manager) (*DataBroker, error) {
localListener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return nil, err
@ -100,6 +102,7 @@ func New(cfg *config.Config) (*DataBroker, error) {
localGRPCConnection: localGRPCConnection,
deprecatedCacheClusterDomain: dataBrokerURLs[0].Hostname(),
dataBrokerStorageType: cfg.Options.DataBrokerStorageType,
eventsMgr: eventsMgr,
}
c.Register(c.localGRPCServer)
@ -174,6 +177,7 @@ func (c *DataBroker) update(ctx context.Context, cfg *config.Config) error {
manager.WithDataBrokerClient(dataBrokerClient),
manager.WithGroupRefreshInterval(cfg.Options.RefreshDirectoryInterval),
manager.WithGroupRefreshTimeout(cfg.Options.RefreshDirectoryTimeout),
manager.WithEventManager(c.eventsMgr),
}
authenticator, err := identity.NewAuthenticator(oauthOptions)

View file

@ -6,6 +6,7 @@ import (
"testing"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/pkg/cryptutil"
)
@ -28,7 +29,7 @@ func TestNew(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.opts.Provider = "google"
_, err := New(&config.Config{Options: &tt.opts})
_, err := New(&config.Config{Options: &tt.opts}, events.New())
if (err != nil) != tt.wantErr {
t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr)
return

View file

@ -23,6 +23,7 @@ import (
"github.com/pomerium/pomerium/internal/databroker"
"github.com/pomerium/pomerium/internal/envoy"
"github.com/pomerium/pomerium/internal/envoy/files"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/registry"
"github.com/pomerium/pomerium/internal/version"
@ -63,8 +64,10 @@ func Run(ctx context.Context, configFile string) error {
traceMgr := config.NewTraceManager(ctx, src)
defer traceMgr.Close()
eventsMgr := events.New()
// setup the control plane
controlPlane, err := controlplane.NewServer(src.GetConfig(), metricsMgr)
controlPlane, err := controlplane.NewServer(src.GetConfig(), metricsMgr, eventsMgr)
if err != nil {
return fmt.Errorf("error creating control plane: %w", err)
}
@ -109,7 +112,7 @@ func Run(ctx context.Context, configFile string) error {
}
var dataBrokerServer *databroker_service.DataBroker
if config.IsDataBroker(src.GetConfig().Options.Services) {
dataBrokerServer, err = setupDataBroker(ctx, src, controlPlane)
dataBrokerServer, err = setupDataBroker(ctx, src, controlPlane, eventsMgr)
if err != nil {
return fmt.Errorf("setting up databroker: %w", err)
}
@ -189,8 +192,12 @@ func setupAuthorize(ctx context.Context, src config.Source, controlPlane *contro
return svc, nil
}
func setupDataBroker(ctx context.Context, src config.Source, controlPlane *controlplane.Server) (*databroker_service.DataBroker, error) {
svc, err := databroker_service.New(src.GetConfig())
func setupDataBroker(ctx context.Context,
src config.Source,
controlPlane *controlplane.Server,
eventsMgr *events.Manager,
) (*databroker_service.DataBroker, error) {
svc, err := databroker_service.New(src.GetConfig(), eventsMgr)
if err != nil {
return nil, fmt.Errorf("error creating databroker service: %w", err)
}

View file

@ -65,6 +65,7 @@ type Server struct {
DebugListener net.Listener
DebugRouter *mux.Router
Builder *envoyconfig.Builder
EventsMgr *events.Manager
currentConfig atomicVersionedConfig
name string
@ -81,9 +82,10 @@ type Server struct {
}
// NewServer creates a new Server. Listener ports are chosen by the OS.
func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server, error) {
func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager, eventsMgr *events.Manager) (*Server, error) {
srv := &Server{
metricsMgr: metricsMgr,
EventsMgr: eventsMgr,
reproxy: reproxy.New(),
haveSetCapacity: map[string]bool{},
}
@ -172,7 +174,7 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server,
return nil, err
}
srv.xdsmgr = xdsmgr.NewManager(res)
srv.xdsmgr = xdsmgr.NewManager(res, eventsMgr)
envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv.GRPCServer, srv.xdsmgr)
return srv, nil
@ -182,12 +184,12 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server,
func (srv *Server) Run(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)
handle := events.Register(func(evt events.Event) {
handle := srv.EventsMgr.Register(func(evt events.Event) {
withGRPCBackoff(ctx, func() error {
return srv.storeEvent(ctx, evt)
})
})
defer events.Unregister(handle)
defer srv.EventsMgr.Unregister(handle)
// start the gRPC server
eg.Go(func() error {

View file

@ -45,10 +45,12 @@ type Manager struct {
nonceToConfig *lru.Cache
hostname string
events *events.Manager
}
// NewManager creates a new Manager.
func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Manager {
func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource, evt *events.Manager) *Manager {
nonceToConfig, _ := lru.New(maxNonceCacheSize) // the only error they return is when size is negative, which never happens
return &Manager{
@ -59,6 +61,8 @@ func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Ma
resources: resources,
hostname: getHostname(),
events: evt,
}
}
@ -268,7 +272,7 @@ func (mgr *Manager) nonceToConfigVersion(nonce string) (ver uint64) {
}
func (mgr *Manager) nackEvent(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) {
events.Dispatch(&events.EnvoyConfigurationEvent{
mgr.events.Dispatch(&events.EnvoyConfigurationEvent{
Instance: mgr.hostname,
Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_NACK,
Time: timestamppb.Now(),
@ -294,7 +298,7 @@ func (mgr *Manager) nackEvent(ctx context.Context, req *envoy_service_discovery_
}
func (mgr *Manager) ackEvent(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) {
events.Dispatch(&events.EnvoyConfigurationEvent{
mgr.events.Dispatch(&events.EnvoyConfigurationEvent{
Instance: mgr.hostname,
Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_ACK,
Time: timestamppb.Now(),
@ -315,7 +319,7 @@ func (mgr *Manager) ackEvent(ctx context.Context, req *envoy_service_discovery_v
}
func (mgr *Manager) changeEvent(ctx context.Context, res *envoy_service_discovery_v3.DeltaDiscoveryResponse) {
events.Dispatch(&events.EnvoyConfigurationEvent{
mgr.events.Dispatch(&events.EnvoyConfigurationEvent{
Instance: mgr.hostname,
Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_RESPONSE,
Time: timestamppb.Now(),

View file

@ -13,6 +13,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/test/bufconn"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/signal"
)
@ -36,7 +37,7 @@ func TestManager(t *testing.T) {
typeURL: {
{Name: "r1", Version: "1"},
},
})
}, events.New())
envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, mgr)
li := bufconn.Listen(bufSize)

View file

@ -21,21 +21,6 @@ type EventSink func(Event)
// An EventSinkHandle is a reference to a registered EventSink so that it can be unregistered.
type EventSinkHandle string
// Dispatch dispatches an event to any event sinks.
//
// It is a package-level convenience wrapper that forwards evt to
// defaultManager.Dispatch.
func Dispatch(evt Event) {
defaultManager.Dispatch(evt)
}
// Register registers a new sink to receive events.
//
// It forwards sink to defaultManager.Register and returns the handle that
// call produces; pass the handle to Unregister to remove the sink again.
func Register(sink EventSink) EventSinkHandle {
return defaultManager.Register(sink)
}
// Unregister unregisters a sink so it will no longer receive events.
//
// It forwards sinkHandle to defaultManager.Unregister.
func Unregister(sinkHandle EventSinkHandle) {
defaultManager.Unregister(sinkHandle)
}
type (
// EnvoyConfigurationEvent re-exports events.EnvoyConfigurationEvent.
EnvoyConfigurationEvent = events.EnvoyConfigurationEvent

View file

@ -9,8 +9,6 @@ import (
"github.com/pomerium/pomerium/internal/log"
)
var defaultManager = New()
// A Manager manages the dispatching of events to event sinks.
type Manager struct {
mu sync.RWMutex
@ -27,21 +25,31 @@ func New() *Manager {
// Dispatch hands evt to every registered event sink.
//
// The read lock is held only for the duration of dispatchLocked; the warning
// for a dropped event is logged after the lock has been released.
func (mgr *Manager) Dispatch(evt Event) {
	mgr.mu.RLock()
	anyDropped := mgr.dispatchLocked(evt)
	mgr.mu.RUnlock()

	// Nothing was dropped, so there is nothing to report.
	if !anyDropped {
		return
	}

	warning := log.Warn(context.Background())
	warning.
		Interface("event", evt).
		Msg("controlplane: dropping event due to full channel")
}
func (mgr *Manager) dispatchLocked(evt Event) bool {
sinks := make([]chan Event, 0, len(mgr.sinks))
for _, sink := range mgr.sinks {
sinks = append(sinks, sink)
}
mgr.mu.RUnlock()
dropped := false
for _, sink := range sinks {
select {
case sink <- evt:
default:
log.Warn(context.Background()).
Interface("event", evt).
Msg("controlplane: dropping event due to full channel")
dropped = true
}
}
return dropped
}
// Register registers an event sink to receive events.

View file

@ -5,6 +5,7 @@ import (
"time"
"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)
@ -24,6 +25,7 @@ type config struct {
sessionRefreshGracePeriod time.Duration
sessionRefreshCoolOffDuration time.Duration
now func() time.Time
eventMgr *events.Manager
}
func newConfig(options ...Option) *config {
@ -98,6 +100,13 @@ func WithNow(now func() time.Time) Option {
}
}
// WithEventManager returns an Option that stores mgr on the config as the
// events manager used to record events.
func WithEventManager(mgr *events.Manager) Option {
	apply := func(cfg *config) {
		cfg.eventMgr = mgr
	}
	return apply
}
type atomicConfig struct {
value atomic.Value
}

View file

@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/identity/identity"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/scheduler"
@ -23,6 +24,7 @@ import (
"github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/grpcutil"
metrics_ids "github.com/pomerium/pomerium/pkg/metrics"
"github.com/pomerium/pomerium/pkg/protoutil"
)
@ -208,6 +210,7 @@ func (mgr *Manager) refreshDirectoryUserGroups(ctx context.Context) (nextRefresh
directoryGroups, directoryUsers, err := mgr.cfg.Load().directory.UserGroups(ctx)
metrics.RecordIdentityManagerUserGroupRefresh(ctx, err)
mgr.recordLastError(metrics_ids.IdentityManagerLastUserGroupRefreshError, err)
if err != nil {
msg := "failed to refresh directory users and groups"
if ctx.Err() != nil {
@ -356,6 +359,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string
newToken, err := mgr.cfg.Load().authenticator.Refresh(ctx, FromOAuthToken(s.OauthToken), &s)
metrics.RecordIdentityManagerSessionRefresh(ctx, err)
mgr.recordLastError(metrics_ids.IdentityManagerLastSessionRefreshError, err)
if isTemporaryError(err) {
log.Error(ctx).Err(err).
Str("user_id", s.GetUserId()).
@ -374,6 +378,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string
err = mgr.cfg.Load().authenticator.UpdateUserInfo(ctx, FromOAuthToken(s.OauthToken), &s)
metrics.RecordIdentityManagerUserRefresh(ctx, err)
mgr.recordLastError(metrics_ids.IdentityManagerLastUserRefreshError, err)
if isTemporaryError(err) {
log.Error(ctx).Err(err).
Str("user_id", s.GetUserId()).
@ -426,6 +431,7 @@ func (mgr *Manager) refreshUser(ctx context.Context, userID string) {
err := mgr.cfg.Load().authenticator.UpdateUserInfo(ctx, FromOAuthToken(s.OauthToken), &u)
metrics.RecordIdentityManagerUserRefresh(ctx, err)
mgr.recordLastError(metrics_ids.IdentityManagerLastUserRefreshError, err)
if isTemporaryError(err) {
log.Error(ctx).Err(err).
Str("user_id", s.GetUserId()).
@ -552,6 +558,21 @@ func (mgr *Manager) reset() {
mgr.users = userCollection{BTree: btree.New(8)}
}
// recordLastError dispatches a LastError event carrying id, the message of
// err and the current time to the configured events manager.
//
// It does nothing when err is nil or when no events manager has been
// configured.
func (mgr *Manager) recordLastError(id string, err error) {
	if err == nil {
		return
	}

	if sink := mgr.cfg.Load().eventMgr; sink != nil {
		lastErr := &events.LastError{
			Time:    timestamppb.Now(),
			Message: err.Error(),
			Id:      id,
		}
		sink.Dispatch(lastErr)
	}
}
func isTemporaryError(err error) bool {
if err == nil {
return false

View file

@ -2,19 +2,25 @@ package manager
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"golang.org/x/oauth2"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/identity/identity"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/databroker/mock_databroker"
"github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/grpc/user"
metrics_ids "github.com/pomerium/pomerium/pkg/metrics"
"github.com/pomerium/pomerium/pkg/protoutil"
)
@ -31,6 +37,20 @@ func (mock mockProvider) UserGroups(ctx context.Context) ([]*directory.Group, []
return mock.userGroups(ctx)
}
// mockAuthenticator is a test authenticator whose every method fails with a
// fixed error, so tests can observe how those failures are reported.
type mockAuthenticator struct{}

// Refresh always fails with the error "update session".
func (mockAuthenticator) Refresh(context.Context, *oauth2.Token, identity.State) (*oauth2.Token, error) {
	err := errors.New("update session")
	return nil, err
}

// Revoke always fails with the error "not implemented".
func (mockAuthenticator) Revoke(context.Context, *oauth2.Token) error {
	err := errors.New("not implemented")
	return err
}

// UpdateUserInfo always fails with the error "update user info".
func (mockAuthenticator) UpdateUserInfo(context.Context, *oauth2.Token, any) error {
	err := errors.New("update user info")
	return err
}
func TestManager_onUpdateRecords(t *testing.T) {
ctrl := gomock.NewController(t)
@ -109,6 +129,71 @@ func TestManager_refreshDirectoryUserGroups(t *testing.T) {
})
}
// TestManager_reportErrors checks that failures of the directory user/group
// refresh, the user refresh and the session refresh are each dispatched as a
// LastError event on the events manager supplied through WithEventManager.
func TestManager_reportErrors(t *testing.T) {
	ctrl := gomock.NewController(t)

	ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10)
	defer clearTimeout()

	evtMgr := events.New()
	received := make(chan events.Event, 1)
	handle := evtMgr.Register(func(evt events.Event) {
		received <- evt
	})
	defer evtMgr.Unregister(handle)

	// expectMsg polls received until a LastError event with the given id and
	// message arrives, failing the test if none shows up within one second.
	expectMsg := func(id, msg string) {
		t.Helper()

		assert.Eventually(t, func() bool {
			select {
			case evt := <-received:
				// Comma-ok assertion: an event of any other type is treated
				// as "no match yet" rather than panicking in the poller.
				lastErr, ok := evt.(*events.LastError)
				if !ok {
					return false
				}
				return msg == lastErr.Message && id == lastErr.Id
			default:
				return false
			}
		}, time.Second, time.Millisecond*20, msg)
	}

	client := mock_databroker.NewMockDataBrokerServiceClient(ctrl)
	client.EXPECT().Put(gomock.Any(), gomock.Any()).AnyTimes()

	// Every provider and authenticator call below is wired to fail so that
	// each refresh path has an error to report.
	mgr := New(
		WithEventManager(evtMgr),
		WithDataBrokerClient(client),
		WithAuthenticator(mockAuthenticator{}),
		WithDirectoryProvider(mockProvider{
			user: func(ctx context.Context, userID, accessToken string) (*directory.User, error) {
				return nil, fmt.Errorf("user")
			},
			userGroups: func(ctx context.Context) ([]*directory.Group, []*directory.User, error) {
				return nil, nil, fmt.Errorf("user groups")
			},
		}),
		WithGroupRefreshInterval(time.Second),
	)
	mgr.directoryBackoff.RandomizationFactor = 0 // disable randomization for deterministic testing

	// Seed the manager with one group, one directory user, one session and
	// one user so the refresh calls have records to operate on.
	mgr.onUpdateRecords(ctx, updateRecordsMessage{
		records: []*databroker.Record{
			mkRecord(&directory.Group{Id: "group1", Name: "group 1", Email: "group1@example.com"}),
			mkRecord(&directory.User{Id: "user1", DisplayName: "user 1", Email: "user1@example.com", GroupIds: []string{"group1s"}}),
			mkRecord(&session.Session{Id: "session1", UserId: "user1", OauthToken: &session.OAuthToken{
				ExpiresAt: timestamppb.New(time.Now().Add(time.Hour)),
			}, ExpiresAt: timestamppb.New(time.Now().Add(time.Hour))}),
			mkRecord(&user.User{Id: "user1", Name: "user 1", Email: "user1@example.com"}),
		},
	})

	_ = mgr.refreshDirectoryUserGroups(ctx)
	expectMsg(metrics_ids.IdentityManagerLastUserGroupRefreshError, "user groups")

	mgr.refreshUser(ctx, "user1")
	expectMsg(metrics_ids.IdentityManagerLastUserRefreshError, "update user info")

	mgr.refreshSession(ctx, "user1", "session1")
	expectMsg(metrics_ids.IdentityManagerLastSessionRefreshError, "update session")
}
func mkRecord(msg recordable) *databroker.Record {
any := protoutil.NewAny(msg)
return &databroker.Record{

View file

@ -9,9 +9,7 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/metrics"
)
@ -277,7 +275,6 @@ func RecordIdentityManagerUserRefresh(ctx context.Context, err error) {
if err != nil {
counter = identityManagerLastUserRefreshError
ts = identityManagerLastUserRefreshErrorTimestamp
storeLastErrorEvent(counter.Name(), err)
}
stats.Record(ctx,
ts.M(time.Now().Unix()),
@ -292,7 +289,6 @@ func RecordIdentityManagerUserGroupRefresh(ctx context.Context, err error) {
if err != nil {
counter = identityManagerLastUserGroupRefreshError
ts = identityManagerLastUserGroupRefreshErrorTimestamp
storeLastErrorEvent(counter.Name(), err)
}
stats.Record(ctx,
ts.M(time.Now().Unix()),
@ -307,7 +303,6 @@ func RecordIdentityManagerSessionRefresh(ctx context.Context, err error) {
if err != nil {
counter = identityManagerLastSessionRefreshError
ts = identityManagerLastSessionRefreshErrorTimestamp
storeLastErrorEvent(counter.Name(), err)
}
stats.Record(ctx,
ts.M(time.Now().Unix()),
@ -315,14 +310,6 @@ func RecordIdentityManagerSessionRefresh(ctx context.Context, err error) {
)
}
// storeLastErrorEvent dispatches a LastError event through events.Dispatch,
// carrying id, the message of err and the current time.
func storeLastErrorEvent(id string, err error) {
	lastErr := &events.LastError{
		Time:    timestamppb.Now(),
		Message: err.Error(),
		Id:      id,
	}
	events.Dispatch(lastErr)
}
// SetDBConfigInfo records status, databroker version and error count while parsing
// the configuration from a databroker
func SetDBConfigInfo(ctx context.Context, service, configID string, version uint64, errCount int64) {