directory: save IDP errors to databroker, put event handling in dedicated package (#2957)

This commit is contained in:
Caleb Doxsey 2022-01-28 15:15:32 -07:00 committed by GitHub
parent 2f328e7de0
commit 64ee7eca5c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 451 additions and 83 deletions

View file

@ -14,40 +14,14 @@ import (
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/grpc"
databrokerpb "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/events"
"github.com/pomerium/pomerium/pkg/protoutil"
)
const maxEnvoyConfigurationEvents = 50
const maxEvents = 50
var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn)
func (srv *Server) handleEnvoyConfigurationEvent(evt *events.EnvoyConfigurationEvent) {
select {
case srv.envoyConfigurationEvents <- evt:
default:
log.Warn(context.Background()).
Interface("event", evt).
Msg("controlplane: dropping envoy configuration event due to full channel")
}
}
func (srv *Server) runEnvoyConfigurationEventHandler(ctx context.Context) error {
for {
var evt *events.EnvoyConfigurationEvent
select {
case <-ctx.Done():
return ctx.Err()
case evt = <-srv.envoyConfigurationEvents:
}
withGRPCBackoff(ctx, func() error {
return srv.storeEnvoyConfigurationEvent(ctx, evt)
})
}
}
func (srv *Server) storeEnvoyConfigurationEvent(ctx context.Context, evt *events.EnvoyConfigurationEvent) error {
func (srv *Server) storeEvent(ctx context.Context, evt proto.Message) error {
any := protoutil.NewAny(evt)
client, err := srv.getDataBrokerClient(ctx)
@ -55,17 +29,17 @@ func (srv *Server) storeEnvoyConfigurationEvent(ctx context.Context, evt *events
return err
}
if !srv.haveSetEnvoyConfigurationEventOptions {
if !srv.haveSetCapacity[any.GetTypeUrl()] {
_, err = client.SetOptions(ctx, &databrokerpb.SetOptionsRequest{
Type: any.GetTypeUrl(),
Options: &databrokerpb.Options{
Capacity: proto.Uint64(maxEnvoyConfigurationEvents),
Capacity: proto.Uint64(maxEvents),
},
})
if err != nil {
return err
}
srv.haveSetEnvoyConfigurationEventOptions = true
srv.haveSetCapacity[any.GetTypeUrl()] = true
}
_, err = client.Put(ctx, &databrokerpb.PutRequest{

View file

@ -32,33 +32,6 @@ func (mock *mockDataBrokerServer) SetOptions(ctx context.Context, req *databroke
}
func TestEvents(t *testing.T) {
t.Run("passes events", func(t *testing.T) {
srv := &Server{envoyConfigurationEvents: make(chan *events.EnvoyConfigurationEvent, 1)}
srv.handleEnvoyConfigurationEvent(new(events.EnvoyConfigurationEvent))
evt := <-srv.envoyConfigurationEvents
assert.NotNil(t, evt)
})
t.Run("receives events", func(t *testing.T) {
ctx := context.Background()
srv := &Server{
envoyConfigurationEvents: make(chan *events.EnvoyConfigurationEvent, 1),
}
srv.currentConfig.Store(versionedConfig{
Config: &config.Config{
Options: &config.Options{},
},
})
ctx, cancel := context.WithCancel(ctx)
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return srv.runEnvoyConfigurationEventHandler(ctx)
})
srv.envoyConfigurationEvents <- new(events.EnvoyConfigurationEvent)
cancel()
assert.Equal(t, context.Canceled, eg.Wait())
})
t.Run("saves events", func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
@ -98,7 +71,9 @@ func TestEvents(t *testing.T) {
eg.Go(func() error {
defer cancel()
srv := &Server{}
srv := &Server{
haveSetCapacity: make(map[string]bool),
}
srv.currentConfig.Store(versionedConfig{
Config: &config.Config{
OutboundPort: outboundPort,
@ -109,13 +84,13 @@ func TestEvents(t *testing.T) {
},
},
})
err := srv.storeEnvoyConfigurationEvent(ctx, new(events.EnvoyConfigurationEvent))
err := srv.storeEvent(ctx, new(events.EnvoyConfigurationEvent))
assert.NoError(t, err)
return err
})
_ = eg.Wait()
assert.Equal(t, uint64(maxEnvoyConfigurationEvents), setOptionsRequest.GetOptions().GetCapacity())
assert.Equal(t, uint64(maxEvents), setOptionsRequest.GetOptions().GetCapacity())
assert.Equal(t, "type.googleapis.com/pomerium.events.EnvoyConfigurationEvent", putRequest.GetRecord().GetType())
})
}

View file

@ -21,13 +21,13 @@ import (
"github.com/pomerium/pomerium/config/envoyconfig/filemgr"
"github.com/pomerium/pomerium/internal/controlplane/xdsmgr"
"github.com/pomerium/pomerium/internal/envoy/files"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/httputil/reproxy"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/requestid"
"github.com/pomerium/pomerium/internal/version"
pom_grpc "github.com/pomerium/pomerium/pkg/grpc"
"github.com/pomerium/pomerium/pkg/grpc/events"
"github.com/pomerium/pomerium/pkg/grpcutil"
)
@ -63,16 +63,15 @@ type Server struct {
metricsMgr *config.MetricsManager
reproxy *reproxy.Handler
haveSetEnvoyConfigurationEventOptions bool
envoyConfigurationEvents chan *events.EnvoyConfigurationEvent
haveSetCapacity map[string]bool
}
// NewServer creates a new Server. Listener ports are chosen by the OS.
func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server, error) {
srv := &Server{
metricsMgr: metricsMgr,
reproxy: reproxy.New(),
envoyConfigurationEvents: make(chan *events.EnvoyConfigurationEvent, 10),
metricsMgr: metricsMgr,
reproxy: reproxy.New(),
haveSetCapacity: map[string]bool{},
}
srv.currentConfig.Store(versionedConfig{
Config: cfg,
@ -129,7 +128,7 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server,
return nil, err
}
srv.xdsmgr = xdsmgr.NewManager(res, srv.handleEnvoyConfigurationEvent)
srv.xdsmgr = xdsmgr.NewManager(res)
envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv.GRPCServer, srv.xdsmgr)
return srv, nil
@ -139,10 +138,12 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server,
func (srv *Server) Run(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)
// handle envoy configuration events
eg.Go(func() error {
return srv.runEnvoyConfigurationEventHandler(ctx)
handle := events.Register(func(evt events.Event) {
withGRPCBackoff(ctx, func() error {
return srv.storeEvent(ctx, evt)
})
})
defer events.Unregister(handle)
// start the gRPC server
eg.Go(func() error {

View file

@ -17,9 +17,9 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/contextkeys"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/signal"
"github.com/pomerium/pomerium/pkg/grpc/events"
)
const (
@ -36,8 +36,7 @@ var onHandleDeltaRequest = func(state *streamState) {}
// A Manager manages xDS resources.
type Manager struct {
signal *signal.Signal
eventHandler func(*events.EnvoyConfigurationEvent)
signal *signal.Signal
mu sync.Mutex
nonce string
@ -49,12 +48,11 @@ type Manager struct {
}
// NewManager creates a new Manager.
func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource, eventHandler func(*events.EnvoyConfigurationEvent)) *Manager {
func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Manager {
nonceToConfig, _ := lru.New(maxNonceCacheSize) // the only error they return is when size is negative, which never happens
return &Manager{
signal: signal.New(),
eventHandler: eventHandler,
signal: signal.New(),
nonceToConfig: nonceToConfig,
nonce: uuid.NewString(),
@ -270,7 +268,7 @@ func (mgr *Manager) nonceToConfigVersion(nonce string) (ver uint64) {
}
func (mgr *Manager) nackEvent(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) {
mgr.eventHandler(&events.EnvoyConfigurationEvent{
events.Dispatch(&events.EnvoyConfigurationEvent{
Instance: mgr.hostname,
Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_NACK,
Time: timestamppb.Now(),
@ -296,7 +294,7 @@ func (mgr *Manager) nackEvent(ctx context.Context, req *envoy_service_discovery_
}
func (mgr *Manager) ackEvent(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) {
mgr.eventHandler(&events.EnvoyConfigurationEvent{
events.Dispatch(&events.EnvoyConfigurationEvent{
Instance: mgr.hostname,
Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_ACK,
Time: timestamppb.Now(),
@ -317,7 +315,7 @@ func (mgr *Manager) ackEvent(ctx context.Context, req *envoy_service_discovery_v
}
func (mgr *Manager) changeEvent(ctx context.Context, res *envoy_service_discovery_v3.DeltaDiscoveryResponse) {
mgr.eventHandler(&events.EnvoyConfigurationEvent{
events.Dispatch(&events.EnvoyConfigurationEvent{
Instance: mgr.hostname,
Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_RESPONSE,
Time: timestamppb.Now(),

View file

@ -14,7 +14,6 @@ import (
"google.golang.org/grpc/test/bufconn"
"github.com/pomerium/pomerium/internal/signal"
"github.com/pomerium/pomerium/pkg/grpc/events"
)
const bufSize = 1024 * 1024
@ -37,7 +36,7 @@ func TestManager(t *testing.T) {
typeURL: {
{Name: "r1", Version: "1"},
},
}, func(evt *events.EnvoyConfigurationEvent) {})
})
envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, mgr)
li := bufconn.Listen(bufSize)

51
internal/events/events.go Normal file
View file

@ -0,0 +1,51 @@
// Package events contains a manager for dispatching and receiving arbitrary events.
package events
import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/pkg/grpc/events"
)
// An Event is any protobuf message that has a time and message.
type Event interface {
proto.Message
GetTime() *timestamppb.Timestamp
GetMessage() string
}
// An EventSink receives events.
type EventSink func(Event)
// An EventSinkHandle is a reference to a registered EventSink so that it can be unregistered.
type EventSinkHandle string
// Dispatch dispatches an event to any event sinks.
func Dispatch(evt Event) {
defaultManager.Dispatch(evt)
}
// Register registers a new sink to receive events.
func Register(sink EventSink) EventSinkHandle {
return defaultManager.Register(sink)
}
// Unregister unregisters a sink so it will no longer receive events.
func Unregister(sinkHandle EventSinkHandle) {
defaultManager.Unregister(sinkHandle)
}
type (
// EnvoyConfigurationEvent re-exports events.EnvoyConfigurationEvent.
EnvoyConfigurationEvent = events.EnvoyConfigurationEvent
// IDPErrorEvent re-exports events.IDPErrorEvent.
IDPErrorEvent = events.IDPErrorEvent
)
// re-exported protobuf constants
const (
EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_ACK = events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_ACK // nolint
EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_NACK = events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_NACK // nolint
EnvoyConfigurationEvent_EVENT_DISCOVERY_RESPONSE = events.EnvoyConfigurationEvent_EVENT_DISCOVERY_RESPONSE // nolint
)

View file

@ -0,0 +1,72 @@
package events
import (
"context"
"sync"
"github.com/google/uuid"
"github.com/pomerium/pomerium/internal/log"
)
var defaultManager = New()
// A Manager manages the dispatching of events to event sinks.
type Manager struct {
mu sync.RWMutex
sinks map[EventSinkHandle]chan Event
}
// New creates a new Manager.
func New() *Manager {
return &Manager{
sinks: make(map[EventSinkHandle]chan Event),
}
}
// Dispatch dispatches an event to any registered event sinks.
func (mgr *Manager) Dispatch(evt Event) {
mgr.mu.RLock()
sinks := make([]chan Event, 0, len(mgr.sinks))
for _, sink := range mgr.sinks {
sinks = append(sinks, sink)
}
mgr.mu.RUnlock()
for _, sink := range sinks {
select {
case sink <- evt:
default:
log.Warn(context.Background()).
Interface("event", evt).
Msg("controlplane: dropping event due to full channel")
}
}
}
// Register registers an event sink to receive events.
func (mgr *Manager) Register(sink EventSink) (handle EventSinkHandle) {
handle = EventSinkHandle(uuid.NewString())
ch := make(chan Event, 10)
mgr.mu.Lock()
mgr.sinks[handle] = ch
mgr.mu.Unlock()
go func() {
for evt := range ch {
sink(evt)
}
}()
return handle
}
// Unregister unregisters an event sink so it no longer receives events.
func (mgr *Manager) Unregister(sinkHandle EventSinkHandle) {
mgr.mu.Lock()
sink, ok := mgr.sinks[sinkHandle]
delete(mgr.sinks, sinkHandle)
mgr.mu.Unlock()
if ok {
close(sink)
}
}

View file

@ -0,0 +1,32 @@
package events
import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/testing/protocmp"
)
func TestManager(t *testing.T) {
mgr := New()
received := make(chan Event, 1)
handle := mgr.Register(func(evt Event) {
received <- evt
})
assert.NotEmpty(t, handle)
expect := &IDPErrorEvent{Message: "TEST"}
mgr.Dispatch(expect)
assert.Eventually(t, func() bool {
select {
case evt := <-received:
return cmp.Equal(evt, expect, protocmp.Transform())
default:
return false
}
}, time.Second, time.Millisecond*20)
}

View file

@ -17,6 +17,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/internal/events"
"github.com/pomerium/pomerium/internal/identity/identity"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/scheduler"
@ -216,6 +217,7 @@ func (mgr *Manager) refreshDirectoryUserGroups(ctx context.Context) (nextRefresh
defer clearTimeout()
directoryGroups, directoryUsers, err := mgr.cfg.Load().directory.UserGroups(ctx)
mgr.maybeDispatchErrorEvent(err)
metrics.RecordIdentityManagerUserGroupRefresh(ctx, err)
if err != nil {
msg := "failed to refresh directory users and groups"
@ -405,6 +407,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string
}
newToken, err := mgr.cfg.Load().authenticator.Refresh(ctx, FromOAuthToken(s.OauthToken), &s)
mgr.maybeDispatchErrorEvent(err)
metrics.RecordIdentityManagerSessionRefresh(ctx, err)
if isTemporaryError(err) {
log.Error(ctx).Err(err).
@ -423,6 +426,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string
s.OauthToken = ToOAuthToken(newToken)
err = mgr.cfg.Load().authenticator.UpdateUserInfo(ctx, FromOAuthToken(s.OauthToken), &s)
mgr.maybeDispatchErrorEvent(err)
if isTemporaryError(err) {
log.Error(ctx).Err(err).
Str("user_id", s.GetUserId()).
@ -474,6 +478,7 @@ func (mgr *Manager) refreshUser(ctx context.Context, userID string) {
}
err := mgr.cfg.Load().authenticator.UpdateUserInfo(ctx, FromOAuthToken(s.OauthToken), &u)
mgr.maybeDispatchErrorEvent(err)
metrics.RecordIdentityManagerUserRefresh(ctx, err)
if isTemporaryError(err) {
log.Error(ctx).Err(err).
@ -601,6 +606,18 @@ func (mgr *Manager) reset() {
mgr.users = userCollection{BTree: btree.New(8)}
}
// maybeDispatchErrorEvent dispatches an error event if err is not nil
func (mgr *Manager) maybeDispatchErrorEvent(err error) {
if err == nil {
return
}
events.Dispatch(&events.IDPErrorEvent{
Time: timestamppb.Now(),
Message: err.Error(),
})
}
func isTemporaryError(err error) bool {
if err == nil {
return false