envoy: add hash policy and routing key for hash-based load balancers (#2791)

* envoy: add hash policy and routing key for hash-based load balancers

* fix integration test

* fix nginx
This commit is contained in:
Caleb Doxsey 2021-12-01 13:42:12 -07:00 committed by GitHub
parent bd0a5389bf
commit c97dcf7e0f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
56 changed files with 12935 additions and 182 deletions

View file

@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
"github.com/open-policy-agent/opa/rego"
"github.com/pomerium/pomerium/authorize/evaluator/opa"
@ -16,6 +17,7 @@ import (
// HeadersRequest is the input to the headers.rego script.
type HeadersRequest struct {
EnableGoogleCloudServerlessAuthentication bool `json:"enable_google_cloud_serverless_authentication"`
EnableRoutingKey bool `json:"enable_routing_key"`
FromAudience string `json:"from_audience"`
KubernetesServiceAccountToken string `json:"kubernetes_service_account_token"`
ToAudience string `json:"to_audience"`
@ -26,6 +28,8 @@ type HeadersRequest struct {
func NewHeadersRequestFromPolicy(policy *config.Policy) *HeadersRequest {
input := new(HeadersRequest)
input.EnableGoogleCloudServerlessAuthentication = policy.EnableGoogleCloudServerlessAuthentication
input.EnableRoutingKey = policy.EnvoyOpts.GetLbPolicy() == envoy_config_cluster_v3.Cluster_RING_HASH ||
policy.EnvoyOpts.GetLbPolicy() == envoy_config_cluster_v3.Cluster_MAGLEV
if u, err := urlutil.ParseAndValidateURL(policy.From); err == nil {
input.FromAudience = u.Hostname()
}

View file

@ -2,6 +2,7 @@ package pomerium.headers
# input:
# enable_google_cloud_serverless_authentication: boolean
# enable_routing_key: boolean
# from_audience: string
# kubernetes_service_account_token: string
# session:
@ -206,6 +207,15 @@ google_cloud_serverless_headers = h {
true
}
routing_key_headers = h {
input.enable_routing_key
h := [
["x-pomerium-routing-key", crypto.sha256(input.session.id)]
]
} else = [] {
true
}
identity_headers := {key: values |
h1 := [["x-pomerium-jwt-assertion", signed_jwt]]
h2 := [[header_name, header_value] |
@ -223,8 +233,9 @@ identity_headers := {key: values |
h3 := kubernetes_headers
h4 := [[k, v] | v := google_cloud_serverless_headers[k]]
h5 := routing_key_headers
h := array.concat(array.concat(array.concat(h1, h2), h3), h4)
h := array.concat(array.concat(array.concat(array.concat(h1, h2), h3), h4), h5)
some i
[key, v1] := h[i]

View file

@ -420,8 +420,29 @@ func (b *Builder) buildPolicyRouteRouteAction(options *config.Options, policy *c
IdleTimeout: idleTimeout,
PrefixRewrite: prefixRewrite,
RegexRewrite: regexRewrite,
HashPolicy: []*envoy_config_route_v3.RouteAction_HashPolicy{
// hash by the routing key, which is added by authorize.
{
PolicySpecifier: &envoy_config_route_v3.RouteAction_HashPolicy_Header_{
Header: &envoy_config_route_v3.RouteAction_HashPolicy_Header{
HeaderName: httputil.HeaderPomeriumRoutingKey,
},
},
Terminal: true,
},
// if the routing key is missing, hash by the ip.
{
PolicySpecifier: &envoy_config_route_v3.RouteAction_HashPolicy_ConnectionProperties_{
ConnectionProperties: &envoy_config_route_v3.RouteAction_HashPolicy_ConnectionProperties{
SourceIp: true,
},
},
Terminal: true,
},
},
}
setHostRewriteOptions(policy, action)
return action, nil
}

View file

@ -263,6 +263,20 @@ func TestTimeouts(t *testing.T) {
%s,
"autoHostRewrite": true,
"cluster": "policy",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"upgradeConfigs": [
{ "enabled": %v, "upgradeType": "websocket"},
{ "enabled": false, "upgradeType": "spdy/3.1"}
@ -367,6 +381,20 @@ func Test_buildPolicyRoutes(t *testing.T) {
"route": {
"autoHostRewrite": true,
"cluster": "policy-1",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "3s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},
@ -396,6 +424,20 @@ func Test_buildPolicyRoutes(t *testing.T) {
"route": {
"autoHostRewrite": false,
"cluster": "policy-2",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"idleTimeout": "0s",
"timeout": "0s",
"upgradeConfigs": [
@ -426,6 +468,20 @@ func Test_buildPolicyRoutes(t *testing.T) {
"route": {
"autoHostRewrite": true,
"cluster": "policy-3",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "60s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},
@ -465,6 +521,20 @@ func Test_buildPolicyRoutes(t *testing.T) {
"route": {
"autoHostRewrite": true,
"cluster": "policy-4",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "3s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},
@ -494,6 +564,20 @@ func Test_buildPolicyRoutes(t *testing.T) {
"route": {
"autoHostRewrite": true,
"cluster": "policy-5",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "60s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},
@ -524,6 +608,20 @@ func Test_buildPolicyRoutes(t *testing.T) {
"route": {
"autoHostRewrite": false,
"cluster": "policy-6",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "3s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},
@ -553,6 +651,20 @@ func Test_buildPolicyRoutes(t *testing.T) {
"route": {
"autoHostRewrite": false,
"cluster": "policy-7",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"idleTimeout": "0s",
"timeout": "0s",
"upgradeConfigs": [
@ -583,6 +695,20 @@ func Test_buildPolicyRoutes(t *testing.T) {
"route": {
"autoHostRewrite": false,
"cluster": "policy-8",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"idleTimeout": "0s",
"timeout": "10s",
"upgradeConfigs": [
@ -630,6 +756,20 @@ func Test_buildPolicyRoutes(t *testing.T) {
"route": {
"autoHostRewrite": true,
"cluster": "policy-9",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "3s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},
@ -689,6 +829,20 @@ func Test_buildPolicyRoutes(t *testing.T) {
"route": {
"autoHostRewrite": true,
"cluster": "policy-10",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"idleTimeout": "0s",
"timeout": "0s",
"upgradeConfigs": [
@ -720,6 +874,20 @@ func Test_buildPolicyRoutes(t *testing.T) {
"route": {
"autoHostRewrite": true,
"cluster": "policy-11",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"idleTimeout": "0s",
"timeout": "10s",
"upgradeConfigs": [
@ -774,6 +942,20 @@ func Test_buildPolicyRoutes(t *testing.T) {
"route": {
"autoHostRewrite": true,
"cluster": "policy-12",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "3s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},
@ -865,6 +1047,20 @@ func Test_buildPolicyRoutesRewrite(t *testing.T) {
"autoHostRewrite": true,
"prefixRewrite": "/bar",
"cluster": "policy-1",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "3s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},
@ -895,6 +1091,20 @@ func Test_buildPolicyRoutesRewrite(t *testing.T) {
"autoHostRewrite": true,
"prefixRewrite": "/foo",
"cluster": "policy-2",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "3s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},
@ -931,6 +1141,20 @@ func Test_buildPolicyRoutesRewrite(t *testing.T) {
"substitution": "\\2/instance/\\1"
},
"cluster": "policy-3",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "3s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},
@ -961,6 +1185,20 @@ func Test_buildPolicyRoutesRewrite(t *testing.T) {
"hostRewriteLiteral": "literal.example.com",
"prefixRewrite": "/bar",
"cluster": "policy-4",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "3s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},
@ -991,6 +1229,20 @@ func Test_buildPolicyRoutesRewrite(t *testing.T) {
"hostRewriteHeader": "HOST_HEADER",
"prefixRewrite": "/bar",
"cluster": "policy-5",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "3s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},
@ -1027,6 +1279,20 @@ func Test_buildPolicyRoutesRewrite(t *testing.T) {
},
"prefixRewrite": "/bar",
"cluster": "policy-6",
"hashPolicy": [
{
"header": {
"headerName": "x-pomerium-routing-key"
},
"terminal": true
},
{
"connectionProperties": {
"sourceIp": true
},
"terminal": true
}
],
"timeout": "3s",
"upgradeConfigs": [
{ "enabled": false, "upgradeType": "websocket"},

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -4,12 +4,15 @@ import (
"context"
"crypto/tls"
"encoding/json"
"io"
"net/http"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/pomerium/pomerium/integration/flows"
)
func TestCORS(t *testing.T) {
@ -248,3 +251,72 @@ func TestGoogleCloudRun(t *testing.T) {
assert.NotEmpty(t, result.Headers["authorization"], "expected authorization header when cloudrun is enabled")
}
}
func TestLoadBalancer(t *testing.T) {
if ClusterType == "traefik" || ClusterType == "nginx" {
t.Skip()
return
}
ctx, clearTimeout := context.WithTimeout(context.Background(), time.Minute*10)
defer clearTimeout()
getDistribution := func(t *testing.T, path string) map[string]float64 {
client := getClient()
distribution := map[string]float64{}
res, err := flows.Authenticate(ctx, client,
mustParseURL("https://httpdetails.localhost.pomerium.io/"+path),
flows.WithEmail("user1@dogs.test"))
if !assert.NoError(t, err) {
return distribution
}
_, _ = io.ReadAll(res.Body)
_ = res.Body.Close()
for i := 0; i < 100; i++ {
req, err := http.NewRequestWithContext(ctx, "GET",
"https://httpdetails.localhost.pomerium.io/"+path, nil)
if !assert.NoError(t, err) {
return distribution
}
res, err = client.Do(req)
if !assert.NoError(t, err) {
return distribution
}
var result struct {
Hostname string `json:"hostname"`
}
err = json.NewDecoder(res.Body).Decode(&result)
_ = res.Body.Close()
assert.NoError(t, err)
distribution[result.Hostname]++
}
return distribution
}
t.Run("round robin", func(t *testing.T) {
distribution := getDistribution(t, "round-robin")
var xs []float64
for _, x := range distribution {
xs = append(xs, x)
}
assert.Lessf(t, standardDeviation(xs), 10.0, "should distribute requests evenly, got: %v",
distribution)
})
t.Run("ring hash", func(t *testing.T) {
distribution := getDistribution(t, "ring-hash")
assert.Lenf(t, distribution, 1, "should distribute requests to a single backend, got: %v",
distribution)
})
t.Run("maglev", func(t *testing.T) {
distribution := getDistribution(t, "maglev")
assert.Lenf(t, distribution, 1, "should distribute requests to a single backend, got: %v",
distribution)
})
}

26
integration/stats.go Normal file
View file

@ -0,0 +1,26 @@
package main
import "math"
func mean(xs []float64) float64 {
var sum float64
for _, x := range xs {
sum += x
}
return sum / float64(len(xs))
}
func variance(xs []float64) float64 {
m := mean(xs)
var sum float64
for _, x := range xs {
dx := x - m
sum += dx * dx
}
return sum / float64(len(xs))
}
func standardDeviation(xs []float64) float64 {
return math.Sqrt(variance(xs))
}

View file

@ -7,6 +7,21 @@ local Variations() =
cert: importstr '../files/trusted.pem',
key: importstr '../files/trusted-key.pem',
},
{
name: 'trusted-1',
cert: importstr '../files/trusted.pem',
key: importstr '../files/trusted-key.pem',
},
{
name: 'trusted-2',
cert: importstr '../files/trusted.pem',
key: importstr '../files/trusted-key.pem',
},
{
name: 'trusted-3',
cert: importstr '../files/trusted.pem',
key: importstr '../files/trusted-key.pem',
},
{
name: 'untrusted',
cert: importstr '../files/untrusted.pem',

View file

@ -88,6 +88,9 @@ local RouteLocationConfig(route) =
if std.objectHas(route, 'prefix') then '^~ ' + route.prefix
else if std.objectHas(route, 'path') then '= ' + route.path
else '/';
local to =
if std.isArray(route.to) then route.to[0]
else route.to;
|||
location %s {
proxy_pass %s;
@ -100,7 +103,7 @@ local RouteLocationConfig(route) =
auth_request_set $auth_cookie $upstream_http_set_cookie;
add_header Set-Cookie $auth_cookie;
}
||| % [rule, route.to];
||| % [rule, to];
local DomainServerConfig(domain, routes) =
local locations = std.join('\n', std.map(function(route) RouteLocationConfig(route), routes));

View file

@ -121,6 +121,9 @@ local ComposeService(name, definition, additionalAliases=[]) =
'mock-idp',
'redis',
'trusted-httpdetails',
'trusted-1-httpdetails',
'trusted-2-httpdetails',
'trusted-3-httpdetails',
'untrusted-httpdetails',
'verify',
'websocket-echo',

View file

@ -149,6 +149,42 @@ local Routes(mode, idp, dns_suffix) =
allowed_users: ['user1@dogs.test'],
pass_identity_headers: true,
},
// round robin load balancer
{
from: 'https://httpdetails.localhost.pomerium.io',
to: [
'http://trusted-1-httpdetails' + dns_suffix + ':8080',
'http://trusted-2-httpdetails' + dns_suffix + ':8080',
'http://trusted-3-httpdetails' + dns_suffix + ':8080',
],
prefix: '/round-robin',
allow_any_authenticated_user: true,
lb_policy: 'ROUND_ROBIN',
},
// ring hash load balancer
{
from: 'https://httpdetails.localhost.pomerium.io',
to: [
'http://trusted-1-httpdetails' + dns_suffix + ':8080',
'http://trusted-2-httpdetails' + dns_suffix + ':8080',
'http://trusted-3-httpdetails' + dns_suffix + ':8080',
],
prefix: '/ring-hash',
allow_any_authenticated_user: true,
lb_policy: 'RING_HASH',
},
// maglev load balancer
{
from: 'https://httpdetails.localhost.pomerium.io',
to: [
'http://trusted-1-httpdetails' + dns_suffix + ':8080',
'http://trusted-2-httpdetails' + dns_suffix + ':8080',
'http://trusted-3-httpdetails' + dns_suffix + ':8080',
],
prefix: '/maglev',
allow_any_authenticated_user: true,
lb_policy: 'MAGLEV',
},
// catch-all
{
from: 'https://httpdetails.localhost.pomerium.io',

View file

@ -30,6 +30,8 @@ const (
HeaderPomeriumReproxyPolicy = "x-pomerium-reproxy-policy"
// HeaderPomeriumReproxyPolicyHMAC is an HMAC of the HeaderPomeriumReproxyPolicy header.
HeaderPomeriumReproxyPolicyHMAC = "x-pomerium-reproxy-policy-hmac"
// HeaderPomeriumRoutingKey is a string used for routing user requests to a consistent upstream server.
HeaderPomeriumRoutingKey = "x-pomerium-routing-key"
)
// HeadersContentSecurityPolicy are the content security headers added to the service's handlers