controlplane: save configuration events to databroker (#2153)

* envoy: save events to databroker

* controlplane: add tests for envoy configuration events

* format imports
This commit is contained in:
Caleb Doxsey 2021-04-29 15:51:46 -06:00 committed by GitHub
parent d32b8a4d8a
commit 0adbf4f24c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 1067 additions and 618 deletions

View file

@ -1,6 +1,7 @@
package authenticate
import (
"context"
"crypto/cipher"
"encoding/base64"
"fmt"
@ -150,7 +151,7 @@ func newAuthenticateStateFromConfig(cfg *config.Config) (*authenticateState, err
return nil, err
}
dataBrokerConn, err := grpc.GetGRPCClientConn("databroker", &grpc.Options{
dataBrokerConn, err := grpc.GetGRPCClientConn(context.Background(), "databroker", &grpc.Options{
Addrs: urls,
OverrideCertificateName: cfg.Options.OverrideCertificateName,
CA: cfg.Options.CA,

View file

@ -1,6 +1,7 @@
package authorize
import (
"context"
"fmt"
"sync/atomic"
@ -55,7 +56,7 @@ func newAuthorizeStateFromConfig(cfg *config.Config, store *evaluator.Store) (*a
return nil, err
}
cc, err := grpc.GetGRPCClientConn("databroker", &grpc.Options{
cc, err := grpc.GetGRPCClientConn(context.Background(), "databroker", &grpc.Options{
Addrs: urls,
OverrideCertificateName: cfg.Options.OverrideCertificateName,
CA: cfg.Options.CA,

View file

@ -0,0 +1,112 @@
package controlplane
import (
"context"
"fmt"
"github.com/google/uuid"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/grpc"
configpb "github.com/pomerium/pomerium/pkg/grpc/config"
databrokerpb "github.com/pomerium/pomerium/pkg/grpc/databroker"
)
const maxEnvoyConfigurationEvents = 50
func (srv *Server) handleEnvoyConfigurationEvent(evt *configpb.EnvoyConfigurationEvent) {
select {
case srv.envoyConfigurationEvents <- evt:
default:
log.Warn(context.Background()).
Interface("event", evt).
Msg("controlplane: dropping envoy configuration event due to full channel")
}
}
func (srv *Server) runEnvoyConfigurationEventHandler(ctx context.Context) error {
for {
var evt *configpb.EnvoyConfigurationEvent
select {
case <-ctx.Done():
return ctx.Err()
case evt = <-srv.envoyConfigurationEvents:
}
err := srv.storeEnvoyConfigurationEvent(ctx, evt)
if err != nil {
log.Error(ctx).Err(err).Msg("controlplane: error storing configuration event")
}
}
}
func (srv *Server) storeEnvoyConfigurationEvent(ctx context.Context, evt *configpb.EnvoyConfigurationEvent) error {
any, err := anypb.New(evt)
if err != nil {
return err
}
client, err := srv.getDataBrokerClient(ctx)
if err != nil {
return err
}
if !srv.haveSetEnvoyConfigurationEventOptions {
_, err = client.SetOptions(ctx, &databrokerpb.SetOptionsRequest{
Type: any.GetTypeUrl(),
Options: &databrokerpb.Options{
Capacity: proto.Uint64(maxEnvoyConfigurationEvents),
},
})
if err != nil {
return err
}
srv.haveSetEnvoyConfigurationEventOptions = true
}
_, err = client.Put(ctx, &databrokerpb.PutRequest{
Record: &databrokerpb.Record{
Type: any.GetTypeUrl(),
Id: uuid.NewString(),
Data: any,
},
})
if err != nil {
return err
}
return nil
}
func (srv *Server) getDataBrokerClient(ctx context.Context) (databrokerpb.DataBrokerServiceClient, error) {
options := srv.currentConfig.Load().Options
sharedKey, err := options.GetSharedKey()
if err != nil {
return nil, err
}
urls, err := options.GetDataBrokerURLs()
if err != nil {
return nil, err
}
cc, err := grpc.GetGRPCClientConn(ctx, "databroker", &grpc.Options{
Addrs: urls,
OverrideCertificateName: options.OverrideCertificateName,
CA: options.CA,
CAFile: options.CAFile,
RequestTimeout: options.GRPCClientTimeout,
ClientDNSRoundRobin: options.GRPCClientDNSRoundRobin,
WithInsecure: options.GetGRPCInsecure(),
InstallationID: options.InstallationID,
ServiceName: options.Services,
SignedJWTKey: sharedKey,
})
if err != nil {
return nil, fmt.Errorf("controlplane: error creating databroker connection: %w", err)
}
client := databrokerpb.NewDataBrokerServiceClient(cc)
return client, nil
}

View file

@ -0,0 +1,119 @@
package controlplane
import (
"context"
"net"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/pkg/cryptutil"
configpb "github.com/pomerium/pomerium/pkg/grpc/config"
databrokerpb "github.com/pomerium/pomerium/pkg/grpc/databroker"
)
type mockDataBrokerServer struct {
databrokerpb.DataBrokerServiceServer
put func(context.Context, *databrokerpb.PutRequest) (*databrokerpb.PutResponse, error)
setOptions func(context.Context, *databrokerpb.SetOptionsRequest) (*databrokerpb.SetOptionsResponse, error)
}
func (mock *mockDataBrokerServer) Put(ctx context.Context, req *databrokerpb.PutRequest) (*databrokerpb.PutResponse, error) {
return mock.put(ctx, req)
}
func (mock *mockDataBrokerServer) SetOptions(ctx context.Context, req *databrokerpb.SetOptionsRequest) (*databrokerpb.SetOptionsResponse, error) {
return mock.setOptions(ctx, req)
}
func TestEvents(t *testing.T) {
t.Run("passes events", func(t *testing.T) {
srv := &Server{envoyConfigurationEvents: make(chan *configpb.EnvoyConfigurationEvent, 1)}
srv.handleEnvoyConfigurationEvent(new(configpb.EnvoyConfigurationEvent))
evt := <-srv.envoyConfigurationEvents
assert.NotNil(t, evt)
})
t.Run("receives events", func(t *testing.T) {
ctx := context.Background()
srv := &Server{
envoyConfigurationEvents: make(chan *configpb.EnvoyConfigurationEvent, 1),
}
srv.currentConfig.Store(versionedConfig{
Config: &config.Config{
Options: &config.Options{},
},
})
ctx, cancel := context.WithCancel(ctx)
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return srv.runEnvoyConfigurationEventHandler(ctx)
})
srv.envoyConfigurationEvents <- new(configpb.EnvoyConfigurationEvent)
cancel()
assert.Equal(t, context.Canceled, eg.Wait())
})
t.Run("saves events", func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, clearTimeout := context.WithTimeout(ctx, time.Second*5)
defer clearTimeout()
li, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer li.Close()
var putRequest *databrokerpb.PutRequest
var setOptionsRequest *databrokerpb.SetOptionsRequest
grpcSrv := grpc.NewServer()
databrokerpb.RegisterDataBrokerServiceServer(grpcSrv, &mockDataBrokerServer{
put: func(ctx context.Context, req *databrokerpb.PutRequest) (*databrokerpb.PutResponse, error) {
putRequest = req
return new(databrokerpb.PutResponse), nil
},
setOptions: func(ctx context.Context, req *databrokerpb.SetOptionsRequest) (*databrokerpb.SetOptionsResponse, error) {
setOptionsRequest = req
return new(databrokerpb.SetOptionsResponse), nil
},
})
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
<-ctx.Done()
grpcSrv.Stop()
return nil
})
eg.Go(func() error {
return grpcSrv.Serve(li)
})
eg.Go(func() error {
defer cancel()
srv := &Server{}
srv.currentConfig.Store(versionedConfig{
Config: &config.Config{
Options: &config.Options{
SharedKey: cryptutil.NewBase64Key(),
DataBrokerURLString: "http://" + li.Addr().String(),
GRPCInsecure: true,
},
},
})
err := srv.storeEnvoyConfigurationEvent(ctx, new(configpb.EnvoyConfigurationEvent))
assert.NoError(t, err)
return err
})
_ = eg.Wait()
assert.Equal(t, uint64(maxEnvoyConfigurationEvents), setOptionsRequest.GetOptions().GetCapacity())
assert.Equal(t, "type.googleapis.com/pomerium.config.EnvoyConfigurationEvent", putRequest.GetRecord().GetType())
})
}

View file

@ -24,6 +24,7 @@ import (
"github.com/pomerium/pomerium/internal/telemetry"
"github.com/pomerium/pomerium/internal/telemetry/requestid"
"github.com/pomerium/pomerium/internal/version"
configpb "github.com/pomerium/pomerium/pkg/grpc/config"
"github.com/pomerium/pomerium/pkg/grpcutil"
)
@ -58,13 +59,17 @@ type Server struct {
filemgr *filemgr.Manager
metricsMgr *config.MetricsManager
reproxy *reproxy.Handler
haveSetEnvoyConfigurationEventOptions bool
envoyConfigurationEvents chan *configpb.EnvoyConfigurationEvent
}
// NewServer creates a new Server. Listener ports are chosen by the OS.
func NewServer(name string, metricsMgr *config.MetricsManager) (*Server, error) {
srv := &Server{
metricsMgr: metricsMgr,
reproxy: reproxy.New(),
metricsMgr: metricsMgr,
reproxy: reproxy.New(),
envoyConfigurationEvents: make(chan *configpb.EnvoyConfigurationEvent, 10),
}
srv.currentConfig.Store(versionedConfig{
Config: &config.Config{Options: &config.Options{}},
@ -116,7 +121,7 @@ func NewServer(name string, metricsMgr *config.MetricsManager) (*Server, error)
return nil, err
}
srv.xdsmgr = xdsmgr.NewManager(res)
srv.xdsmgr = xdsmgr.NewManager(res, srv.handleEnvoyConfigurationEvent)
envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv.GRPCServer, srv.xdsmgr)
return srv, nil
@ -126,6 +131,11 @@ func NewServer(name string, metricsMgr *config.MetricsManager) (*Server, error)
func (srv *Server) Run(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)
// handle envoy configuration events
eg.Go(func() error {
return srv.runEnvoyConfigurationEventHandler(ctx)
})
// start the gRPC server
eg.Go(func() error {
log.Info(ctx).Str("addr", srv.GRPCListener.Addr().String()).Msg("starting control-plane gRPC server")

View file

@ -12,9 +12,11 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/signal"
configpb "github.com/pomerium/pomerium/pkg/grpc/config"
)
type streamState struct {
@ -27,7 +29,8 @@ var onHandleDeltaRequest = func(state *streamState) {}
// A Manager manages xDS resources.
type Manager struct {
signal *signal.Signal
signal *signal.Signal
eventHandler func(*configpb.EnvoyConfigurationEvent)
mu sync.Mutex
nonce string
@ -35,9 +38,11 @@ type Manager struct {
}
// NewManager creates a new Manager.
func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource) *Manager {
func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource, eventHandler func(*configpb.EnvoyConfigurationEvent)) *Manager {
return &Manager{
signal: signal.New(),
signal: signal.New(),
eventHandler: eventHandler,
nonce: uuid.New().String(),
resources: resources,
}
@ -120,6 +125,13 @@ func (mgr *Manager) DeltaAggregatedResources(
for _, resource := range mgr.resources[req.GetTypeUrl()] {
state.clientResourceVersions[resource.Name] = resource.Version
}
mgr.eventHandler(&configpb.EnvoyConfigurationEvent{
Time: timestamppb.Now(),
Message: req.ErrorDetail.Message,
Code: req.ErrorDetail.Code,
Details: req.ErrorDetail.Details,
})
case req.GetResponseNonce() == mgr.nonce:
// an ACK for the last response
// - set the client resource versions to the current resource versions
@ -130,6 +142,11 @@ func (mgr *Manager) DeltaAggregatedResources(
for _, resource := range mgr.resources[req.GetTypeUrl()] {
state.clientResourceVersions[resource.Name] = resource.Version
}
mgr.eventHandler(&configpb.EnvoyConfigurationEvent{
Time: timestamppb.Now(),
Message: "OK",
})
default:
// an ACK for a response that's not the last response
log.Debug(ctx).

View file

@ -14,6 +14,7 @@ import (
"google.golang.org/grpc/test/bufconn"
"github.com/pomerium/pomerium/internal/signal"
configpb "github.com/pomerium/pomerium/pkg/grpc/config"
)
const bufSize = 1024 * 1024
@ -36,7 +37,7 @@ func TestManager(t *testing.T) {
typeURL: {
{Name: "r1", Version: "1"},
},
})
}, func(evt *configpb.EnvoyConfigurationEvent) {})
envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, mgr)
li := bufconn.Listen(bufSize)

View file

@ -190,17 +190,17 @@ func (src *ConfigSource) runUpdater(cfg *config.Config) {
src.cancel = nil
}
cc, err := grpc.NewGRPCClientConn(connectionOptions)
ctx := context.Background()
ctx, src.cancel = context.WithCancel(ctx)
cc, err := grpc.NewGRPCClientConn(ctx, connectionOptions)
if err != nil {
log.Error(context.TODO()).Err(err).Msg("databroker: failed to create gRPC connection to data broker")
log.Error(ctx).Err(err).Msg("databroker: failed to create gRPC connection to data broker")
return
}
client := databroker.NewDataBrokerServiceClient(cc)
ctx := context.Background()
ctx, src.cancel = context.WithCancel(ctx)
syncer := databroker.NewSyncer("databroker", &syncerHandler{
client: client,
src: src,

View file

@ -45,7 +45,7 @@ func (r *Reporter) OnConfigChange(ctx context.Context, cfg *config.Config) {
return
}
registryConn, err := grpc.GetGRPCClientConn("databroker", &grpc.Options{
registryConn, err := grpc.GetGRPCClientConn(ctx, "databroker", &grpc.Options{
Addrs: urls,
OverrideCertificateName: cfg.Options.OverrideCertificateName,
CA: cfg.Options.CA,

View file

@ -59,8 +59,7 @@ type Options struct {
}
// NewGRPCClientConn returns a new gRPC pomerium service client connection.
func NewGRPCClientConn(opts *Options, other ...grpc.DialOption) (*grpc.ClientConn, error) {
ctx := context.TODO()
func NewGRPCClientConn(ctx context.Context, opts *Options, other ...grpc.DialOption) (*grpc.ClientConn, error) {
if len(opts.Addrs) == 0 {
return nil, errors.New("internal/grpc: connection address required")
}
@ -130,7 +129,7 @@ func NewGRPCClientConn(opts *Options, other ...grpc.DialOption) (*grpc.ClientCon
dialOptions = append(dialOptions, grpc.WithTransportCredentials(cert))
}
return grpc.Dial(connAddr, dialOptions...)
return grpc.DialContext(ctx, connAddr, dialOptions...)
}
// grpcTimeoutInterceptor enforces per-RPC request timeouts
@ -160,7 +159,7 @@ var grpcClientConns = struct {
// GetGRPCClientConn returns a gRPC client connection for the given name. If a connection for that name has already been
// established the existing connection will be returned. If any options change for that connection, the existing
// connection will be closed and a new one established.
func GetGRPCClientConn(name string, opts *Options) (*grpc.ClientConn, error) {
func GetGRPCClientConn(ctx context.Context, name string, opts *Options) (*grpc.ClientConn, error) {
grpcClientConns.Lock()
defer grpcClientConns.Unlock()
@ -176,7 +175,7 @@ func GetGRPCClientConn(name string, opts *Options) (*grpc.ClientConn, error) {
}
}
cc, err := NewGRPCClientConn(opts)
cc, err := NewGRPCClientConn(ctx, opts)
if err != nil {
return nil, err
}

View file

@ -62,7 +62,7 @@ func TestNewGRPC(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewGRPCClientConn(tt.opts)
got, err := NewGRPCClientConn(context.Background(), tt.opts)
if (err != nil) != tt.wantErr {
t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr)
if !strings.EqualFold(err.Error(), tt.wantErrStr) {
@ -77,14 +77,14 @@ func TestNewGRPC(t *testing.T) {
}
func TestGetGRPC(t *testing.T) {
cc1, err := GetGRPCClientConn("example", &Options{
cc1, err := GetGRPCClientConn(context.Background(), "example", &Options{
Addrs: mustParseURLs("https://localhost.example"),
})
if !assert.NoError(t, err) {
return
}
cc2, err := GetGRPCClientConn("example", &Options{
cc2, err := GetGRPCClientConn(context.Background(), "example", &Options{
Addrs: mustParseURLs("https://localhost.example"),
})
if !assert.NoError(t, err) {
@ -93,7 +93,7 @@ func TestGetGRPC(t *testing.T) {
assert.Same(t, cc1, cc2, "GetGRPCClientConn should return the same connection when there are no changes")
cc3, err := GetGRPCClientConn("example", &Options{
cc3, err := GetGRPCClientConn(context.Background(), "example", &Options{
Addrs: mustParseURLs("http://localhost.example"),
WithInsecure: true,
})

File diff suppressed because it is too large Load diff

View file

@ -3,8 +3,10 @@ syntax = "proto3";
package pomerium.config;
option go_package = "github.com/pomerium/pomerium/pkg/grpc/config";
import "google/protobuf/any.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";
import "envoy/config/cluster/v3/cluster.proto";
import "crypt/crypt.proto";
@ -182,3 +184,16 @@ message Settings {
repeated string programmatic_redirect_domain_whitelist = 68;
optional pomerium.crypt.PublicKeyEncryptionKey audit_key = 72;
}
// EnvoyConfigurationEvents is a list of envoy configuration events.
message EnvoyConfigurationEvents {
repeated EnvoyConfigurationEvent values = 1;
}
// EnvoyConfigurationEvent is an envoy configuration event.
message EnvoyConfigurationEvent {
google.protobuf.Timestamp time = 1;
string message = 2;
int32 code = 3;
repeated google.protobuf.Any details = 4;
}