integration: add cluster setup and configuration and a few tests

This commit is contained in:
Caleb Doxsey 2020-04-28 07:33:33 -06:00
parent 9860c3ce9f
commit 8fd716e1d8
24 changed files with 1689 additions and 2 deletions

View file

@@ -0,0 +1,65 @@
package cluster
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"github.com/google/uuid"
)
// TLSCerts holds PEM-encoded TLS credentials produced by mkcert:
// the root CA certificate plus a leaf certificate and its private key.
type TLSCerts struct {
	CA   string
	Cert string
	Key  string
}
// bootstrapCerts installs the mkcert root CA and generates a wildcard
// certificate for *.localhost.pomerium.io in a fresh temporary directory.
// It returns the CA, certificate and key as PEM strings.
func bootstrapCerts(ctx context.Context) (*TLSCerts, error) {
	err := run(ctx, "mkcert", withArgs("-install"))
	if err != nil {
		return nil, fmt.Errorf("error installing root certificate: %w", err)
	}

	// Ask mkcert where it stores its root CA so we can read it back.
	var buf bytes.Buffer
	err = run(ctx, "mkcert", withArgs("-CAROOT"), withStdout(&buf))
	if err != nil {
		// bug fix: the original dropped the underlying error here.
		return nil, fmt.Errorf("error running mkcert: %w", err)
	}
	caPath := strings.TrimSpace(buf.String())
	ca, err := ioutil.ReadFile(filepath.Join(caPath, "rootCA.pem"))
	if err != nil {
		return nil, fmt.Errorf("error reading root ca: %w", err)
	}

	// Generate the wildcard certificate in a unique temporary directory so
	// concurrent runs don't collide.
	wd := filepath.Join(os.TempDir(), uuid.New().String())
	err = os.MkdirAll(wd, 0755)
	if err != nil {
		return nil, fmt.Errorf("error creating temporary directory: %w", err)
	}
	err = run(ctx, "mkcert", withArgs("*.localhost.pomerium.io"), withWorkingDir(wd))
	if err != nil {
		return nil, fmt.Errorf("error generating certificates: %w", err)
	}
	// mkcert writes the pair using this fixed naming scheme.
	cert, err := ioutil.ReadFile(filepath.Join(wd, "_wildcard.localhost.pomerium.io.pem"))
	if err != nil {
		return nil, fmt.Errorf("error reading certificate: %w", err)
	}
	key, err := ioutil.ReadFile(filepath.Join(wd, "_wildcard.localhost.pomerium.io-key.pem"))
	if err != nil {
		return nil, fmt.Errorf("error reading certificate key: %w", err)
	}

	return &TLSCerts{
		CA:   string(ca),
		Cert: string(cert),
		Key:  string(key),
	}, nil
}

View file

@@ -0,0 +1,46 @@
package cluster
import (
"net/http"
"net/http/cookiejar"
"github.com/rs/zerolog/log"
"golang.org/x/net/publicsuffix"
)
// Cluster is a handle to the integration-test kubernetes cluster. It tracks
// the working directory containing the manifests, the HTTP transport used to
// reach the cluster, and the TLS certificates bootstrapped for it.
type Cluster struct {
	workingDir string
	transport  http.RoundTripper
	certs      *TLSCerts
}
// New creates a Cluster rooted at the given working directory.
func New(workingDir string) *Cluster {
	cluster := new(Cluster)
	cluster.workingDir = workingDir
	return cluster
}
// NewHTTPClient returns an *http.Client wired to the cluster: requests go
// through the cluster transport (with request logging), cookies are kept in
// an in-memory jar, and redirects are returned to the caller instead of
// being followed automatically.
func (cluster *Cluster) NewHTTPClient() *http.Client {
	jar, err := cookiejar.New(&cookiejar.Options{PublicSuffixList: publicsuffix.List})
	if err != nil {
		// cookiejar.New cannot fail with these options; treat it as fatal.
		panic(err)
	}
	noRedirects := func(req *http.Request, via []*http.Request) error {
		return http.ErrUseLastResponse
	}
	return &http.Client{
		Transport:     &loggingRoundTripper{cluster.transport},
		CheckRedirect: noRedirects,
		Jar:           jar,
	}
}
// loggingRoundTripper wraps an http.RoundTripper and logs each request.
type loggingRoundTripper struct {
	http.RoundTripper
}
// RoundTrip delegates to the wrapped RoundTripper and logs the request's
// method and URL at debug level.
func (rt *loggingRoundTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
	res, err = rt.RoundTripper.RoundTrip(req)
	log.Debug().
		Str("method", req.Method).
		Str("url", req.URL.String()).
		Msg("http request")
	return res, err
}

View file

@@ -0,0 +1,70 @@
package cluster
import (
"bufio"
"context"
"fmt"
"io"
"os/exec"
"github.com/rs/zerolog/log"
)
// cmdOption mutates an *exec.Cmd before it is started by run.
type cmdOption func(*exec.Cmd)
// withArgs sets the command-line arguments passed to the command, keeping
// the command's own name as argv[0].
func withArgs(args ...string) cmdOption {
	return func(cmd *exec.Cmd) {
		// bug fix: the original hard-coded "kubectl" as argv[0], which was
		// wrong whenever run was invoked with a different binary (bootstrapCerts
		// calls run with "mkcert"). exec.CommandContext already placed the real
		// name in Args[0]; preserve it.
		cmd.Args = append([]string{cmd.Args[0]}, args...)
	}
}
// withStdin makes the command read its standard input from rdr.
func withStdin(rdr io.Reader) cmdOption {
	return func(c *exec.Cmd) { c.Stdin = rdr }
}
// withStdout makes the command write its standard output to w.
func withStdout(w io.Writer) cmdOption {
	return func(c *exec.Cmd) { c.Stdout = w }
}
// withWorkingDir runs the command from the given working directory.
func withWorkingDir(wd string) cmdOption {
	return func(c *exec.Cmd) { c.Dir = wd }
}
// run executes the named command, applying the given options to the
// underlying exec.Cmd before starting it. Unless an option has already
// attached a writer, stderr and stdout are streamed line-by-line to the
// debug log. It blocks until the command exits and returns its error.
func run(ctx context.Context, name string, options ...cmdOption) error {
	cmd := commandContext(ctx, name)
	for _, o := range options {
		o(cmd)
	}
	// Only attach a logging pipe when the caller hasn't redirected the
	// stream (e.g. via withStdout); otherwise respect the caller's writer.
	if cmd.Stderr == nil {
		stderr, err := cmd.StderrPipe()
		if err != nil {
			return fmt.Errorf("failed to create stderr pipe for %s: %w", name, err)
		}
		// Drain the pipe concurrently so the child never blocks on a full buffer.
		go cmdLogger(stderr)
		defer stderr.Close()
	}
	if cmd.Stdout == nil {
		stdout, err := cmd.StdoutPipe()
		if err != nil {
			return fmt.Errorf("failed to create stdout pipe for %s: %w", name, err)
		}
		go cmdLogger(stdout)
		defer stdout.Close()
	}
	log.Debug().Strs("args", cmd.Args).Msgf("running %s", name)
	return cmd.Run()
}
// cmdLogger streams each line read from rdr to the debug log. It is run in
// a goroutine by run to capture a command's stdout/stderr.
func cmdLogger(rdr io.Reader) {
	s := bufio.NewScanner(rdr)
	for s.Scan() {
		log.Debug().Msg(s.Text())
	}
	// bug fix: a scanner read error (e.g. a line exceeding the buffer size)
	// was previously dropped silently; surface it in the log.
	if err := s.Err(); err != nil {
		log.Debug().Err(err).Msg("error reading command output")
	}
}

View file

@@ -0,0 +1,24 @@
//go:build linux
// +build linux

package cluster

import (
	"context"
	"os/exec"
	"syscall"

	"github.com/onsi/gocleanup"
)

// commandContext creates an *exec.Cmd for the named command. On Linux the
// child process is sent SIGTERM if this parent process dies, and a cleanup
// hook kills the child when the test binary exits normally.
func commandContext(ctx context.Context, name string, args ...string) *exec.Cmd {
	cmd := exec.CommandContext(ctx, name, args...)
	// Pdeathsig ensures the child cannot outlive the parent process.
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Pdeathsig: syscall.SIGTERM,
	}
	// Best-effort kill on exit; Process is nil until the command is started.
	gocleanup.Register(func() {
		if cmd.Process != nil {
			_ = cmd.Process.Kill()
		}
	})
	return cmd
}

View file

@@ -0,0 +1,20 @@
//go:build !linux
// +build !linux

package cluster

import (
	"context"
	"os/exec"

	"github.com/onsi/gocleanup"
)

// commandContext creates an *exec.Cmd for the named command. On non-Linux
// platforms there is no parent-death signal, so only a cleanup hook that
// kills the child on test-binary exit is registered.
func commandContext(ctx context.Context, name string, args ...string) *exec.Cmd {
	cmd := exec.CommandContext(ctx, name, args...)
	// Best-effort kill on exit; Process is nil until the command is started.
	gocleanup.Register(func() {
		if cmd.Process != nil {
			_ = cmd.Process.Kill()
		}
	})
	return cmd
}

View file

@@ -0,0 +1,6 @@
package cluster
// Config holds settings for the integration test cluster.
type Config struct {
	// WorkingDirectory — presumably the directory holding the cluster
	// manifests (cf. Cluster.workingDir); no usage visible here, confirm.
	WorkingDirectory string
	// HTTPSPort — presumably the local port used to reach the cluster over
	// HTTPS; no usage visible here, confirm.
	HTTPSPort int
}

View file

@@ -0,0 +1,222 @@
package cluster
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/google/go-jsonnet"
"github.com/pomerium/pomerium/integration/internal/httputil"
"github.com/rs/zerolog/log"
)
// requiredDeployments lists the namespace/name of every deployment that
// must have at least one available replica before the cluster is considered
// ready (polled by applyManifests).
var requiredDeployments = []string{
	"default/httpbin",
	"default/httpecho",
	"default/openid",
	"default/pomerium-authenticate",
	"default/pomerium-authorize",
	"default/pomerium-proxy",
	"ingress-nginx/nginx-ingress-controller",
}
// Setup configures the test cluster so that it is ready for the integration
// tests: it verifies kubectl connectivity, bootstraps TLS certificates,
// generates and applies the manifests, and builds an HTTP transport that
// routes port-443 traffic to the cluster's HTTPS node port.
func (cluster *Cluster) Setup(ctx context.Context) error {
	if err := run(ctx, "kubectl", withArgs("cluster-info")); err != nil {
		return fmt.Errorf("error running kubectl cluster-info: %w", err)
	}

	certs, err := bootstrapCerts(ctx)
	if err != nil {
		return err
	}
	cluster.certs = certs

	jsonsrc, err := cluster.generateManifests(ctx)
	if err != nil {
		return err
	}
	if err := applyManifests(ctx, jsonsrc); err != nil {
		return err
	}

	hostport, err := cluster.getNodeHTTPSAddr(ctx)
	if err != nil {
		return err
	}

	// The generated certificate is not in the client's trust store, so
	// verification is disabled for test traffic.
	cluster.transport = httputil.NewLocalRoundTripper(&http.Transport{
		TLSClientConfig: &tls.Config{
			InsecureSkipVerify: true,
		},
	}, map[string]string{
		"443": hostport,
	})
	return nil
}
// getNodeHTTPSAddr returns the host:port at which the ingress controller's
// HTTPS node port can be reached: the port comes from the
// ingress-nginx-nodeport service and the host IP from a pod the service
// selects.
func (cluster *Cluster) getNodeHTTPSAddr(ctx context.Context) (hostport string, err error) {
	var buf bytes.Buffer
	args := []string{"get", "service", "--namespace", "ingress-nginx", "--output", "json",
		"ingress-nginx-nodeport"}
	err = run(ctx, "kubectl", withArgs(args...), withStdout(&buf))
	if err != nil {
		return "", fmt.Errorf("error getting service details with kubectl: %w", err)
	}

	var svcResult struct {
		Spec struct {
			Ports []struct {
				Name     string `json:"name"`
				NodePort int    `json:"nodePort"`
			} `json:"ports"`
			Selector map[string]string `json:"selector"`
		} `json:"spec"`
	}
	err = json.Unmarshal(buf.Bytes(), &svcResult)
	if err != nil {
		return "", fmt.Errorf("error unmarshaling service details from kubectl: %w", err)
	}

	// Find the pods backing the service via its label selector.
	buf.Reset()
	args = []string{"get", "pods", "--namespace", "ingress-nginx", "--output", "json"}
	var sel []string
	for k, v := range svcResult.Spec.Selector {
		sel = append(sel, k+"="+v)
	}
	args = append(args, "--selector", strings.Join(sel, ","))
	err = run(ctx, "kubectl", withArgs(args...), withStdout(&buf))
	if err != nil {
		return "", fmt.Errorf("error getting pod details with kubectl: %w", err)
	}

	var podsResult struct {
		Items []struct {
			Status struct {
				HostIP string `json:"hostIP"`
			} `json:"status"`
		} `json:"items"`
	}
	err = json.Unmarshal(buf.Bytes(), &podsResult)
	if err != nil {
		return "", fmt.Errorf("error unmarshaling pod details from kubectl (json=%s): %w", buf.String(), err)
	}

	var port string
	for _, p := range svcResult.Spec.Ports {
		if p.Name == "https" {
			port = strconv.Itoa(p.NodePort)
			break
		}
	}
	if port == "" {
		return "", fmt.Errorf("failed to find https port in kubectl service results (result=%v)", svcResult)
	}

	// Any selected pod's node will do; take the last host IP seen.
	var hostIP string
	for _, item := range podsResult.Items {
		hostIP = item.Status.HostIP
	}
	if hostIP == "" {
		// bug fix: the original wrapped err with %w here, but err is always
		// nil on this path; include the kubectl output instead.
		return "", fmt.Errorf("failed to find host ip in kubectl pod results (json=%s)", buf.String())
	}

	return net.JoinHostPort(hostIP, port), nil
}
// generateManifests evaluates the cluster's jsonnet manifest template and
// returns the resulting JSON document as a string.
func (cluster *Cluster) generateManifests(ctx context.Context) (string, error) {
	manifestDir := filepath.Join(cluster.workingDir, "manifests")
	src, err := ioutil.ReadFile(filepath.Join(manifestDir, "manifests.jsonnet"))
	if err != nil {
		return "", fmt.Errorf("error reading manifest jsonnet src: %w", err)
	}

	// Expose the bootstrapped TLS material to the template as external
	// variables, and let imports resolve relative to the manifest directory.
	vm := jsonnet.MakeVM()
	vm.ExtVar("tls-ca", cluster.certs.CA)
	vm.ExtVar("tls-cert", cluster.certs.Cert)
	vm.ExtVar("tls-key", cluster.certs.Key)
	vm.Importer(&jsonnet.FileImporter{
		JPaths: []string{manifestDir},
	})

	jsonsrc, err := vm.EvaluateSnippet("manifests.jsonnet", string(src))
	if err != nil {
		return "", fmt.Errorf("error evaluating jsonnet (filename=manifests.jsonnet): %w", err)
	}
	return jsonsrc, nil
}
// applyManifests applies the generated manifest JSON with kubectl, then
// polls (every 5 seconds, for up to 5 minutes) until every deployment in
// requiredDeployments reports at least one available replica.
func applyManifests(ctx context.Context, jsonsrc string) error {
	err := run(ctx, "kubectl", withArgs("apply", "-f", "-"), withStdin(strings.NewReader(jsonsrc)))
	if err != nil {
		return fmt.Errorf("error applying manifests: %w", err)
	}

	log.Info().Msg("waiting for deployments to come up")
	ctx, clearTimeout := context.WithTimeout(ctx, 5*time.Minute)
	defer clearTimeout()
	ticker := time.NewTicker(time.Second * 5)
	defer ticker.Stop()
	for {
		var buf bytes.Buffer
		err = run(ctx, "kubectl", withArgs("get", "deployments", "--all-namespaces", "--output", "json"),
			withStdout(&buf))
		if err != nil {
			return fmt.Errorf("error polling for deployment status: %w", err)
		}

		var results struct {
			Items []struct {
				Metadata struct {
					Namespace string `json:"namespace"`
					Name      string `json:"name"`
				} `json:"metadata"`
				Status struct {
					AvailableReplicas int `json:"availableReplicas"`
				} `json:"status"`
			} `json:"items"`
		}
		err = json.Unmarshal(buf.Bytes(), &results)
		if err != nil {
			return fmt.Errorf("error unmarshaling kubectl results: %w", err)
		}

		// Index available replica counts by "namespace/name".
		byName := map[string]int{}
		for _, item := range results.Items {
			byName[item.Metadata.Namespace+"/"+item.Metadata.Name] = item.Status.AvailableReplicas
		}

		done := true
		for _, dep := range requiredDeployments {
			if byName[dep] < 1 {
				done = false
				log.Warn().Str("deployment", dep).Msg("deployment is not ready yet")
			}
		}
		if done {
			break
		}

		// bug fix: the original followed this select with a second,
		// unconditional <-ticker.C, doubling the effective poll interval
		// to 10s and ignoring cancellation during the extra wait.
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	log.Info().Msg("all deployments are ready")
	return nil
}