From 77fe37c8c00d3c867c7f2765caf464f3b090b62a Mon Sep 17 00:00:00 2001 From: Caleb Doxsey Date: Wed, 17 Mar 2021 13:48:41 -0600 Subject: [PATCH] redis: add redis cluster support (#1992) * redis: add redis cluster support * redis: update docs --- docs/reference/readme.md | 8 ++ docs/reference/settings.yaml | 8 ++ go.mod | 1 + go.sum | 4 + internal/testutil/redis.go | 150 ++++++++++++++++++++++++++- pkg/storage/redis/client.go | 172 +++++++++++++++++++++++++++++-- pkg/storage/redis/client_test.go | 37 +++++++ pkg/storage/redis/redis.go | 12 ++- pkg/storage/redis/redis_test.go | 6 ++ 9 files changed, 386 insertions(+), 12 deletions(-) diff --git a/docs/reference/readme.md b/docs/reference/readme.md index e0dcf1943..104b07e8b 100644 --- a/docs/reference/readme.md +++ b/docs/reference/readme.md @@ -952,6 +952,14 @@ The backend storage that databroker server will use. The connection string that the databroker service will use to connect to storage backend. +For `redis`, the following URL types are supported: + +- simple: `redis://{username}:{password}@{host}:{port}/{db}` +- sentinel: `redis+sentinel://[:password@]host:port[,host2:port2,...]/[master_name[/db]][?param1=value1[&param2=value2&...]]` +- cluster: `redis+cluster://[username:password@]host:port[,host2:port2,...]/[?param1=value1[&param2=value2&...]]` + +You can also enable TLS with `rediss://`, `rediss+sentinel://` and `rediss+cluster://`. + ### Data Broker Storage Certificate File - Environment Variable: `DATABROKER_STORAGE_CERT_FILE` diff --git a/docs/reference/settings.yaml b/docs/reference/settings.yaml index 2d658ee8a..ddf31044d 100644 --- a/docs/reference/settings.yaml +++ b/docs/reference/settings.yaml @@ -1071,6 +1071,14 @@ settings: - Example: `"redis://localhost:6379/0"`, `"rediss://localhost:6379/0"` doc: | The connection string that the databroker service will use to connect to storage backend. 
+ + For `redis`, the following URL types are supported: + + - simple: `redis://[username:password@]host:port/[db]` + - sentinel: `redis+sentinel://[:password@]host:port[,host2:port2,...]/[master_name[/db]][?param1=value1[&param2=value2&...]]` + - cluster: `redis+cluster://[username:password@]host:port[,host2:port2,...]/[?param1=value1[&param2=value2&...]]` + + You can also enable TLS with `rediss://`, `rediss+sentinel://` and `rediss+cluster://`. - name: "Data Broker Storage Certificate File" keys: ["databroker_storage_cert_file"] attributes: | diff --git a/go.mod b/go.mod index b222ee4a8..2ba626c24 100644 --- a/go.mod +++ b/go.mod @@ -48,6 +48,7 @@ require ( github.com/rjeczalik/notify v0.9.3-0.20201210012515-e2a77dcc14cf github.com/rs/cors v1.7.0 github.com/rs/zerolog v1.20.0 + github.com/scylladb/go-set v1.0.2 github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966 github.com/spf13/afero v1.5.1 // indirect github.com/spf13/cast v1.3.1 // indirect diff --git a/go.sum b/go.sum index 8db46b6dd..1aa55a389 100644 --- a/go.sum +++ b/go.sum @@ -177,6 +177,8 @@ github.com/envoyproxy/protoc-gen-validate v0.4.1 h1:7dLaJvASGRD7X49jSCSXXHwKPm0Z github.com/envoyproxy/protoc-gen-validate v0.4.1/go.mod h1:E+IEazqdaWv3FrnGtZIu3b9fPFMK8AzeTTrk9SfVwWs= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= +github.com/fatih/set v0.2.1 h1:nn2CaJyknWE/6txyUDGwysr3G5QC6xWB/PtVjPBbeaA= +github.com/fatih/set v0.2.1/go.mod h1:+RKtMCH+favT2+3YecHGxcc0b4KyVWA1QWWJUs4E0CI= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= @@ -549,6 +551,8 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/russross/blackfriday/v2 
v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/scylladb/go-set v1.0.2 h1:SkvlMCKhP0wyyct6j+0IHJkBkSZL+TDzZ4E7f7BCcRE= +github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= diff --git a/internal/testutil/redis.go b/internal/testutil/redis.go index c16ea84d7..72afa0230 100644 --- a/internal/testutil/redis.go +++ b/internal/testutil/redis.go @@ -85,6 +85,96 @@ func WithTestRedis(useTLS bool, handler func(rawURL string) error) error { return e } +// WithTestRedisCluster creates a new redis cluster 3 node cluster. 
+func WithTestRedisCluster(handler func(rawURL string) error) error { + ctx, clearTimeout := context.WithTimeout(context.Background(), maxWait) + defer clearTimeout() + + // uses a sensible default on windows (tcp/http) and linux/osx (socket) + pool, err := dockertest.NewPool("") + if err != nil { + return err + } + + redises := make([]*dockertest.Resource, 3) + for i := range redises { + conf := "cluster-enabled yes\ncluster-config-file nodes.conf" + r, err := pool.RunWithOptions(&dockertest.RunOptions{ + Hostname: fmt.Sprintf("redis%d", i), + Repository: "redis", + Tag: "6", + Entrypoint: []string{ + "/bin/bash", "-c", + `echo "` + conf + `" >/tmp/redis.conf && chmod 0777 /tmp/redis.conf && exec docker-entrypoint.sh /tmp/redis.conf`, + }, + ExposedPorts: []string{ + "6379/tcp", + "26379/tcp", + }, + }) + if err != nil { + return err + } + defer r.Close() + _ = r.Expire(uint(maxWait.Seconds())) + + go func() { + _ = pool.Client.Logs(docker.LogsOptions{ + Context: ctx, + Stderr: true, + Stdout: true, + Follow: true, + Timestamps: true, + Container: r.Container.ID, + OutputStream: os.Stderr, + ErrorStream: os.Stderr, + }) + }() + + redises[i] = r + } + addrs := make([]string, 3) + for i, r := range redises { + addrs[i] = net.JoinHostPort( + r.Container.NetworkSettings.IPAddress, + "6379", + ) + } + + for _, addr := range addrs { + err := pool.Retry(func() error { + options, err := redis.ParseURL(fmt.Sprintf("redis://%s/0", addr)) + if err != nil { + return err + } + + client := redis.NewClient(options) + defer client.Close() + + return client.Ping(ctx).Err() + }) + if err != nil { + return err + } + } + + // join the nodes to the cluster + err = bootstrapRedisCluster(ctx, redises) + if err != nil { + return err + } + + e := handler(fmt.Sprintf("redis+cluster://%s", strings.Join(addrs, ","))) + + for _, r := range redises { + if err := pool.Purge(r); err != nil { + return err + } + } + + return e +} + // WithTestRedisSentinel creates a new redis sentinel 3 node 
cluster. func WithTestRedisSentinel(handler func(rawURL string) error) error { ctx, clearTimeout := context.WithTimeout(context.Background(), maxWait) @@ -167,7 +257,7 @@ func WithTestRedisSentinel(handler func(rawURL string) error) error { ) } - redisURL := fmt.Sprintf("redis-sentinel://%s/master/0", strings.Join(addrs, ",")) + redisURL := fmt.Sprintf("redis+sentinel://%s/master/0", strings.Join(addrs, ",")) for _, r := range redises { addr := net.JoinHostPort( @@ -239,3 +329,61 @@ func RedisTLSConfig() *tls.Config { } return tlsConfig } + +func bootstrapRedisCluster(ctx context.Context, resources []*dockertest.Resource) error { + clients := make([]redis.UniversalClient, len(resources)) + for i, r := range resources { + addr := net.JoinHostPort(r.Container.NetworkSettings.IPAddress, "6379") + options, err := redis.ParseURL(fmt.Sprintf("redis://%s/0", addr)) + if err != nil { + return err + } + clients[i] = redis.NewClient(options) + defer func() { _ = clients[i].Close() }() + + if i > 0 { + err := clients[i].ClusterMeet(ctx, resources[0].Container.NetworkSettings.IPAddress, "6379").Err() + if err != nil { + return err + } + } + } + + // set slots + const redisSlotCount = 16384 + assignments := make([][]int, len(resources)) + for i := 0; i < redisSlotCount; i++ { + assignments[i%len(assignments)] = append(assignments[i%len(assignments)], i) + } + for i, c := range clients { + err := c.ClusterAddSlots(ctx, assignments[i]...).Err() + if err != nil { + return err + } + } + + // wait for ready + ticker := time.NewTicker(time.Millisecond * 50) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + } + + ready := 0 + for _, c := range clients { + str, err := c.ClusterInfo(ctx).Result() + if err != nil { + return err + } + if strings.Contains(str, "cluster_state:ok") { + ready++ + } + } + if ready == len(clients) { + return nil + } + } +} diff --git a/pkg/storage/redis/client.go b/pkg/storage/redis/client.go index 
bdd33a409..93009e25f 100644 --- a/pkg/storage/redis/client.go +++ b/pkg/storage/redis/client.go @@ -10,16 +10,47 @@ import ( "time" "github.com/go-redis/redis/v8" + "github.com/scylladb/go-set" ) -func newClientFromURL(rawurl string, tlsConfig *tls.Config) (*redis.Client, error) { +var ( + standardSchemes = set.NewStringSet("redis", "rediss", "unix") + clusterSchemes = set.NewStringSet( + "redis+cluster", "redis-cluster", + "rediss+cluster", "rediss-cluster", + "redis+clusters", "redis-clusters", + ) + sentinelSchemes = set.NewStringSet( + "redis+sentinel", "redis-sentinel", + "rediss+sentinel", "rediss-sentinel", + "redis+sentinels", "redis-sentinels", + ) + sentinelClusterSchemes = set.NewStringSet( + "redis+sentinel+cluster", "redis-sentinel-cluster", + "rediss+sentinel+cluster", "rediss-sentinel-cluster", + "redis+sentinels+cluster", "redis-sentinels-cluster", + "redis+sentinel+clusters", "redis-sentinel-clusters", + ) + tlsSchemes = set.NewStringSet( + "rediss", + "rediss+cluster", "rediss-cluster", + "redis+clusters", "redis-clusters", + "rediss+sentinel", "rediss-sentinel", + "redis+sentinels", "redis-sentinels", + "rediss+sentinel+cluster", "rediss-sentinel-cluster", + "redis+sentinels+cluster", "redis-sentinels-cluster", + "redis+sentinel+clusters", "redis-sentinel-clusters", + ) +) + +func newClientFromURL(rawurl string, tlsConfig *tls.Config) (redis.UniversalClient, error) { u, err := url.Parse(rawurl) if err != nil { return nil, err } - switch u.Scheme { - case "redis", "rediss", "unix": + switch { + case standardSchemes.Has(u.Scheme): opts, err := redis.ParseURL(rawurl) if err != nil { return nil, err @@ -30,7 +61,17 @@ func newClientFromURL(rawurl string, tlsConfig *tls.Config) (*redis.Client, erro } return redis.NewClient(opts), nil - case "redis-sentinel", "rediss-sentinel", "redis-sentinels": + case clusterSchemes.Has(u.Scheme): + opts, err := ParseClusterURL(rawurl) + if err != nil { + return nil, err + } + if opts.TLSConfig != nil { + 
opts.TLSConfig = tlsConfig + } + return redis.NewClusterClient(opts), nil + + case sentinelSchemes.Has(u.Scheme): opts, err := ParseSentinelURL(rawurl) if err != nil { return nil, err @@ -40,16 +81,129 @@ func newClientFromURL(rawurl string, tlsConfig *tls.Config) (*redis.Client, erro } return redis.NewFailoverClient(opts), nil + case sentinelClusterSchemes.Has(u.Scheme): + opts, err := ParseSentinelURL(rawurl) + if err != nil { + return nil, err + } + if opts.TLSConfig != nil { + opts.TLSConfig = tlsConfig + } + return redis.NewFailoverClusterClient(opts), nil + default: return nil, fmt.Errorf("unsupported URL scheme: %s", u.Scheme) } } +// ParseClusterURL parses a redis-cluster URL. Format is: +// +// redis+cluster://[username:password@]host:port[,host2:port2,...]/[?param1=value1[¶m2=value=2&...]] +// +// Additionally TLS is supported with rediss+cluster, or redis+clusters. Supported query params: +// +// max_redirects: int +// read_only: bool +// route_by_latency: bool +// route_randomly: bool +// max_retries: int +// min_retry_backoff: duration +// max_retry_backoff: duration +// dial_timeout: duration +// read_timeout: duration +// write_timeout: duration +// pool_size: int +// min_idle_conns: int +// max_conn_age: duration +// pool_timeout: duration +// idle_timeout: duration +// idle_check_frequency: duration +// +func ParseClusterURL(rawurl string) (*redis.ClusterOptions, error) { + u, err := url.Parse(rawurl) + if err != nil { + return nil, err + } + + opts := new(redis.ClusterOptions) + + hostParts := strings.Split(u.Host, ",") + for _, hostPart := range hostParts { + host, port, err := net.SplitHostPort(hostPart) + if err != nil { + host = hostPart + port = "6379" + } + opts.Addrs = append(opts.Addrs, + net.JoinHostPort(host, port)) + } + + q := u.Query() + if err := parseIntParam(&opts.MaxRedirects, q, "max_redirects"); err != nil { + return nil, err + } + if err := parseBoolParam(&opts.ReadOnly, q, "read_only"); err != nil { + return nil, err + } + if 
err := parseBoolParam(&opts.RouteByLatency, q, "route_by_latency"); err != nil { + return nil, err + } + if err := parseBoolParam(&opts.RouteRandomly, q, "route_randomly"); err != nil { + return nil, err + } + if ui := u.User; ui != nil { + opts.Username = ui.Username() + opts.Password, _ = ui.Password() + } + if err := parseIntParam(&opts.MaxRetries, q, "max_retries"); err != nil { + return nil, err + } + if err := parseDurationParam(&opts.MinRetryBackoff, q, "min_retry_backoff"); err != nil { + return nil, err + } + if err := parseDurationParam(&opts.MaxRetryBackoff, q, "max_retry_backoff"); err != nil { + return nil, err + } + if err := parseDurationParam(&opts.DialTimeout, q, "dial_timeout"); err != nil { + return nil, err + } + if err := parseDurationParam(&opts.ReadTimeout, q, "read_timeout"); err != nil { + return nil, err + } + if err := parseDurationParam(&opts.WriteTimeout, q, "write_timeout"); err != nil { + return nil, err + } + if err := parseIntParam(&opts.PoolSize, q, "pool_size"); err != nil { + return nil, err + } + if err := parseIntParam(&opts.MinIdleConns, q, "min_idle_conns"); err != nil { + return nil, err + } + if err := parseDurationParam(&opts.MaxConnAge, q, "max_conn_age"); err != nil { + return nil, err + } + if err := parseDurationParam(&opts.PoolTimeout, q, "pool_timeout"); err != nil { + return nil, err + } + if err := parseDurationParam(&opts.IdleTimeout, q, "idle_timeout"); err != nil { + return nil, err + } + if err := parseDurationParam(&opts.IdleCheckFrequency, q, "idle_check_frequency"); err != nil { + return nil, err + } + + if tlsSchemes.Has(u.Scheme) { + opts.TLSConfig = &tls.Config{} //nolint + } + + return opts, nil +} + // ParseSentinelURL parses a redis-sentinel URL. Format is based on https://github.com/exponea/redis-sentinel-url: // // redis+sentinel://[:password@]host:port[,host2:port2,...][/service_name[/db]][?param1=value1[¶m2=value=2&...]] // -// Additionally TLS is supported with rediss-sentinel, or redis-sentinels. 
Supported query params: +// Additionally TLS is supported with rediss+sentinel, or redis+sentinels. Supported query params: // // slave_only: bool // use_disconnected_slaves: bool @@ -107,6 +261,12 @@ func ParseSentinelURL(rawurl string) (*redis.FailoverOptions, error) { if err := parseBoolParam(&opts.SlaveOnly, q, "slave_only"); err != nil { return nil, err } + if err := parseBoolParam(&opts.RouteByLatency, q, "route_by_latency"); err != nil { + return nil, err + } + if err := parseBoolParam(&opts.RouteRandomly, q, "route_randomly"); err != nil { + return nil, err + } if err := parseBoolParam(&opts.UseDisconnectedSlaves, q, "use_disconnected_slaves"); err != nil { return nil, err } @@ -152,7 +312,7 @@ func ParseSentinelURL(rawurl string) (*redis.FailoverOptions, error) { return nil, err } - if u.Scheme == "rediss-sentinel" || u.Scheme == "redis-sentinels" { + if tlsSchemes.Has(u.Scheme) { opts.TLSConfig = &tls.Config{} //nolint } diff --git a/pkg/storage/redis/client_test.go b/pkg/storage/redis/client_test.go index a92fbe4c6..cf19f1bab 100644 --- a/pkg/storage/redis/client_test.go +++ b/pkg/storage/redis/client_test.go @@ -9,6 +9,43 @@ import ( "github.com/stretchr/testify/require" ) +func TestParseClusterURL(t *testing.T) { + opts, err := ParseClusterURL("redis+cluster://CLUSTER_USERNAME:CLUSTER_PASSWORD@localhost:26379,otherhost:26479/?" 
+ (&url.Values{ + "read_only": {"true"}, + "username": {"USERNAME"}, + "password": {"PASSWORD"}, + "max_retries": {"11"}, + "min_retry_backoff": {"31s"}, + "max_retry_backoff": {"22m"}, + "dial_timeout": {"3m"}, + "read_timeout": {"4m"}, + "write_timeout": {"5m"}, + "pool_size": {"7"}, + "min_idle_conns": {"2"}, + "max_conn_age": {"1h"}, + "pool_timeout": {"30m"}, + "idle_timeout": {"31m"}, + "idle_check_frequency": {"32m"}, + }).Encode()) + require.NoError(t, err) + assert.Equal(t, []string{"localhost:26379", "otherhost:26479"}, opts.Addrs) + assert.Equal(t, "CLUSTER_USERNAME", opts.Username) + assert.Equal(t, "CLUSTER_PASSWORD", opts.Password) + assert.True(t, opts.ReadOnly) + assert.Equal(t, 11, opts.MaxRetries) + assert.Equal(t, time.Second*31, opts.MinRetryBackoff) + assert.Equal(t, time.Minute*22, opts.MaxRetryBackoff) + assert.Equal(t, time.Minute*3, opts.DialTimeout) + assert.Equal(t, time.Minute*4, opts.ReadTimeout) + assert.Equal(t, time.Minute*5, opts.WriteTimeout) + assert.Equal(t, 7, opts.PoolSize) + assert.Equal(t, 2, opts.MinIdleConns) + assert.Equal(t, time.Hour, opts.MaxConnAge) + assert.Equal(t, time.Minute*30, opts.PoolTimeout) + assert.Equal(t, time.Minute*31, opts.IdleTimeout) + assert.Equal(t, time.Minute*32, opts.IdleCheckFrequency) +} + func TestParseSentinelURL(t *testing.T) { opts, err := ParseSentinelURL("redis+sentinel://:SENTINEL_PASSWORD@localhost:26379,otherhost:26479/mymaster/3?" 
+ (&url.Values{ "slave_only": {"true"}, diff --git a/pkg/storage/redis/redis.go b/pkg/storage/redis/redis.go index 4f6c0c5d3..99fa0c3a5 100644 --- a/pkg/storage/redis/redis.go +++ b/pkg/storage/redis/redis.go @@ -25,10 +25,12 @@ const ( maxTransactionRetries = 100 watchPollInterval = 30 * time.Second - lastVersionKey = "pomerium.last_version" - lastVersionChKey = "pomerium.last_version_ch" - recordHashKey = "pomerium.records" - changesSetKey = "pomerium.changes" + // we rely on transactions in redis, so all redis-cluster keys need to be + // on the same node. Using a `hash tag` gives us this capability. + lastVersionKey = "{pomerium}.last_version" + lastVersionChKey = "{pomerium}.last_version_ch" + recordHashKey = "{pomerium}.records" + changesSetKey = "{pomerium}.changes" ) // custom errors @@ -40,7 +42,7 @@ var ( type Backend struct { cfg *config - client *redis.Client + client redis.UniversalClient onChange *signal.Signal closeOnce sync.Once diff --git a/pkg/storage/redis/redis_test.go b/pkg/storage/redis/redis_test.go index 90d51982d..a79bc32ab 100644 --- a/pkg/storage/redis/redis_test.go +++ b/pkg/storage/redis/redis_test.go @@ -91,6 +91,12 @@ func TestBackend(t *testing.T) { })) }) + t.Run("cluster", func(t *testing.T) { + require.NoError(t, testutil.WithTestRedisCluster(func(rawURL string) error { + return handler(t, false, rawURL) + })) + }) + t.Run("sentinel", func(t *testing.T) { require.NoError(t, testutil.WithTestRedisSentinel(func(rawURL string) error { return handler(t, false, rawURL)