From 64ee7eca5c9a2a0aca3eb84613a9b79af03b5850 Mon Sep 17 00:00:00 2001 From: Caleb Doxsey Date: Fri, 28 Jan 2022 15:15:32 -0700 Subject: [PATCH] directory: save IDP errors to databroker, put event handling in dedicated package (#2957) --- internal/controlplane/events.go | 36 +--- internal/controlplane/events_test.go | 35 +-- internal/controlplane/server.go | 21 +- internal/controlplane/xdsmgr/xdsmgr.go | 16 +- internal/controlplane/xdsmgr/xdsmgr_test.go | 3 +- internal/events/events.go | 51 +++++ internal/events/manager.go | 72 +++++++ internal/events/manager_test.go | 32 +++ internal/identity/manager/manager.go | 17 ++ pkg/grpc/events/idp.pb.go | 228 ++++++++++++++++++++ pkg/grpc/events/idp.proto | 15 ++ pkg/grpc/protoc.bash | 2 +- pkg/grpc/registry/registry.pb.validate.go | 6 + 13 files changed, 451 insertions(+), 83 deletions(-) create mode 100644 internal/events/events.go create mode 100644 internal/events/manager.go create mode 100644 internal/events/manager_test.go create mode 100644 pkg/grpc/events/idp.pb.go create mode 100644 pkg/grpc/events/idp.proto diff --git a/internal/controlplane/events.go b/internal/controlplane/events.go index 387230a39..208ab9bc5 100644 --- a/internal/controlplane/events.go +++ b/internal/controlplane/events.go @@ -14,40 +14,14 @@ import ( "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/pkg/grpc" databrokerpb "github.com/pomerium/pomerium/pkg/grpc/databroker" - "github.com/pomerium/pomerium/pkg/grpc/events" "github.com/pomerium/pomerium/pkg/protoutil" ) -const maxEnvoyConfigurationEvents = 50 +const maxEvents = 50 var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn) -func (srv *Server) handleEnvoyConfigurationEvent(evt *events.EnvoyConfigurationEvent) { - select { - case srv.envoyConfigurationEvents <- evt: - default: - log.Warn(context.Background()). - Interface("event", evt). - Msg("controlplane: dropping envoy configuration event due to full channel") - } -} - -func (srv *Server) runEnvoyConfigurationEventHandler(ctx context.Context) error { - for { - var evt *events.EnvoyConfigurationEvent - select { - case <-ctx.Done(): - return ctx.Err() - case evt = <-srv.envoyConfigurationEvents: - } - - withGRPCBackoff(ctx, func() error { - return srv.storeEnvoyConfigurationEvent(ctx, evt) - }) - } -} - -func (srv *Server) storeEnvoyConfigurationEvent(ctx context.Context, evt *events.EnvoyConfigurationEvent) error { +func (srv *Server) storeEvent(ctx context.Context, evt proto.Message) error { any := protoutil.NewAny(evt) client, err := srv.getDataBrokerClient(ctx) @@ -55,17 +29,17 @@ func (srv *Server) storeEnvoyConfigurationEvent(ctx context.Context, evt *events return err } - if !srv.haveSetEnvoyConfigurationEventOptions { + if !srv.haveSetCapacity[any.GetTypeUrl()] { _, err = client.SetOptions(ctx, &databrokerpb.SetOptionsRequest{ Type: any.GetTypeUrl(), Options: &databrokerpb.Options{ - Capacity: proto.Uint64(maxEnvoyConfigurationEvents), + Capacity: proto.Uint64(maxEvents), }, }) if err != nil { return err } - srv.haveSetEnvoyConfigurationEventOptions = true + srv.haveSetCapacity[any.GetTypeUrl()] = true } _, err = client.Put(ctx, &databrokerpb.PutRequest{ diff --git a/internal/controlplane/events_test.go b/internal/controlplane/events_test.go index 1ad2d11f7..8425df512 100644 --- a/internal/controlplane/events_test.go +++ b/internal/controlplane/events_test.go @@ -32,33 +32,6 @@ func (mock *mockDataBrokerServer) SetOptions(ctx context.Context, req *databroke } func TestEvents(t *testing.T) { - t.Run("passes events", func(t *testing.T) { - srv := &Server{envoyConfigurationEvents: make(chan *events.EnvoyConfigurationEvent, 1)} - srv.handleEnvoyConfigurationEvent(new(events.EnvoyConfigurationEvent)) - evt := <-srv.envoyConfigurationEvents - assert.NotNil(t, evt) - }) - t.Run("receives events", func(t *testing.T) { - ctx := context.Background() - - srv := &Server{ - envoyConfigurationEvents: make(chan *events.EnvoyConfigurationEvent, 1), - } - srv.currentConfig.Store(versionedConfig{ - Config: &config.Config{ - Options: &config.Options{}, - }, - }) - - ctx, cancel := context.WithCancel(ctx) - eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { - return srv.runEnvoyConfigurationEventHandler(ctx) - }) - srv.envoyConfigurationEvents <- new(events.EnvoyConfigurationEvent) - cancel() - assert.Equal(t, context.Canceled, eg.Wait()) - }) t.Run("saves events", func(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) @@ -98,7 +71,9 @@ func TestEvents(t *testing.T) { eg.Go(func() error { defer cancel() - srv := &Server{} + srv := &Server{ + haveSetCapacity: make(map[string]bool), + } srv.currentConfig.Store(versionedConfig{ Config: &config.Config{ OutboundPort: outboundPort, @@ -109,13 +84,13 @@ func TestEvents(t *testing.T) { }, }, }) - err := srv.storeEnvoyConfigurationEvent(ctx, new(events.EnvoyConfigurationEvent)) + err := srv.storeEvent(ctx, new(events.EnvoyConfigurationEvent)) assert.NoError(t, err) return err }) _ = eg.Wait() - assert.Equal(t, uint64(maxEnvoyConfigurationEvents), setOptionsRequest.GetOptions().GetCapacity()) + assert.Equal(t, uint64(maxEvents), setOptionsRequest.GetOptions().GetCapacity()) assert.Equal(t, "type.googleapis.com/pomerium.events.EnvoyConfigurationEvent", putRequest.GetRecord().GetType()) }) } diff --git a/internal/controlplane/server.go b/internal/controlplane/server.go index 0f1d5ca22..9db363ad6 100644 --- a/internal/controlplane/server.go +++ b/internal/controlplane/server.go @@ -21,13 +21,13 @@ import ( "github.com/pomerium/pomerium/config/envoyconfig/filemgr" "github.com/pomerium/pomerium/internal/controlplane/xdsmgr" "github.com/pomerium/pomerium/internal/envoy/files" + "github.com/pomerium/pomerium/internal/events" "github.com/pomerium/pomerium/internal/httputil/reproxy" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/telemetry" "github.com/pomerium/pomerium/internal/telemetry/requestid" "github.com/pomerium/pomerium/internal/version" pom_grpc "github.com/pomerium/pomerium/pkg/grpc" - "github.com/pomerium/pomerium/pkg/grpc/events" "github.com/pomerium/pomerium/pkg/grpcutil" ) @@ -63,16 +63,15 @@ type Server struct { metricsMgr *config.MetricsManager reproxy *reproxy.Handler - haveSetEnvoyConfigurationEventOptions bool - envoyConfigurationEvents chan *events.EnvoyConfigurationEvent + haveSetCapacity map[string]bool } // NewServer creates a new Server. Listener ports are chosen by the OS. func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server, error) { srv := &Server{ - metricsMgr: metricsMgr, - reproxy: reproxy.New(), - envoyConfigurationEvents: make(chan *events.EnvoyConfigurationEvent, 10), + metricsMgr: metricsMgr, + reproxy: reproxy.New(), + haveSetCapacity: map[string]bool{}, } srv.currentConfig.Store(versionedConfig{ Config: cfg, @@ -129,7 +128,7 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server, return nil, err } - srv.xdsmgr = xdsmgr.NewManager(res, srv.handleEnvoyConfigurationEvent) + srv.xdsmgr = xdsmgr.NewManager(res) envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv.GRPCServer, srv.xdsmgr) return srv, nil @@ -139,10 +138,12 @@ func NewServer(cfg *config.Config, metricsMgr *config.MetricsManager) (*Server, func (srv *Server) Run(ctx context.Context) error { eg, ctx := errgroup.WithContext(ctx) - // handle envoy configuration events - eg.Go(func() error { - return srv.runEnvoyConfigurationEventHandler(ctx) + handle := events.Register(func(evt events.Event) { + withGRPCBackoff(ctx, func() error { + return srv.storeEvent(ctx, evt) + }) }) + defer events.Unregister(handle) // start the gRPC server eg.Go(func() error { diff --git a/internal/controlplane/xdsmgr/xdsmgr.go b/internal/controlplane/xdsmgr/xdsmgr.go index 21c5f09cd..81eb640fd 100644 --- a/internal/controlplane/xdsmgr/xdsmgr.go +++ b/internal/controlplane/xdsmgr/xdsmgr.go @@ -17,9 +17,9 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/pomerium/pomerium/internal/contextkeys" + "github.com/pomerium/pomerium/internal/events" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/signal" - "github.com/pomerium/pomerium/pkg/grpc/events" ) const ( @@ -36,8 +36,7 @@ var onHandleDeltaRequest = func(state *streamState) {} // A Manager manages xDS resources. type Manager struct { - signal *signal.Signal - eventHandler func(*events.EnvoyConfigurationEvent) + signal *signal.Signal mu sync.Mutex nonce string @@ -49,12 +48,11 @@ type Manager struct { } // NewManager creates a new Manager. -func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource, eventHandler func(*events.EnvoyConfigurationEvent)) *Manager { +func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Manager { nonceToConfig, _ := lru.New(maxNonceCacheSize) // the only error they return is when size is negative, which never happens return &Manager{ - signal: signal.New(), - eventHandler: eventHandler, + signal: signal.New(), nonceToConfig: nonceToConfig, nonce: uuid.NewString(), @@ -270,7 +268,7 @@ func (mgr *Manager) nonceToConfigVersion(nonce string) (ver uint64) { } func (mgr *Manager) nackEvent(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) { - mgr.eventHandler(&events.EnvoyConfigurationEvent{ + events.Dispatch(&events.EnvoyConfigurationEvent{ Instance: mgr.hostname, Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_NACK, Time: timestamppb.Now(), @@ -296,7 +294,7 @@ func (mgr *Manager) nackEvent(ctx context.Context, req *envoy_service_discovery_ } func (mgr *Manager) ackEvent(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) { - mgr.eventHandler(&events.EnvoyConfigurationEvent{ + events.Dispatch(&events.EnvoyConfigurationEvent{ Instance: mgr.hostname, Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_ACK, Time: timestamppb.Now(), @@ -317,7 +315,7 @@ func (mgr *Manager) ackEvent(ctx context.Context, req *envoy_service_discovery_v } func (mgr *Manager) changeEvent(ctx context.Context, res *envoy_service_discovery_v3.DeltaDiscoveryResponse) { - mgr.eventHandler(&events.EnvoyConfigurationEvent{ + events.Dispatch(&events.EnvoyConfigurationEvent{ Instance: mgr.hostname, Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_RESPONSE, Time: timestamppb.Now(), diff --git a/internal/controlplane/xdsmgr/xdsmgr_test.go b/internal/controlplane/xdsmgr/xdsmgr_test.go index b674c620e..cd0fa8fe8 100644 --- a/internal/controlplane/xdsmgr/xdsmgr_test.go +++ b/internal/controlplane/xdsmgr/xdsmgr_test.go @@ -14,7 +14,6 @@ import ( "google.golang.org/grpc/test/bufconn" "github.com/pomerium/pomerium/internal/signal" - "github.com/pomerium/pomerium/pkg/grpc/events" ) const bufSize = 1024 * 1024 @@ -37,7 +36,7 @@ func TestManager(t *testing.T) { typeURL: { {Name: "r1", Version: "1"}, }, - }, func(evt *events.EnvoyConfigurationEvent) {}) + }) envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, mgr) li := bufconn.Listen(bufSize) diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 000000000..5f676226d --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,51 @@ +// Package events contains a manager for dispatching and receiving arbitrary events. +package events + +import ( + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/pomerium/pomerium/pkg/grpc/events" +) + +// An Event is any protobuf message that has a time and message. +type Event interface { + proto.Message + GetTime() *timestamppb.Timestamp + GetMessage() string +} + +// An EventSink receives events. +type EventSink func(Event) + +// An EventSinkHandle is a reference to a registered EventSink so that it can be unregistered. +type EventSinkHandle string + +// Dispatch dispatches an event to any event sinks. +func Dispatch(evt Event) { + defaultManager.Dispatch(evt) +} + +// Register registers a new sink to receive events. +func Register(sink EventSink) EventSinkHandle { + return defaultManager.Register(sink) +} + +// Unregister unregisters a sink so it will no longer receive events. +func Unregister(sinkHandle EventSinkHandle) { + defaultManager.Unregister(sinkHandle) +} + +type ( + // EnvoyConfigurationEvent re-exports events.EnvoyConfigurationEvent. + EnvoyConfigurationEvent = events.EnvoyConfigurationEvent + // IDPErrorEvent re-exports events.IDPErrorEvent. + IDPErrorEvent = events.IDPErrorEvent +) + +// re-exported protobuf constants +const ( + EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_ACK = events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_ACK // nolint + EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_NACK = events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_NACK // nolint + EnvoyConfigurationEvent_EVENT_DISCOVERY_RESPONSE = events.EnvoyConfigurationEvent_EVENT_DISCOVERY_RESPONSE // nolint +) diff --git a/internal/events/manager.go b/internal/events/manager.go new file mode 100644 index 000000000..f289fdb21 --- /dev/null +++ b/internal/events/manager.go @@ -0,0 +1,72 @@ +package events + +import ( + "context" + "sync" + + "github.com/google/uuid" + + "github.com/pomerium/pomerium/internal/log" +) + +var defaultManager = New() + +// A Manager manages the dispatching of events to event sinks. +type Manager struct { + mu sync.RWMutex + sinks map[EventSinkHandle]chan Event +} + +// New creates a new Manager. +func New() *Manager { + return &Manager{ + sinks: make(map[EventSinkHandle]chan Event), + } +} + +// Dispatch dispatches an event to any registered event sinks. +func (mgr *Manager) Dispatch(evt Event) { + mgr.mu.RLock() + sinks := make([]chan Event, 0, len(mgr.sinks)) + for _, sink := range mgr.sinks { + sinks = append(sinks, sink) + } + mgr.mu.RUnlock() + + for _, sink := range sinks { + select { + case sink <- evt: + default: + log.Warn(context.Background()). + Interface("event", evt). + Msg("controlplane: dropping event due to full channel") + } + } +} + +// Register registers an event sink to receive events. +func (mgr *Manager) Register(sink EventSink) (handle EventSinkHandle) { + handle = EventSinkHandle(uuid.NewString()) + ch := make(chan Event, 10) + mgr.mu.Lock() + mgr.sinks[handle] = ch + mgr.mu.Unlock() + go func() { + for evt := range ch { + sink(evt) + } + }() + return handle +} + +// Unregister unregisters an event sink so it no longer receives events. +func (mgr *Manager) Unregister(sinkHandle EventSinkHandle) { + mgr.mu.Lock() + sink, ok := mgr.sinks[sinkHandle] + delete(mgr.sinks, sinkHandle) + mgr.mu.Unlock() + + if ok { + close(sink) + } +} diff --git a/internal/events/manager_test.go b/internal/events/manager_test.go new file mode 100644 index 000000000..6a76028a0 --- /dev/null +++ b/internal/events/manager_test.go @@ -0,0 +1,32 @@ +package events + +import ( + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/testing/protocmp" +) + +func TestManager(t *testing.T) { + mgr := New() + + received := make(chan Event, 1) + handle := mgr.Register(func(evt Event) { + received <- evt + }) + assert.NotEmpty(t, handle) + + expect := &IDPErrorEvent{Message: "TEST"} + mgr.Dispatch(expect) + + assert.Eventually(t, func() bool { + select { + case evt := <-received: + return cmp.Equal(evt, expect, protocmp.Transform()) + default: + return false + } + }, time.Second, time.Millisecond*20) +} diff --git a/internal/identity/manager/manager.go b/internal/identity/manager/manager.go index dea8a91a1..57845ff31 100644 --- a/internal/identity/manager/manager.go +++ b/internal/identity/manager/manager.go @@ -17,6 +17,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/pomerium/pomerium/internal/directory" + "github.com/pomerium/pomerium/internal/events" "github.com/pomerium/pomerium/internal/identity/identity" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/scheduler" @@ -216,6 +217,7 @@ func (mgr *Manager) refreshDirectoryUserGroups(ctx context.Context) (nextRefresh defer clearTimeout() directoryGroups, directoryUsers, err := mgr.cfg.Load().directory.UserGroups(ctx) + mgr.maybeDispatchErrorEvent(err) metrics.RecordIdentityManagerUserGroupRefresh(ctx, err) if err != nil { msg := "failed to refresh directory users and groups" @@ -405,6 +407,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string } newToken, err := mgr.cfg.Load().authenticator.Refresh(ctx, FromOAuthToken(s.OauthToken), &s) + mgr.maybeDispatchErrorEvent(err) metrics.RecordIdentityManagerSessionRefresh(ctx, err) if isTemporaryError(err) { log.Error(ctx).Err(err). @@ -423,6 +426,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, userID, sessionID string s.OauthToken = ToOAuthToken(newToken) err = mgr.cfg.Load().authenticator.UpdateUserInfo(ctx, FromOAuthToken(s.OauthToken), &s) + mgr.maybeDispatchErrorEvent(err) if isTemporaryError(err) { log.Error(ctx).Err(err). Str("user_id", s.GetUserId()). @@ -474,6 +478,7 @@ func (mgr *Manager) refreshUser(ctx context.Context, userID string) { } err := mgr.cfg.Load().authenticator.UpdateUserInfo(ctx, FromOAuthToken(s.OauthToken), &u) + mgr.maybeDispatchErrorEvent(err) metrics.RecordIdentityManagerUserRefresh(ctx, err) if isTemporaryError(err) { log.Error(ctx).Err(err). @@ -601,6 +606,18 @@ func (mgr *Manager) reset() { mgr.users = userCollection{BTree: btree.New(8)} } +// maybeDispatchErrorEvent dispatches an error event if err is not nil +func (mgr *Manager) maybeDispatchErrorEvent(err error) { + if err == nil { + return + } + + events.Dispatch(&events.IDPErrorEvent{ + Time: timestamppb.Now(), + Message: err.Error(), + }) +} + func isTemporaryError(err error) bool { if err == nil { return false diff --git a/pkg/grpc/events/idp.pb.go b/pkg/grpc/events/idp.pb.go new file mode 100644 index 000000000..b78f52d51 --- /dev/null +++ b/pkg/grpc/events/idp.pb.go @@ -0,0 +1,228 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.14.0 +// source: idp.proto + +package events + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// IDPErrorEvents is a list of IDP error events. +type IDPErrorEvents struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Values []*IDPErrorEvent `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"` +} + +func (x *IDPErrorEvents) Reset() { + *x = IDPErrorEvents{} + if protoimpl.UnsafeEnabled { + mi := &file_idp_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *IDPErrorEvents) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IDPErrorEvents) ProtoMessage() {} + +func (x *IDPErrorEvents) ProtoReflect() protoreflect.Message { + mi := &file_idp_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IDPErrorEvents.ProtoReflect.Descriptor instead. +func (*IDPErrorEvents) Descriptor() ([]byte, []int) { + return file_idp_proto_rawDescGZIP(), []int{0} +} + +func (x *IDPErrorEvents) GetValues() []*IDPErrorEvent { + if x != nil { + return x.Values + } + return nil +} + +// IDPErrorEvent is an IDP error event. +type IDPErrorEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Time *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=time,proto3" json:"time,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *IDPErrorEvent) Reset() { + *x = IDPErrorEvent{} + if protoimpl.UnsafeEnabled { + mi := &file_idp_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *IDPErrorEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IDPErrorEvent) ProtoMessage() {} + +func (x *IDPErrorEvent) ProtoReflect() protoreflect.Message { + mi := &file_idp_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IDPErrorEvent.ProtoReflect.Descriptor instead. +func (*IDPErrorEvent) Descriptor() ([]byte, []int) { + return file_idp_proto_rawDescGZIP(), []int{1} +} + +func (x *IDPErrorEvent) GetTime() *timestamppb.Timestamp { + if x != nil { + return x.Time + } + return nil +} + +func (x *IDPErrorEvent) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +var File_idp_proto protoreflect.FileDescriptor + +var file_idp_proto_rawDesc = []byte{ + 0x0a, 0x09, 0x69, 0x64, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0f, 0x70, 0x6f, 0x6d, + 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x1a, 0x1f, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x48, 0x0a, + 0x0e, 0x49, 0x44, 0x50, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, + 0x36, 0x0a, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x1e, 0x2e, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2e, 0x65, 0x76, 0x65, 0x6e, 0x74, + 0x73, 0x2e, 0x49, 0x44, 0x50, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, + 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x22, 0x59, 0x0a, 0x0d, 0x49, 0x44, 0x50, 0x45, 0x72, + 0x72, 0x6f, 0x72, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2e, 0x0a, 0x04, 0x74, 0x69, 0x6d, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x52, 0x04, 0x74, 0x69, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, + 0x75, 0x6d, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_idp_proto_rawDescOnce sync.Once + file_idp_proto_rawDescData = file_idp_proto_rawDesc +) + +func file_idp_proto_rawDescGZIP() []byte { + file_idp_proto_rawDescOnce.Do(func() { + file_idp_proto_rawDescData = protoimpl.X.CompressGZIP(file_idp_proto_rawDescData) + }) + return file_idp_proto_rawDescData +} + +var file_idp_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_idp_proto_goTypes = []interface{}{ + (*IDPErrorEvents)(nil), // 0: pomerium.events.IDPErrorEvents + (*IDPErrorEvent)(nil), // 1: pomerium.events.IDPErrorEvent + (*timestamppb.Timestamp)(nil), // 2: google.protobuf.Timestamp +} +var file_idp_proto_depIdxs = []int32{ + 1, // 0: pomerium.events.IDPErrorEvents.values:type_name -> pomerium.events.IDPErrorEvent + 2, // 1: pomerium.events.IDPErrorEvent.time:type_name -> google.protobuf.Timestamp + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_idp_proto_init() } +func file_idp_proto_init() { + if File_idp_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_idp_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*IDPErrorEvents); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_idp_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*IDPErrorEvent); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_idp_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_idp_proto_goTypes, + DependencyIndexes: file_idp_proto_depIdxs, + MessageInfos: file_idp_proto_msgTypes, + }.Build() + File_idp_proto = out.File + file_idp_proto_rawDesc = nil + file_idp_proto_goTypes = nil + file_idp_proto_depIdxs = nil +} diff --git a/pkg/grpc/events/idp.proto b/pkg/grpc/events/idp.proto new file mode 100644 index 000000000..92abe073b --- /dev/null +++ b/pkg/grpc/events/idp.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package pomerium.events; +option go_package = "github.com/pomerium/pomerium/pkg/grpc/events"; + +import "google/protobuf/timestamp.proto"; + +// IDPErrorEvents is a list of IDP error events. +message IDPErrorEvents { repeated IDPErrorEvent values = 1; } + +// IDPErrorEvent is an IDP error event. +message IDPErrorEvent { + google.protobuf.Timestamp time = 1; + string message = 2; +} diff --git a/pkg/grpc/protoc.bash b/pkg/grpc/protoc.bash index e42ae0e4a..54968c15e 100755 --- a/pkg/grpc/protoc.bash +++ b/pkg/grpc/protoc.bash @@ -111,7 +111,7 @@ _import_paths=$(join_by , "${_imports[@]}") ../../scripts/protoc -I ./events/ -I ./ \ --go_out="$_import_paths,plugins=grpc,paths=source_relative:./events/." \ - ./events/xds.proto + ./events/xds.proto ./events/idp.proto ../../scripts/protoc -I ./cli/ -I ./ \ --go_out="$_import_paths,plugins=grpc,paths=source_relative:./cli/." \ diff --git a/pkg/grpc/registry/registry.pb.validate.go b/pkg/grpc/registry/registry.pb.validate.go index cf2aaa767..aa22f5515 100644 --- a/pkg/grpc/registry/registry.pb.validate.go +++ b/pkg/grpc/registry/registry.pb.validate.go @@ -91,6 +91,7 @@ func (m *Service) validate(all bool) error { if len(errors) > 0 { return ServiceMultiError(errors) } + return nil } @@ -284,6 +285,7 @@ func (m *RegisterRequest) validate(all bool) error { if len(errors) > 0 { return RegisterRequestMultiError(errors) } + return nil } @@ -412,6 +414,7 @@ func (m *RegisterResponse) validate(all bool) error { if len(errors) > 0 { return RegisterResponseMultiError(errors) } + return nil } @@ -511,6 +514,7 @@ func (m *ListRequest) validate(all bool) error { if len(errors) > 0 { return ListRequestMultiError(errors) } + return nil } @@ -667,6 +671,7 @@ func (m *ServiceRegistration) validate(all bool) error { if len(errors) > 0 { return ServiceRegistrationMultiError(errors) } + return nil } @@ -802,6 +807,7 @@ func (m *ServiceList) validate(all bool) error { if len(errors) > 0 { return ServiceListMultiError(errors) } + return nil }