mirror of
https://github.com/pomerium/pomerium.git
synced 2025-08-04 01:09:36 +02:00
config: use stable route ids for authorize matching and order xds responses (#5618)
## Summary Update the `RouteID` to use the `policy.ID` if it is set. This makes it so that updated routes use a stable identifier between updates so if the envoy control plane is updated before the authorize service's internal definitions (or vice-versa) the authorize service will still be able to match the route. The current behavior results in a 404 if envoy passes the old route id. The new behavior will result in inconsistency, but it should be quickly remedied. To help with debugging 4 new fields were added to the authorize check log. The `route-id` and `route-checksum` as the authorize sees it and the `envoy-route-id` and `envoy-route-checksum` as envoy sees it. I also updated the way we send updates to envoy to try and model their recommended approach: > In general, to avoid traffic drop, sequencing of updates should follow a make before break model, wherein: > > - CDS updates (if any) must always be pushed first. > - EDS updates (if any) must arrive after CDS updates for the respective clusters. > - LDS updates must arrive after corresponding CDS/EDS updates. > - RDS updates related to the newly added listeners must arrive after CDS/EDS/LDS updates. > - VHDS updates (if any) related to the newly added RouteConfigurations must arrive after RDS updates. > - Stale CDS clusters and related EDS endpoints (ones no longer being referenced) can then be removed. This should help avoid 404s when configuration is being updated. ## Related issues - [ENG-2386](https://linear.app/pomerium/issue/ENG-2386/large-number-of-routes-leads-to-404s-and-slowness) ## Checklist - [x] reference any related issues - [x] updated unit tests - [x] add appropriate label (`enhancement`, `bug`, `breaking`, `dependencies`, `ci`) - [x] ready for review
This commit is contained in:
parent
2f179658b6
commit
7a6d7c5a3c
17 changed files with 278 additions and 85 deletions
|
@ -2,7 +2,9 @@
|
|||
package xdsmgr
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
||||
|
@ -10,6 +12,7 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
"github.com/pomerium/pomerium/internal/signal"
|
||||
|
@ -186,12 +189,17 @@ func (mgr *Manager) DeltaAggregatedResources(
|
|||
mgr.mu.Unlock()
|
||||
}
|
||||
|
||||
var responses []*envoy_service_discovery_v3.DeltaDiscoveryResponse
|
||||
for _, typeURL := range typeURLs {
|
||||
res := getDeltaResponse(changeCtx, typeURL)
|
||||
if res == nil {
|
||||
continue
|
||||
}
|
||||
responses = append(responses, res)
|
||||
}
|
||||
|
||||
responses = buildDiscoveryResponsesForConsistentUpdates(responses)
|
||||
for _, res := range responses {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return context.Cause(ctx)
|
||||
|
@ -242,3 +250,51 @@ func (mgr *Manager) Update(ctx context.Context, resources map[string][]*envoy_se
|
|||
|
||||
mgr.signal.Broadcast(ctx)
|
||||
}
|
||||
|
||||
func buildDiscoveryResponsesForConsistentUpdates(in []*envoy_service_discovery_v3.DeltaDiscoveryResponse) (out []*envoy_service_discovery_v3.DeltaDiscoveryResponse) {
|
||||
var updates, removals []*envoy_service_discovery_v3.DeltaDiscoveryResponse
|
||||
for _, r := range in {
|
||||
if len(r.Resources) > 0 {
|
||||
rr := proto.Clone(r).(*envoy_service_discovery_v3.DeltaDiscoveryResponse)
|
||||
rr.RemovedResources = nil
|
||||
updates = append(updates, rr)
|
||||
}
|
||||
if len(r.RemovedResources) > 0 {
|
||||
rr := proto.Clone(r).(*envoy_service_discovery_v3.DeltaDiscoveryResponse)
|
||||
rr.Resources = nil
|
||||
removals = append(removals, rr)
|
||||
}
|
||||
}
|
||||
|
||||
// from the docs:
|
||||
//
|
||||
// In general, to avoid traffic drop, sequencing of updates should follow a make before break model, wherein:
|
||||
//
|
||||
// CDS updates (if any) must always be pushed first.
|
||||
// EDS updates (if any) must arrive after CDS updates for the respective clusters.
|
||||
// LDS updates must arrive after corresponding CDS/EDS updates.
|
||||
// RDS updates related to the newly added listeners must arrive after CDS/EDS/LDS updates.
|
||||
// VHDS updates (if any) related to the newly added RouteConfigurations must arrive after RDS updates.
|
||||
// Stale CDS clusters and related EDS endpoints (ones no longer being referenced) can then be removed.
|
||||
|
||||
updateOrder := map[string]int{
|
||||
clusterTypeURL: 1,
|
||||
listenerTypeURL: 2,
|
||||
routeConfigurationTypeURL: 3,
|
||||
}
|
||||
slices.SortFunc(updates, func(a, b *envoy_service_discovery_v3.DeltaDiscoveryResponse) int {
|
||||
return cmp.Compare(updateOrder[a.TypeUrl], updateOrder[b.TypeUrl])
|
||||
})
|
||||
|
||||
removeOrder := map[string]int{
|
||||
routeConfigurationTypeURL: 1,
|
||||
listenerTypeURL: 2,
|
||||
clusterTypeURL: 3,
|
||||
}
|
||||
slices.SortFunc(removals, func(a, b *envoy_service_discovery_v3.DeltaDiscoveryResponse) int {
|
||||
return cmp.Compare(removeOrder[a.TypeUrl], removeOrder[b.TypeUrl])
|
||||
})
|
||||
|
||||
out = append(updates, removals...)
|
||||
return out
|
||||
}
|
||||
|
|
|
@ -6,6 +6,9 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
||||
envoy_config_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
|
||||
envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
|
||||
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -14,6 +17,8 @@ import (
|
|||
"google.golang.org/grpc/test/bufconn"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/signal"
|
||||
"github.com/pomerium/pomerium/internal/testutil"
|
||||
"github.com/pomerium/pomerium/pkg/protoutil"
|
||||
)
|
||||
|
||||
const bufSize = 1024 * 1024
|
||||
|
@ -117,3 +122,55 @@ func TestManager(t *testing.T) {
|
|||
}, time.Second*5, time.Millisecond)
|
||||
})
|
||||
}
|
||||
|
||||
func TestBuildDiscoveryResponsesForConsistentUpdates(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
rc1 := protoutil.NewAny(&envoy_config_route_v3.RouteConfiguration{})
|
||||
l1 := protoutil.NewAny(&envoy_config_listener_v3.Listener{})
|
||||
c1 := protoutil.NewAny(&envoy_config_cluster_v3.Cluster{})
|
||||
|
||||
responses := buildDiscoveryResponsesForConsistentUpdates([]*envoy_service_discovery_v3.DeltaDiscoveryResponse{
|
||||
{
|
||||
TypeUrl: routeConfigurationTypeURL,
|
||||
Resources: []*envoy_service_discovery_v3.Resource{{Name: "rc1", Resource: rc1}},
|
||||
RemovedResources: []string{"rc2"},
|
||||
},
|
||||
{
|
||||
TypeUrl: listenerTypeURL,
|
||||
Resources: []*envoy_service_discovery_v3.Resource{{Name: "l1", Resource: l1}},
|
||||
RemovedResources: []string{"l2"},
|
||||
},
|
||||
{
|
||||
TypeUrl: clusterTypeURL,
|
||||
Resources: []*envoy_service_discovery_v3.Resource{{Name: "c1", Resource: c1}},
|
||||
RemovedResources: []string{"c2"},
|
||||
},
|
||||
})
|
||||
testutil.AssertProtoEqual(t, []*envoy_service_discovery_v3.DeltaDiscoveryResponse{
|
||||
{
|
||||
TypeUrl: clusterTypeURL,
|
||||
Resources: []*envoy_service_discovery_v3.Resource{{Name: "c1", Resource: c1}},
|
||||
},
|
||||
{
|
||||
TypeUrl: listenerTypeURL,
|
||||
Resources: []*envoy_service_discovery_v3.Resource{{Name: "l1", Resource: l1}},
|
||||
},
|
||||
{
|
||||
TypeUrl: routeConfigurationTypeURL,
|
||||
Resources: []*envoy_service_discovery_v3.Resource{{Name: "rc1", Resource: rc1}},
|
||||
},
|
||||
{
|
||||
TypeUrl: routeConfigurationTypeURL,
|
||||
RemovedResources: []string{"rc2"},
|
||||
},
|
||||
{
|
||||
TypeUrl: listenerTypeURL,
|
||||
RemovedResources: []string{"l2"},
|
||||
},
|
||||
{
|
||||
TypeUrl: clusterTypeURL,
|
||||
RemovedResources: []string{"c2"},
|
||||
},
|
||||
}, responses)
|
||||
}
|
||||
|
|
|
@ -204,7 +204,7 @@ func (src *ConfigSource) buildPolicyFromProto(_ context.Context, routepb *config
|
|||
}
|
||||
|
||||
func (src *ConfigSource) addPolicies(ctx context.Context, cfg *config.Config, policies []*config.Policy) {
|
||||
seen := make(map[uint64]struct{}, len(policies)+cfg.Options.NumPolicies())
|
||||
seen := make(map[string]struct{}, len(policies)+cfg.Options.NumPolicies())
|
||||
for policy := range cfg.Options.GetAllPolicies() {
|
||||
id, err := policy.RouteID()
|
||||
if err != nil {
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
"net/http"
|
||||
stdhttputil "net/http/httputil"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
|
@ -29,46 +28,40 @@ type Handler struct {
|
|||
mu sync.RWMutex
|
||||
key []byte
|
||||
options *config.Options
|
||||
policies map[uint64]*config.Policy
|
||||
policies map[string]*config.Policy
|
||||
}
|
||||
|
||||
// New creates a new Handler.
|
||||
func New() *Handler {
|
||||
h := new(Handler)
|
||||
h.policies = make(map[uint64]*config.Policy)
|
||||
h.policies = make(map[string]*config.Policy)
|
||||
return h
|
||||
}
|
||||
|
||||
// GetPolicyIDFromHeaders gets a policy id from http headers. If no policy id is found
|
||||
// or the HMAC isn't valid, false will be returned.
|
||||
func (h *Handler) GetPolicyIDFromHeaders(headers http.Header) (uint64, bool) {
|
||||
policyStr := headers.Get(httputil.HeaderPomeriumReproxyPolicy)
|
||||
func (h *Handler) GetPolicyIDFromHeaders(headers http.Header) (string, bool) {
|
||||
policyID := headers.Get(httputil.HeaderPomeriumReproxyPolicy)
|
||||
hmacStr := headers.Get(httputil.HeaderPomeriumReproxyPolicyHMAC)
|
||||
hmac, err := base64.StdEncoding.DecodeString(hmacStr)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
policyID, err := strconv.ParseUint(policyStr, 10, 64)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
return "", false
|
||||
}
|
||||
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
return policyID, cryptutil.CheckHMAC([]byte(policyStr), hmac, h.key)
|
||||
return policyID, cryptutil.CheckHMAC([]byte(policyID), hmac, h.key)
|
||||
}
|
||||
|
||||
// GetPolicyIDHeaders returns http headers for the given policy id.
|
||||
func (h *Handler) GetPolicyIDHeaders(policyID uint64) [][2]string {
|
||||
func (h *Handler) GetPolicyIDHeaders(policyID string) [][2]string {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
s := strconv.FormatUint(policyID, 10)
|
||||
hmac := base64.StdEncoding.EncodeToString(cryptutil.GenerateHMAC([]byte(s), h.key))
|
||||
hmac := base64.StdEncoding.EncodeToString(cryptutil.GenerateHMAC([]byte(policyID), h.key))
|
||||
return [][2]string{
|
||||
{httputil.HeaderPomeriumReproxyPolicy, s},
|
||||
{httputil.HeaderPomeriumReproxyPolicy, policyID},
|
||||
{httputil.HeaderPomeriumReproxyPolicyHMAC, hmac},
|
||||
}
|
||||
}
|
||||
|
@ -133,7 +126,7 @@ func (h *Handler) Update(ctx context.Context, cfg *config.Config) {
|
|||
|
||||
h.key, _ = cfg.Options.GetSharedKey()
|
||||
h.options = cfg.Options
|
||||
h.policies = make(map[uint64]*config.Policy, cfg.Options.NumPolicies())
|
||||
h.policies = make(map[string]*config.Policy, cfg.Options.NumPolicies())
|
||||
for p := range cfg.Options.GetAllPolicies() {
|
||||
id, err := p.RouteID()
|
||||
if err != nil {
|
||||
|
|
|
@ -12,6 +12,8 @@ type AuthorizeLogField string
|
|||
const (
|
||||
AuthorizeLogFieldCheckRequestID AuthorizeLogField = "check-request-id"
|
||||
AuthorizeLogFieldEmail AuthorizeLogField = "email"
|
||||
AuthorizeLogFieldEnvoyRouteChecksum AuthorizeLogField = "envoy-route-checksum"
|
||||
AuthorizeLogFieldEnvoyRouteID AuthorizeLogField = "envoy-route-id"
|
||||
AuthorizeLogFieldHeaders = AuthorizeLogField(headersFieldName)
|
||||
AuthorizeLogFieldHost AuthorizeLogField = "host"
|
||||
AuthorizeLogFieldIDToken AuthorizeLogField = "id-token"
|
||||
|
@ -25,6 +27,8 @@ const (
|
|||
AuthorizeLogFieldQuery AuthorizeLogField = "query"
|
||||
AuthorizeLogFieldRemovedGroupsCount AuthorizeLogField = "removed-groups-count"
|
||||
AuthorizeLogFieldRequestID AuthorizeLogField = "request-id"
|
||||
AuthorizeLogFieldRouteChecksum AuthorizeLogField = "route-checksum"
|
||||
AuthorizeLogFieldRouteID AuthorizeLogField = "route-id"
|
||||
AuthorizeLogFieldServiceAccountID AuthorizeLogField = "service-account-id"
|
||||
AuthorizeLogFieldSessionID AuthorizeLogField = "session-id"
|
||||
AuthorizeLogFieldUser AuthorizeLogField = "user"
|
||||
|
@ -46,6 +50,10 @@ var DefaultAuthorizeLogFields = []AuthorizeLogField{
|
|||
AuthorizeLogFieldServiceAccountID,
|
||||
AuthorizeLogFieldUser,
|
||||
AuthorizeLogFieldEmail,
|
||||
AuthorizeLogFieldEnvoyRouteChecksum,
|
||||
AuthorizeLogFieldEnvoyRouteID,
|
||||
AuthorizeLogFieldRouteChecksum,
|
||||
AuthorizeLogFieldRouteID,
|
||||
}
|
||||
|
||||
// ErrUnknownAuthorizeLogField indicates that an authorize log field is unknown.
|
||||
|
@ -54,6 +62,8 @@ var ErrUnknownAuthorizeLogField = errors.New("unknown authorize log field")
|
|||
var authorizeLogFieldLookup = map[AuthorizeLogField]struct{}{
|
||||
AuthorizeLogFieldCheckRequestID: {},
|
||||
AuthorizeLogFieldEmail: {},
|
||||
AuthorizeLogFieldEnvoyRouteChecksum: {},
|
||||
AuthorizeLogFieldEnvoyRouteID: {},
|
||||
AuthorizeLogFieldHeaders: {},
|
||||
AuthorizeLogFieldHost: {},
|
||||
AuthorizeLogFieldIDToken: {},
|
||||
|
@ -67,6 +77,8 @@ var authorizeLogFieldLookup = map[AuthorizeLogField]struct{}{
|
|||
AuthorizeLogFieldQuery: {},
|
||||
AuthorizeLogFieldRemovedGroupsCount: {},
|
||||
AuthorizeLogFieldRequestID: {},
|
||||
AuthorizeLogFieldRouteChecksum: {},
|
||||
AuthorizeLogFieldRouteID: {},
|
||||
AuthorizeLogFieldServiceAccountID: {},
|
||||
AuthorizeLogFieldSessionID: {},
|
||||
AuthorizeLogFieldUser: {},
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue