diff --git a/config/http.go b/config/http.go index f8ac27138..e5684ca32 100644 --- a/config/http.go +++ b/config/http.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/tripper" "github.com/pomerium/pomerium/pkg/cryptutil" ) @@ -49,3 +50,52 @@ func (t *httpTransport) update(options *Options) { func (t *httpTransport) RoundTrip(req *http.Request) (*http.Response, error) { return t.transport.Load().(http.RoundTripper).RoundTrip(req) } + +// Clone returns a clone of the transport. +func (t *httpTransport) Clone() *http.Transport { + return t.transport.Load().(*http.Transport).Clone() +} + +// NewPolicyHTTPTransport creates a new http RoundTripper for a policy. +func NewPolicyHTTPTransport(options *Options, policy *Policy) http.RoundTripper { + transport := http.DefaultTransport.(interface { + Clone() *http.Transport + }).Clone() + c := tripper.NewChain() + + var tlsClientConfig tls.Config + var isCustomClientConfig bool + + if policy.TLSSkipVerify { + tlsClientConfig.InsecureSkipVerify = true + isCustomClientConfig = true + } + + if options.CA != "" || options.CAFile != "" { + rootCAs, err := cryptutil.GetCertPool(options.CA, options.CAFile) + if err == nil { + tlsClientConfig.RootCAs = rootCAs + tlsClientConfig.MinVersion = tls.VersionTLS12 + isCustomClientConfig = true + } else { + log.Error().Err(err).Msg("config: error getting cert pool") + } + } + + if policy.ClientCertificate != nil { + tlsClientConfig.Certificates = []tls.Certificate{*policy.ClientCertificate} + isCustomClientConfig = true + } + + if policy.TLSServerName != "" { + tlsClientConfig.ServerName = policy.TLSServerName + isCustomClientConfig = true + } + + // We avoid setting a custom client config unless we have to as + // if TLSClientConfig is nil, the default configuration is used. + if isCustomClientConfig { + transport.TLSClientConfig = &tlsClientConfig + } + return c.Then(transport) +} diff --git a/config/policy.go b/config/policy.go index 9b572191b..71889f270 100644 --- a/config/policy.go +++ b/config/policy.go @@ -530,6 +530,11 @@ func (p *Policy) Matches(requestURL url.URL) bool { return true } +// IsForKubernetes returns true if the policy is for kubernetes. +func (p *Policy) IsForKubernetes() bool { + return p.KubernetesServiceAccountTokenFile != "" || p.KubernetesServiceAccountToken != "" +} + // StringURL stores a URL as a string in json. type StringURL struct { *url.URL diff --git a/internal/controlplane/http.go b/internal/controlplane/http.go index 26d8730c1..7007f3c70 100644 --- a/internal/controlplane/http.go +++ b/internal/controlplane/http.go @@ -17,6 +17,7 @@ import ( func (srv *Server) addHTTPMiddleware() { root := srv.HTTPRouter + root.Use(srv.reproxy.Middleware) root.Use(requestid.HTTPMiddleware()) root.Use(log.NewHandler(log.Logger)) root.Use(log.AccessHandler(func(r *http.Request, status, size int, duration time.Duration) { diff --git a/internal/controlplane/server.go b/internal/controlplane/server.go index 90e5ddd86..7c6a12701 100644 --- a/internal/controlplane/server.go +++ b/internal/controlplane/server.go @@ -17,6 +17,7 @@ import ( "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/controlplane/filemgr" "github.com/pomerium/pomerium/internal/controlplane/xdsmgr" + "github.com/pomerium/pomerium/internal/httputil/reproxy" "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/telemetry" "github.com/pomerium/pomerium/internal/telemetry/requestid" @@ -53,12 +54,14 @@ type Server struct { xdsmgr *xdsmgr.Manager filemgr *filemgr.Manager metricsMgr *config.MetricsManager + reproxy *reproxy.Handler } // NewServer creates a new Server. Listener ports are chosen by the OS. func NewServer(name string, metricsMgr *config.MetricsManager) (*Server, error) { srv := &Server{ metricsMgr: metricsMgr, + reproxy: reproxy.New(), } srv.currentConfig.Store(versionedConfig{ Config: &config.Config{Options: &config.Options{}}, @@ -167,6 +170,7 @@ func (srv *Server) Run(ctx context.Context) error { // OnConfigChange updates the pomerium config options. func (srv *Server) OnConfigChange(cfg *config.Config) error { + srv.reproxy.Update(cfg) prev := srv.currentConfig.Load() srv.currentConfig.Store(versionedConfig{ Config: cfg, diff --git a/internal/controlplane/xds_routes.go b/internal/controlplane/xds_routes.go index e9e084027..68c0e8e08 100644 --- a/internal/controlplane/xds_routes.go +++ b/internal/controlplane/xds_routes.go @@ -317,11 +317,25 @@ func (srv *Server) buildPolicyRoutes(options *config.Options, domain string) ([] } luaMetadata["remove_impersonate_headers"] = &structpb.Value{ Kind: &structpb.Value_BoolValue{ - BoolValue: policy.KubernetesServiceAccountTokenFile != "" || policy.KubernetesServiceAccountToken != "", + BoolValue: policy.IsForKubernetes(), }, } } + if policy.IsForKubernetes() { + policyID, _ := policy.RouteID() + for _, hdr := range srv.reproxy.GetPolicyIDHeaders(policyID) { + envoyRoute.RequestHeadersToAdd = append(envoyRoute.RequestHeadersToAdd, + &envoy_config_core_v3.HeaderValueOption{ + Header: &envoy_config_core_v3.HeaderValue{ + Key: hdr[0], + Value: hdr[1], + }, + Append: wrapperspb.Bool(false), + }) + } + } + envoyRoute.Metadata.FilterMetadata = map[string]*structpb.Struct{ "envoy.filters.http.lua": {Fields: luaMetadata}, } @@ -370,6 +384,10 @@ func (srv *Server) buildPolicyRouteRedirectAction(r *config.PolicyRedirect) (*en func (srv *Server) buildPolicyRouteRouteAction(options *config.Options, policy *config.Policy) (*envoy_config_route_v3.RouteAction, error) { clusterName := getClusterID(policy) + // kubernetes requests are sent to the http control plane to be reproxied + if policy.IsForKubernetes() { + clusterName = httpCluster + } routeTimeout := getRouteTimeout(options, policy) idleTimeout := getRouteIdleTimeout(policy) prefixRewrite, regexRewrite := getRewriteOptions(policy) @@ -465,6 +483,11 @@ func getRequestHeadersToRemove(options *config.Options, policy *config.Policy) [ requestHeadersToRemove = append(requestHeadersToRemove, httputil.PomeriumJWTHeaderName(claim)) } } + // remove these headers to prevent a user from re-proxying requests through the control plane + requestHeadersToRemove = append(requestHeadersToRemove, + httputil.HeaderPomeriumReproxyPolicy, + httputil.HeaderPomeriumReproxyPolicyHMAC, + ) return requestHeadersToRemove } diff --git a/internal/controlplane/xds_routes_test.go b/internal/controlplane/xds_routes_test.go index 965609203..20e114dc5 100644 --- a/internal/controlplane/xds_routes_test.go +++ b/internal/controlplane/xds_routes_test.go @@ -308,7 +308,11 @@ func Test_buildPolicyRoutes(t *testing.T) { { "enabled": false, "upgradeType": "websocket"}, { "enabled": false, "upgradeType": "spdy/3.1"} ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-2", @@ -334,7 +338,11 @@ func Test_buildPolicyRoutes(t *testing.T) { { "enabled": true, "upgradeType": "websocket"}, { "enabled": false, "upgradeType": "spdy/3.1"} ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-3", @@ -366,7 +374,11 @@ func Test_buildPolicyRoutes(t *testing.T) { "key": "HEADER-KEY", "value": "HEADER-VALUE" } - }] + }], + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-4", @@ -394,7 +406,11 @@ func Test_buildPolicyRoutes(t *testing.T) { { "enabled": false, "upgradeType": "websocket"}, { "enabled": false, "upgradeType": "spdy/3.1"} ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-5", @@ -420,7 +436,11 @@ func Test_buildPolicyRoutes(t *testing.T) { { "enabled": false, "upgradeType": "spdy/3.1"} ] }, - "requestHeadersToRemove": ["HEADER-KEY"] + "requestHeadersToRemove": [ + "HEADER-KEY", + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-6", @@ -445,7 +465,11 @@ func Test_buildPolicyRoutes(t *testing.T) { { "enabled": false, "upgradeType": "websocket"}, { "enabled": true, "upgradeType": "spdy/3.1"} ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-7", @@ -471,7 +495,11 @@ func Test_buildPolicyRoutes(t *testing.T) { { "enabled": true, "upgradeType": "websocket"}, { "enabled": true, "upgradeType": "spdy/3.1"} ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-8", @@ -497,7 +525,11 @@ func Test_buildPolicyRoutes(t *testing.T) { { "enabled": true, "upgradeType": "websocket"}, { "enabled": false, "upgradeType": "spdy/3.1"} ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] } ] `, routes) @@ -540,6 +572,10 @@ func Test_buildPolicyRoutes(t *testing.T) { { "enabled": false, "upgradeType": "spdy/3.1"} ] }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ], "typedPerFilterConfig": { "envoy.filters.http.ext_authz": { "@type": "type.googleapis.com/envoy.extensions.filters.http.ext_authz.v3.ExtAuthzPerRoute", @@ -596,7 +632,11 @@ func Test_buildPolicyRoutes(t *testing.T) { { "enabled": false, "upgradeType": "spdy/3.1"}, { "enabled": true, "upgradeType": "CONNECT", "connectConfig": {} } ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-1", @@ -623,7 +663,11 @@ func Test_buildPolicyRoutes(t *testing.T) { { "enabled": false, "upgradeType": "spdy/3.1"}, { "enabled": true, "upgradeType": "CONNECT", "connectConfig": {} } ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] } ] `, routes) @@ -707,7 +751,11 @@ func Test_buildPolicyRoutesRewrite(t *testing.T) { { "enabled": false, "upgradeType": "websocket"}, { "enabled": false, "upgradeType": "spdy/3.1"} ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-1", @@ -733,7 +781,11 @@ func Test_buildPolicyRoutesRewrite(t *testing.T) { { "enabled": false, "upgradeType": "websocket"}, { "enabled": false, "upgradeType": "spdy/3.1"} ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-2", @@ -765,7 +817,11 @@ func Test_buildPolicyRoutesRewrite(t *testing.T) { { "enabled": false, "upgradeType": "websocket"}, { "enabled": false, "upgradeType": "spdy/3.1"} ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-3", @@ -791,7 +847,11 @@ func Test_buildPolicyRoutesRewrite(t *testing.T) { { "enabled": false, "upgradeType": "websocket"}, { "enabled": false, "upgradeType": "spdy/3.1"} ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-4", @@ -817,7 +877,11 @@ func Test_buildPolicyRoutesRewrite(t *testing.T) { { "enabled": false, "upgradeType": "websocket"}, { "enabled": false, "upgradeType": "spdy/3.1"} ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] }, { "name": "policy-5", @@ -849,7 +913,11 @@ func Test_buildPolicyRoutesRewrite(t *testing.T) { { "enabled": false, "upgradeType": "websocket"}, { "enabled": false, "upgradeType": "spdy/3.1"} ] - } + }, + "requestHeadersToRemove": [ + "x-pomerium-reproxy-policy", + "x-pomerium-reproxy-policy-hmac" + ] } ] `, routes) diff --git a/internal/httputil/headers.go b/internal/httputil/headers.go index aa5c8cb20..ee77b72a7 100644 --- a/internal/httputil/headers.go +++ b/internal/httputil/headers.go @@ -5,7 +5,8 @@ const AuthorizationTypePomerium = "Pomerium" // Standard headers const ( - HeaderReferrer = "Referer" + HeaderReferrer = "Referer" + HeaderImpersonateGroup = "Impersonate-Group" ) // Pomerium headers contain information added to a request. @@ -17,6 +18,10 @@ const ( HeaderPomeriumResponse = "x-pomerium-intercepted-response" // HeaderPomeriumJWTAssertion is the header key containing JWT signed user details. HeaderPomeriumJWTAssertion = "x-pomerium-jwt-assertion" + // HeaderPomeriumReproxyPolicy is the header key containing the policy to reproxy a request to. + HeaderPomeriumReproxyPolicy = "x-pomerium-reproxy-policy" + // HeaderPomeriumReproxyPolicyHMAC is an HMAC of the HeaderPomeriumReproxyPolicy header. + HeaderPomeriumReproxyPolicyHMAC = "x-pomerium-reproxy-policy-hmac" ) // HeadersContentSecurityPolicy are the content security headers added to the service's handlers diff --git a/internal/httputil/reproxy/reproxy.go b/internal/httputil/reproxy/reproxy.go new file mode 100644 index 000000000..f78de1d70 --- /dev/null +++ b/internal/httputil/reproxy/reproxy.go @@ -0,0 +1,138 @@ +// Package reproxy contains a handler for re-proxying traffic through the http controlplane. +package reproxy + +import ( + "encoding/base64" + "errors" + "math/rand" + "net/http" + stdhttputil "net/http/httputil" + "net/url" + "strconv" + "strings" + "sync" + + "github.com/pomerium/pomerium/config" + "github.com/pomerium/pomerium/internal/httputil" + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/pkg/cryptutil" +) + +// The Handler looks for an X-Pomerium-Reproxy-Policy header and if found re-proxies the request upstream +// to the destination. +// +// This is used to forward requests to Kubernetes with headers split to multiple values instead of coalesced via a +// comma. (https://github.com/kubernetes/kubernetes/issues/94683) If the upstream issue is fixed we will remove this. +type Handler struct { + mu sync.RWMutex + key []byte + options *config.Options + policies map[uint64]*config.Policy +} + +// New creates a new Handler. +func New() *Handler { + h := new(Handler) + h.policies = make(map[uint64]*config.Policy) + return h +} + +// GetPolicyIDFromHeaders gets a policy id from http headers. If no policy id is found +// or the HMAC isn't valid, false will be returned. +func (h *Handler) GetPolicyIDFromHeaders(headers http.Header) (uint64, bool) { + policyStr := headers.Get(httputil.HeaderPomeriumReproxyPolicy) + hmacStr := headers.Get(httputil.HeaderPomeriumReproxyPolicyHMAC) + hmac, err := base64.StdEncoding.DecodeString(hmacStr) + if err != nil { + return 0, false + } + + policyID, err := strconv.ParseUint(policyStr, 10, 64) + if err != nil { + return 0, false + } + + h.mu.RLock() + defer h.mu.RUnlock() + + return policyID, cryptutil.CheckHMAC([]byte(policyStr), hmac, string(h.key)) +} + +// GetPolicyIDHeaders returns http headers for the given policy id. +func (h *Handler) GetPolicyIDHeaders(policyID uint64) [][2]string { + h.mu.RLock() + defer h.mu.RUnlock() + + s := strconv.FormatUint(policyID, 10) + hmac := base64.StdEncoding.EncodeToString(cryptutil.GenerateHMAC([]byte(s), string(h.key))) + return [][2]string{ + {httputil.HeaderPomeriumReproxyPolicy, s}, + {httputil.HeaderPomeriumReproxyPolicyHMAC, hmac}, + } +} + +// Middleware returns an HTTP middleware for handling reproxying. +func (h *Handler) Middleware(next http.Handler) http.Handler { + return httputil.HandlerFunc(func(w http.ResponseWriter, r *http.Request) error { + policyID, ok := h.GetPolicyIDFromHeaders(r.Header) + if !ok { + next.ServeHTTP(w, r) + return nil + } + + h.mu.RLock() + options := h.options + policy, ok := h.policies[policyID] + h.mu.RUnlock() + + if !ok || !policy.IsForKubernetes() { + return httputil.NewError(http.StatusNotFound, errors.New("policy not found")) + } + + // remove these headers from the request to kubernetes + r.Header.Del(httputil.HeaderPomeriumReproxyPolicy) + r.Header.Del(httputil.HeaderPomeriumReproxyPolicyHMAC) + + // fix the impersonate group header + if vs := r.Header.Values(httputil.HeaderImpersonateGroup); len(vs) > 0 { + vs = strings.Split(strings.Join(vs, ","), ",") + r.Header.Del(httputil.HeaderImpersonateGroup) + for _, v := range vs { + r.Header.Add(httputil.HeaderImpersonateGroup, v) + } + } + + var dsts []url.URL + for _, wu := range policy.To { + dsts = append(dsts, wu.URL) + } + if len(dsts) == 0 { + return httputil.NewError(http.StatusNotFound, errors.New("policy destination not found")) + } + // regular rand is fine for this + dst := dsts[rand.Intn(len(dsts))] // nolint:gosec + + h := stdhttputil.NewSingleHostReverseProxy(&dst) + h.Transport = config.NewPolicyHTTPTransport(options, policy) + h.ServeHTTP(w, r) + return nil + }) +} + +// Update updates the handler with new configuration. +func (h *Handler) Update(cfg *config.Config) { + h.mu.Lock() + defer h.mu.Unlock() + + h.key, _ = base64.StdEncoding.DecodeString(cfg.Options.SharedKey) + h.options = cfg.Options + h.policies = make(map[uint64]*config.Policy) + for i, p := range cfg.Options.Policies { + id, err := p.RouteID() + if err != nil { + log.Warn().Err(err).Msg("reproxy: error getting route id") + continue + } + h.policies[id] = &cfg.Options.Policies[i] + } +} diff --git a/internal/httputil/reproxy/reproxy_test.go b/internal/httputil/reproxy/reproxy_test.go new file mode 100644 index 000000000..63979c595 --- /dev/null +++ b/internal/httputil/reproxy/reproxy_test.go @@ -0,0 +1,80 @@ +package reproxy + +import ( + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/pomerium/pomerium/config" + "github.com/pomerium/pomerium/pkg/cryptutil" +) + +func TestMiddleware(t *testing.T) { + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.WriteString(w, "NEXT") + }) + + t.Run("next", func(t *testing.T) { + h := New() + + srv := httptest.NewServer(h.Middleware(next)) + defer srv.Close() + + res, err := http.Get(srv.URL) + require.NoError(t, err) + + body, err := ioutil.ReadAll(res.Body) + require.NoError(t, err) + res.Body.Close() + + assert.Equal(t, "NEXT", string(body)) + }) + t.Run("proxy", func(t *testing.T) { + h := New() + + srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.WriteString(w, "SERVER1") + })) + defer srv1.Close() + + u, err := url.Parse(srv1.URL) + require.NoError(t, err) + + srv2 := httptest.NewServer(h.Middleware(next)) + defer srv2.Close() + + cfg := &config.Config{ + Options: &config.Options{ + SharedKey: cryptutil.NewBase64Key(), + Policies: []config.Policy{{ + To: config.WeightedURLs{{URL: *u}}, + KubernetesServiceAccountToken: "ABCD", + }}, + }, + } + h.Update(cfg) + + policyID, _ := cfg.Options.Policies[0].RouteID() + + req, err := http.NewRequest("GET", srv2.URL, nil) + require.NoError(t, err) + for _, hdr := range h.GetPolicyIDHeaders(policyID) { + req.Header.Set(hdr[0], hdr[1]) + } + + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + + body, err := ioutil.ReadAll(res.Body) + require.NoError(t, err) + res.Body.Close() + + assert.Equal(t, "SERVER1", string(body)) + }) +}