mirror of
https://github.com/pomerium/pomerium.git
synced 2025-05-04 04:46:01 +02:00
357 lines
10 KiB
Go
357 lines
10 KiB
Go
// Package xdsmgr implements a resource discovery manager for envoy.
|
|
package xdsmgr
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"os"
|
|
"sync"
|
|
|
|
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
|
"github.com/google/uuid"
|
|
lru "github.com/hashicorp/golang-lru"
|
|
"golang.org/x/sync/errgroup"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
|
|
"github.com/pomerium/pomerium/internal/contextkeys"
|
|
"github.com/pomerium/pomerium/internal/log"
|
|
"github.com/pomerium/pomerium/internal/signal"
|
|
"github.com/pomerium/pomerium/pkg/grpc/events"
|
|
)
|
|
|
|
const (
|
|
maxNonceCacheSize = 1 << 12
|
|
)
|
|
|
|
type streamState struct {
|
|
typeURL string
|
|
clientResourceVersions map[string]string
|
|
unsubscribedResources map[string]struct{}
|
|
}
|
|
|
|
var onHandleDeltaRequest = func(state *streamState) {}
|
|
|
|
// A Manager manages xDS resources.
|
|
type Manager struct {
|
|
signal *signal.Signal
|
|
eventHandler func(*events.EnvoyConfigurationEvent)
|
|
|
|
mu sync.Mutex
|
|
nonce string
|
|
resources map[string][]*envoy_service_discovery_v3.Resource
|
|
|
|
nonceToConfig *lru.Cache
|
|
|
|
hostname string
|
|
}
|
|
|
|
// NewManager creates a new Manager.
|
|
func NewManager(resources map[string][]*envoy_service_discovery_v3.Resource, eventHandler func(*events.EnvoyConfigurationEvent)) *Manager {
|
|
nonceToConfig, _ := lru.New(maxNonceCacheSize) // the only error they return is when size is negative, which never happens
|
|
|
|
return &Manager{
|
|
signal: signal.New(),
|
|
eventHandler: eventHandler,
|
|
|
|
nonceToConfig: nonceToConfig,
|
|
nonce: uuid.NewString(),
|
|
resources: resources,
|
|
|
|
hostname: getHostname(),
|
|
}
|
|
}
|
|
|
|
// DeltaAggregatedResources implements the increment xDS server.
|
|
func (mgr *Manager) DeltaAggregatedResources(
|
|
stream envoy_service_discovery_v3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer,
|
|
) error {
|
|
ch := mgr.signal.Bind()
|
|
defer mgr.signal.Unbind(ch)
|
|
|
|
stateByTypeURL := map[string]*streamState{}
|
|
|
|
getDeltaResponse := func(ctx context.Context, typeURL string) *envoy_service_discovery_v3.DeltaDiscoveryResponse {
|
|
mgr.mu.Lock()
|
|
defer mgr.mu.Unlock()
|
|
|
|
state, ok := stateByTypeURL[typeURL]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
res := &envoy_service_discovery_v3.DeltaDiscoveryResponse{
|
|
TypeUrl: typeURL,
|
|
Nonce: mgr.nonce,
|
|
}
|
|
seen := map[string]struct{}{}
|
|
for _, resource := range mgr.resources[typeURL] {
|
|
seen[resource.Name] = struct{}{}
|
|
if resource.Version != state.clientResourceVersions[resource.Name] {
|
|
res.Resources = append(res.Resources, resource)
|
|
}
|
|
}
|
|
for name := range state.clientResourceVersions {
|
|
_, ok := seen[name]
|
|
if !ok {
|
|
res.RemovedResources = append(res.RemovedResources, name)
|
|
}
|
|
}
|
|
|
|
if len(res.Resources) == 0 && len(res.RemovedResources) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
handleDeltaRequest := func(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) {
|
|
mgr.mu.Lock()
|
|
defer mgr.mu.Unlock()
|
|
|
|
state, ok := stateByTypeURL[req.GetTypeUrl()]
|
|
if !ok {
|
|
// first time we've seen a message for this type URL.
|
|
state = &streamState{
|
|
typeURL: req.GetTypeUrl(),
|
|
clientResourceVersions: req.GetInitialResourceVersions(),
|
|
unsubscribedResources: make(map[string]struct{}),
|
|
}
|
|
if state.clientResourceVersions == nil {
|
|
state.clientResourceVersions = make(map[string]string)
|
|
}
|
|
stateByTypeURL[req.GetTypeUrl()] = state
|
|
}
|
|
|
|
switch {
|
|
case req.GetResponseNonce() == "":
|
|
// neither an ACK or a NACK
|
|
case req.GetErrorDetail() != nil:
|
|
// a NACK
|
|
// - set the client resource versions to the current resource versions
|
|
state.clientResourceVersions = make(map[string]string)
|
|
for _, resource := range mgr.resources[req.GetTypeUrl()] {
|
|
state.clientResourceVersions[resource.Name] = resource.Version
|
|
}
|
|
|
|
mgr.nackEvent(ctx, req)
|
|
case req.GetResponseNonce() == mgr.nonce:
|
|
// an ACK for the last response
|
|
// - set the client resource versions to the current resource versions
|
|
state.clientResourceVersions = make(map[string]string)
|
|
for _, resource := range mgr.resources[req.GetTypeUrl()] {
|
|
state.clientResourceVersions[resource.Name] = resource.Version
|
|
}
|
|
|
|
mgr.ackEvent(ctx, req)
|
|
default:
|
|
// an ACK for a response that's not the last response
|
|
mgr.ackEvent(ctx, req)
|
|
}
|
|
|
|
// update subscriptions
|
|
for _, name := range req.GetResourceNamesSubscribe() {
|
|
delete(state.unsubscribedResources, name)
|
|
}
|
|
for _, name := range req.GetResourceNamesUnsubscribe() {
|
|
state.unsubscribedResources[name] = struct{}{}
|
|
// from the docs:
|
|
// NOTE: the server must respond with all resources listed in
|
|
// resource_names_subscribe, even if it believes the client has
|
|
// the most recent version of them. The reason: the client may
|
|
// have dropped them, but then regained interest before it had
|
|
// a chance to send the unsubscribe message.
|
|
// so we reset the version to treat it like a new version
|
|
delete(state.clientResourceVersions, name)
|
|
}
|
|
|
|
onHandleDeltaRequest(state)
|
|
}
|
|
|
|
incoming := make(chan *envoy_service_discovery_v3.DeltaDiscoveryRequest)
|
|
outgoing := make(chan *envoy_service_discovery_v3.DeltaDiscoveryResponse)
|
|
eg, ctx := errgroup.WithContext(stream.Context())
|
|
// 1. receive all incoming messages
|
|
eg.Go(func() error {
|
|
for {
|
|
req, err := stream.Recv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case incoming <- req:
|
|
}
|
|
}
|
|
})
|
|
// 2. handle incoming requests or resource changes
|
|
eg.Go(func() error {
|
|
changeCtx := ctx
|
|
for {
|
|
var typeURLs []string
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case req := <-incoming:
|
|
handleDeltaRequest(changeCtx, req)
|
|
typeURLs = []string{req.GetTypeUrl()}
|
|
case changeCtx = <-ch:
|
|
mgr.mu.Lock()
|
|
for typeURL := range mgr.resources {
|
|
typeURLs = append(typeURLs, typeURL)
|
|
}
|
|
mgr.mu.Unlock()
|
|
}
|
|
|
|
for _, typeURL := range typeURLs {
|
|
res := getDeltaResponse(changeCtx, typeURL)
|
|
if res == nil {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case outgoing <- res:
|
|
mgr.changeEvent(ctx, res)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
// 3. send all outgoing messages
|
|
eg.Go(func() error {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case res := <-outgoing:
|
|
err := stream.Send(res)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
})
|
|
return eg.Wait()
|
|
}
|
|
|
|
// StreamAggregatedResources is not implemented.
|
|
func (mgr *Manager) StreamAggregatedResources(
|
|
stream envoy_service_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer,
|
|
) error {
|
|
return status.Errorf(codes.Unimplemented, "method StreamAggregatedResources not implemented")
|
|
}
|
|
|
|
// Update updates the state of resources. If any changes are made they will be pushed to any listening
|
|
// streams. For each TypeURL the list of resources should be the complete list of resources.
|
|
func (mgr *Manager) Update(ctx context.Context, resources map[string][]*envoy_service_discovery_v3.Resource) {
|
|
nonce := uuid.New().String()
|
|
|
|
mgr.mu.Lock()
|
|
mgr.nonce = nonce
|
|
mgr.resources = resources
|
|
mgr.nonceToConfig.Add(nonce, ctx.Value(contextkeys.UpdateRecordsVersion))
|
|
mgr.mu.Unlock()
|
|
|
|
mgr.signal.Broadcast(ctx)
|
|
}
|
|
|
|
func (mgr *Manager) nonceToConfigVersion(nonce string) (ver uint64) {
|
|
val, ok := mgr.nonceToConfig.Get(nonce)
|
|
if !ok {
|
|
return 0
|
|
}
|
|
ver, _ = val.(uint64)
|
|
return ver
|
|
}
|
|
|
|
func (mgr *Manager) nackEvent(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) {
|
|
mgr.eventHandler(&events.EnvoyConfigurationEvent{
|
|
Instance: mgr.hostname,
|
|
Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_NACK,
|
|
Time: timestamppb.Now(),
|
|
Message: req.ErrorDetail.Message,
|
|
Code: req.ErrorDetail.Code,
|
|
Details: req.ErrorDetail.Details,
|
|
ResourceSubscribed: req.ResourceNamesSubscribe,
|
|
ResourceUnsubscribed: req.ResourceNamesUnsubscribe,
|
|
ConfigVersion: mgr.nonceToConfigVersion(req.ResponseNonce),
|
|
TypeUrl: req.TypeUrl,
|
|
Nonce: req.ResponseNonce,
|
|
})
|
|
|
|
bs, _ := json.Marshal(req.ErrorDetail.Details)
|
|
log.Error(ctx).
|
|
Err(errors.New(req.ErrorDetail.Message)).
|
|
Str("resource_type", req.TypeUrl).
|
|
Strs("resources_unsubscribe", req.ResourceNamesUnsubscribe).
|
|
Strs("resources_subscribe", req.ResourceNamesSubscribe).
|
|
Uint64("nonce_version", mgr.nonceToConfigVersion(req.ResponseNonce)).
|
|
Int32("code", req.ErrorDetail.Code).
|
|
RawJSON("details", bs).Msg("error applying configuration")
|
|
}
|
|
|
|
func (mgr *Manager) ackEvent(ctx context.Context, req *envoy_service_discovery_v3.DeltaDiscoveryRequest) {
|
|
mgr.eventHandler(&events.EnvoyConfigurationEvent{
|
|
Instance: mgr.hostname,
|
|
Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_REQUEST_ACK,
|
|
Time: timestamppb.Now(),
|
|
ConfigVersion: mgr.nonceToConfigVersion(req.ResponseNonce),
|
|
ResourceSubscribed: req.ResourceNamesSubscribe,
|
|
ResourceUnsubscribed: req.ResourceNamesUnsubscribe,
|
|
TypeUrl: req.TypeUrl,
|
|
Nonce: req.ResponseNonce,
|
|
Message: "ok",
|
|
})
|
|
|
|
log.Debug(ctx).
|
|
Str("resource_type", req.TypeUrl).
|
|
Strs("resources_unsubscribe", req.ResourceNamesUnsubscribe).
|
|
Strs("resources_subscribe", req.ResourceNamesSubscribe).
|
|
Uint64("nonce_version", mgr.nonceToConfigVersion(req.ResponseNonce)).
|
|
Msg("ACK")
|
|
}
|
|
|
|
func (mgr *Manager) changeEvent(ctx context.Context, res *envoy_service_discovery_v3.DeltaDiscoveryResponse) {
|
|
mgr.eventHandler(&events.EnvoyConfigurationEvent{
|
|
Instance: mgr.hostname,
|
|
Kind: events.EnvoyConfigurationEvent_EVENT_DISCOVERY_RESPONSE,
|
|
Time: timestamppb.Now(),
|
|
Nonce: res.Nonce,
|
|
Message: "change",
|
|
ConfigVersion: mgr.nonceToConfigVersion(res.Nonce),
|
|
TypeUrl: res.TypeUrl,
|
|
ResourceSubscribed: resourceNames(res.Resources),
|
|
ResourceUnsubscribed: res.RemovedResources,
|
|
})
|
|
log.Debug(ctx).
|
|
Uint64("ctx_config_version", mgr.nonceToConfigVersion(res.Nonce)).
|
|
Str("nonce", res.Nonce).
|
|
Str("type", res.TypeUrl).
|
|
Strs("subscribe", resourceNames(res.Resources)).
|
|
Strs("removed", res.RemovedResources).
|
|
Msg("sent update")
|
|
}
|
|
|
|
func resourceNames(res []*envoy_service_discovery_v3.Resource) []string {
|
|
txt := make([]string, 0, len(res))
|
|
for _, r := range res {
|
|
txt = append(txt, r.Name)
|
|
}
|
|
return txt
|
|
}
|
|
|
|
func getHostname() string {
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
hostname = os.Getenv("HOSTNAME")
|
|
}
|
|
if hostname == "" {
|
|
hostname = "__unknown__"
|
|
}
|
|
return hostname
|
|
}
|