package controlplane import ( "context" "fmt" "time" "github.com/cenkalti/backoff/v4" "github.com/google/uuid" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/pkg/grpc" databrokerpb "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/protoutil" ) const maxEvents = 50 var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn) func (srv *Server) storeEvent(ctx context.Context, evt proto.Message) error { any := protoutil.NewAny(evt) client, err := srv.getDataBrokerClient(ctx) if err != nil { return err } if !srv.haveSetCapacity[any.GetTypeUrl()] { _, err = client.SetOptions(ctx, &databrokerpb.SetOptionsRequest{ Type: any.GetTypeUrl(), Options: &databrokerpb.Options{ Capacity: proto.Uint64(maxEvents), }, }) if err != nil { return err } srv.haveSetCapacity[any.GetTypeUrl()] = true } var id string if withID, ok := evt.(interface{ GetId() string }); ok { id = withID.GetId() } else { id = uuid.NewString() } _, err = client.Put(ctx, &databrokerpb.PutRequest{ Records: []*databrokerpb.Record{{ Type: any.GetTypeUrl(), Id: id, Data: any, }}, }) if err != nil { return err } return nil } func (srv *Server) getDataBrokerClient(ctx context.Context) (databrokerpb.DataBrokerServiceClient, error) { cfg := srv.currentConfig.Load() sharedKey, err := cfg.Options.GetSharedKey() if err != nil { return nil, err } cc, err := outboundGRPCConnection.Get(context.Background(), &grpc.OutboundOptions{ OutboundPort: cfg.OutboundPort, InstallationID: cfg.Options.InstallationID, ServiceName: cfg.Options.Services, SignedJWTKey: sharedKey, }) if err != nil { return nil, fmt.Errorf("controlplane: error creating databroker connection: %w", err) } _ = grpc.WaitForReady(ctx, cc, time.Second*10) client := databrokerpb.NewDataBrokerServiceClient(cc) return client, nil } // withGRPCBackoff runs f. If an unavailable or resource exhausted error occurs, the request will be retried. // All other errors return immediately. func withGRPCBackoff(ctx context.Context, f func() error) { bo := backoff.NewExponentialBackOff() bo.MaxElapsedTime = 0 for { err := f() switch { case err == nil: return case status.Code(err) == codes.Unavailable, status.Code(err) == codes.ResourceExhausted, status.Code(err) == codes.DeadlineExceeded: log.Error(ctx).Err(err).Msg("controlplane: error storing configuration event, retrying") // retry default: log.Error(ctx).Err(err).Msg("controlplane: error storing configuration event") return } select { case <-ctx.Done(): return case <-time.After(bo.NextBackOff()): } } }