redis: add redis cluster support (#1992)

* redis: add redis cluster support

* redis: update docs
This commit is contained in:
Caleb Doxsey 2021-03-17 13:48:41 -06:00 committed by GitHub
parent 0b1e89925a
commit 77fe37c8c0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 386 additions and 12 deletions

View file

@ -952,6 +952,14 @@ The backend storage that databroker server will use.
The connection string that the databroker service will use to connect to storage backend.
For `redis`, the following URL types are supported:
- simple: `redis://{username}:{password}@{host}:{port}/{db}`
- sentinel: `redis+sentinel://[:password@]host:port[,host2:port2,...]/[master_name[/db]][?param1=value1[&param2=value2&...]]`
- cluster: `redis+cluster://[username:password@]host:port[,host2:port2,...]/[?param1=value1[&param2=value=2&...]]`
You can also enable TLS with `rediss://`, `rediss+sentinel://` and `rediss+cluster://`.
### Data Broker Storage Certificate File
- Environment Variable: `DATABROKER_STORAGE_CERT_FILE`

View file

@ -1071,6 +1071,14 @@ settings:
- Example: `"redis://localhost:6379/0"`, `"rediss://localhost:6379/0"`
doc: |
The connection string that the databroker service will use to connect to storage backend.
For `redis`, the following URL types are supported:
- simple: `redis://[username:password@]host:port/[db]`
- sentinel: `redis+sentinel://[:password@]host:port[,host2:port2,...]/[master_name[/db]][?param1=value1[&param2=value2&...]]`
- cluster: `redis+cluster://[username:password@]host:port[,host2:port2,...]/[?param1=value1[&param2=value=2&...]]`
You can also enable TLS with `rediss://`, `rediss+sentinel://` and `rediss+cluster://`.
- name: "Data Broker Storage Certificate File"
keys: ["databroker_storage_cert_file"]
attributes: |

1
go.mod
View file

@ -48,6 +48,7 @@ require (
github.com/rjeczalik/notify v0.9.3-0.20201210012515-e2a77dcc14cf
github.com/rs/cors v1.7.0
github.com/rs/zerolog v1.20.0
github.com/scylladb/go-set v1.0.2
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
github.com/spf13/afero v1.5.1 // indirect
github.com/spf13/cast v1.3.1 // indirect

4
go.sum
View file

@ -177,6 +177,8 @@ github.com/envoyproxy/protoc-gen-validate v0.4.1 h1:7dLaJvASGRD7X49jSCSXXHwKPm0Z
github.com/envoyproxy/protoc-gen-validate v0.4.1/go.mod h1:E+IEazqdaWv3FrnGtZIu3b9fPFMK8AzeTTrk9SfVwWs=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/set v0.2.1 h1:nn2CaJyknWE/6txyUDGwysr3G5QC6xWB/PtVjPBbeaA=
github.com/fatih/set v0.2.1/go.mod h1:+RKtMCH+favT2+3YecHGxcc0b4KyVWA1QWWJUs4E0CI=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
@ -549,6 +551,8 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/scylladb/go-set v1.0.2 h1:SkvlMCKhP0wyyct6j+0IHJkBkSZL+TDzZ4E7f7BCcRE=
github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=

View file

@ -85,6 +85,96 @@ func WithTestRedis(useTLS bool, handler func(rawURL string) error) error {
return e
}
// WithTestRedisCluster creates a new redis cluster 3 node cluster.
func WithTestRedisCluster(handler func(rawURL string) error) error {
ctx, clearTimeout := context.WithTimeout(context.Background(), maxWait)
defer clearTimeout()
// uses a sensible default on windows (tcp/http) and linux/osx (socket)
pool, err := dockertest.NewPool("")
if err != nil {
return err
}
redises := make([]*dockertest.Resource, 3)
for i := range redises {
conf := "cluster-enabled yes\ncluster-config-file nodes.conf"
r, err := pool.RunWithOptions(&dockertest.RunOptions{
Hostname: fmt.Sprintf("redis%d", i),
Repository: "redis",
Tag: "6",
Entrypoint: []string{
"/bin/bash", "-c",
`echo "` + conf + `" >/tmp/redis.conf && chmod 0777 /tmp/redis.conf && exec docker-entrypoint.sh /tmp/redis.conf`,
},
ExposedPorts: []string{
"6379/tcp",
"26379/tcp",
},
})
if err != nil {
return err
}
defer r.Close()
_ = r.Expire(uint(maxWait.Seconds()))
go func() {
_ = pool.Client.Logs(docker.LogsOptions{
Context: ctx,
Stderr: true,
Stdout: true,
Follow: true,
Timestamps: true,
Container: r.Container.ID,
OutputStream: os.Stderr,
ErrorStream: os.Stderr,
})
}()
redises[i] = r
}
addrs := make([]string, 3)
for i, r := range redises {
addrs[i] = net.JoinHostPort(
r.Container.NetworkSettings.IPAddress,
"6379",
)
}
for _, addr := range addrs {
err := pool.Retry(func() error {
options, err := redis.ParseURL(fmt.Sprintf("redis://%s/0", addr))
if err != nil {
return err
}
client := redis.NewClient(options)
defer client.Close()
return client.Ping(ctx).Err()
})
if err != nil {
return err
}
}
// join the nodes to the cluster
err = bootstrapRedisCluster(ctx, redises)
if err != nil {
return err
}
e := handler(fmt.Sprintf("redis+cluster://%s", strings.Join(addrs, ",")))
for _, r := range redises {
if err := pool.Purge(r); err != nil {
return err
}
}
return e
}
// WithTestRedisSentinel creates a new redis sentinel 3 node cluster.
func WithTestRedisSentinel(handler func(rawURL string) error) error {
ctx, clearTimeout := context.WithTimeout(context.Background(), maxWait)
@ -167,7 +257,7 @@ func WithTestRedisSentinel(handler func(rawURL string) error) error {
)
}
redisURL := fmt.Sprintf("redis-sentinel://%s/master/0", strings.Join(addrs, ","))
redisURL := fmt.Sprintf("redis+sentinel://%s/master/0", strings.Join(addrs, ","))
for _, r := range redises {
addr := net.JoinHostPort(
@ -239,3 +329,61 @@ func RedisTLSConfig() *tls.Config {
}
return tlsConfig
}
func bootstrapRedisCluster(ctx context.Context, resources []*dockertest.Resource) error {
clients := make([]redis.UniversalClient, len(resources))
for i, r := range resources {
addr := net.JoinHostPort(r.Container.NetworkSettings.IPAddress, "6379")
options, err := redis.ParseURL(fmt.Sprintf("redis://%s/0", addr))
if err != nil {
return err
}
clients[i] = redis.NewClient(options)
defer func() { _ = clients[i].Close() }()
if i > 0 {
err := clients[i].ClusterMeet(ctx, resources[0].Container.NetworkSettings.IPAddress, "6379").Err()
if err != nil {
return err
}
}
}
// set slots
const redisSlotCount = 16384
assignments := make([][]int, len(resources))
for i := 0; i < redisSlotCount; i++ {
assignments[i%len(assignments)] = append(assignments[i%len(assignments)], i)
}
for i, c := range clients {
err := c.ClusterAddSlots(ctx, assignments[i]...).Err()
if err != nil {
return err
}
}
// wait for ready
ticker := time.NewTicker(time.Millisecond * 50)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
ready := 0
for _, c := range clients {
str, err := c.ClusterInfo(ctx).Result()
if err != nil {
return err
}
if strings.Contains(str, "cluster_state:ok") {
ready++
}
}
if ready == len(clients) {
return nil
}
}
}

View file

@ -10,16 +10,47 @@ import (
"time"
"github.com/go-redis/redis/v8"
"github.com/scylladb/go-set"
)
func newClientFromURL(rawurl string, tlsConfig *tls.Config) (*redis.Client, error) {
var (
standardSchemes = set.NewStringSet("redis", "rediss", "unix")
clusterSchemes = set.NewStringSet(
"redis+cluster", "redis-cluster",
"rediss+cluster", "rediss-cluster",
"redis+clusters", "redis-clusters",
)
sentinelSchemes = set.NewStringSet(
"redis+sentinel", "redis-sentinel",
"rediss+sentinel", "rediss-sentinel",
"redis+sentinels", "redis-sentinels",
)
sentinelClusterSchemes = set.NewStringSet(
"redis+sentinel+cluster", "redis-sentinel-cluster",
"rediss+sentinel+cluster", "rediss-sentinel-cluster",
"redis+sentinels+cluster", "redis-sentinels-cluster",
"redis+sentinel+clusters", "redis-sentinel-clusters",
)
tlsSchemes = set.NewStringSet(
"rediss",
"rediss+cluster", "rediss-cluster",
"redis+clusters", "redis-clusters",
"rediss+sentinel", "rediss-sentinel",
"redis+sentinels", "redis-sentinels",
"rediss+sentinel+cluster", "rediss-sentinel-cluster",
"redis+sentinels+cluster", "redis-sentinels-cluster",
"redis+sentinel+clusters", "redis-sentinel-clusters",
)
)
func newClientFromURL(rawurl string, tlsConfig *tls.Config) (redis.UniversalClient, error) {
u, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
switch u.Scheme {
case "redis", "rediss", "unix":
switch {
case standardSchemes.Has(u.Scheme):
opts, err := redis.ParseURL(rawurl)
if err != nil {
return nil, err
@ -30,7 +61,17 @@ func newClientFromURL(rawurl string, tlsConfig *tls.Config) (*redis.Client, erro
}
return redis.NewClient(opts), nil
case "redis-sentinel", "rediss-sentinel", "redis-sentinels":
case clusterSchemes.Has(u.Scheme):
opts, err := ParseClusterURL(rawurl)
if err != nil {
return nil, err
}
if opts.TLSConfig != nil {
opts.TLSConfig = tlsConfig
}
return redis.NewClusterClient(opts), nil
case sentinelSchemes.Has(u.Scheme):
opts, err := ParseSentinelURL(rawurl)
if err != nil {
return nil, err
@ -40,16 +81,129 @@ func newClientFromURL(rawurl string, tlsConfig *tls.Config) (*redis.Client, erro
}
return redis.NewFailoverClient(opts), nil
case sentinelClusterSchemes.Has(u.Scheme):
opts, err := ParseSentinelURL(rawurl)
if err != nil {
return nil, err
}
if opts.TLSConfig != nil {
opts.TLSConfig = tlsConfig
}
return redis.NewFailoverClusterClient(opts), nil
default:
return nil, fmt.Errorf("unsupported URL scheme: %s", u.Scheme)
}
}
// ParseClusterURL parses a redis-cluster URL. Format is:
//
// redis+cluster://[username:password@]host:port[,host2:port2,...]/[?param1=value1[&param2=value=2&...]]
//
// Additionally TLS is supported with rediss+cluster, or redis+clusters. Supported query params:
//
// max_redirects: int
// read_only: bool
// route_by_latency: bool
// route_randomly: bool
// max_retries: int
// min_retry_backoff: duration
// max_retry_backoff: duration
// dial_timeout: duration
// read_timeout: duration
// write_timeout: duration
// pool_size: int
// min_idle_conns: int
// max_conn_age: duration
// pool_timeout: duration
// idle_timeout: duration
// idle_check_frequency: duration
//
func ParseClusterURL(rawurl string) (*redis.ClusterOptions, error) {
u, err := url.Parse(rawurl)
if err != nil {
return nil, err
}
opts := new(redis.ClusterOptions)
hostParts := strings.Split(u.Host, ",")
for _, hostPart := range hostParts {
host, port, err := net.SplitHostPort(hostPart)
if err != nil {
host = hostPart
port = "6379"
}
opts.Addrs = append(opts.Addrs,
net.JoinHostPort(host, port))
}
q := u.Query()
if err := parseIntParam(&opts.MaxRedirects, q, "max_redirects"); err != nil {
return nil, err
}
if err := parseBoolParam(&opts.ReadOnly, q, "read_only"); err != nil {
return nil, err
}
if err := parseBoolParam(&opts.RouteByLatency, q, "route_by_latency"); err != nil {
return nil, err
}
if err := parseBoolParam(&opts.RouteRandomly, q, "route_randomly"); err != nil {
return nil, err
}
if ui := u.User; ui != nil {
opts.Username = ui.Username()
opts.Password, _ = ui.Password()
}
if err := parseIntParam(&opts.MaxRetries, q, "max_retries"); err != nil {
return nil, err
}
if err := parseDurationParam(&opts.MinRetryBackoff, q, "min_retry_backoff"); err != nil {
return nil, err
}
if err := parseDurationParam(&opts.MaxRetryBackoff, q, "max_retry_backoff"); err != nil {
return nil, err
}
if err := parseDurationParam(&opts.DialTimeout, q, "dial_timeout"); err != nil {
return nil, err
}
if err := parseDurationParam(&opts.ReadTimeout, q, "read_timeout"); err != nil {
return nil, err
}
if err := parseDurationParam(&opts.WriteTimeout, q, "write_timeout"); err != nil {
return nil, err
}
if err := parseIntParam(&opts.PoolSize, q, "pool_size"); err != nil {
return nil, err
}
if err := parseIntParam(&opts.MinIdleConns, q, "min_idle_conns"); err != nil {
return nil, err
}
if err := parseDurationParam(&opts.MaxConnAge, q, "max_conn_age"); err != nil {
return nil, err
}
if err := parseDurationParam(&opts.PoolTimeout, q, "pool_timeout"); err != nil {
return nil, err
}
if err := parseDurationParam(&opts.IdleTimeout, q, "idle_timeout"); err != nil {
return nil, err
}
if err := parseDurationParam(&opts.IdleCheckFrequency, q, "idle_check_frequency"); err != nil {
return nil, err
}
if tlsSchemes.Has(u.Scheme) {
opts.TLSConfig = &tls.Config{} //nolint
}
return opts, nil
}
// ParseSentinelURL parses a redis-sentinel URL. Format is based on https://github.com/exponea/redis-sentinel-url:
//
// redis+sentinel://[:password@]host:port[,host2:port2,...][/service_name[/db]][?param1=value1[&param2=value=2&...]]
//
// Additionally TLS is supported with rediss-sentinel, or redis-sentinels. Supported query params:
// Additionally TLS is supported with rediss+sentinel, or redis+sentinels. Supported query params:
//
// slave_only: bool
// use_disconnected_slaves: bool
@ -107,6 +261,12 @@ func ParseSentinelURL(rawurl string) (*redis.FailoverOptions, error) {
if err := parseBoolParam(&opts.SlaveOnly, q, "slave_only"); err != nil {
return nil, err
}
if err := parseBoolParam(&opts.RouteByLatency, q, "route_by_latency"); err != nil {
return nil, err
}
if err := parseBoolParam(&opts.RouteRandomly, q, "route_randomly"); err != nil {
return nil, err
}
if err := parseBoolParam(&opts.UseDisconnectedSlaves, q, "use_disconnected_slaves"); err != nil {
return nil, err
}
@ -152,7 +312,7 @@ func ParseSentinelURL(rawurl string) (*redis.FailoverOptions, error) {
return nil, err
}
if u.Scheme == "rediss-sentinel" || u.Scheme == "redis-sentinels" {
if tlsSchemes.Has(u.Scheme) {
opts.TLSConfig = &tls.Config{} //nolint
}

View file

@ -9,6 +9,43 @@ import (
"github.com/stretchr/testify/require"
)
func TestParseClusterURL(t *testing.T) {
opts, err := ParseClusterURL("redis+cluster://CLUSTER_USERNAME:CLUSTER_PASSWORD@localhost:26379,otherhost:26479/?" + (&url.Values{
"read_only": {"true"},
"username": {"USERNAME"},
"password": {"PASSWORD"},
"max_retries": {"11"},
"min_retry_backoff": {"31s"},
"max_retry_backoff": {"22m"},
"dial_timeout": {"3m"},
"read_timeout": {"4m"},
"write_timeout": {"5m"},
"pool_size": {"7"},
"min_idle_conns": {"2"},
"max_conn_age": {"1h"},
"pool_timeout": {"30m"},
"idle_timeout": {"31m"},
"idle_check_frequency": {"32m"},
}).Encode())
require.NoError(t, err)
assert.Equal(t, []string{"localhost:26379", "otherhost:26479"}, opts.Addrs)
assert.Equal(t, "CLUSTER_USERNAME", opts.Username)
assert.Equal(t, "CLUSTER_PASSWORD", opts.Password)
assert.True(t, opts.ReadOnly)
assert.Equal(t, 11, opts.MaxRetries)
assert.Equal(t, time.Second*31, opts.MinRetryBackoff)
assert.Equal(t, time.Minute*22, opts.MaxRetryBackoff)
assert.Equal(t, time.Minute*3, opts.DialTimeout)
assert.Equal(t, time.Minute*4, opts.ReadTimeout)
assert.Equal(t, time.Minute*5, opts.WriteTimeout)
assert.Equal(t, 7, opts.PoolSize)
assert.Equal(t, 2, opts.MinIdleConns)
assert.Equal(t, time.Hour, opts.MaxConnAge)
assert.Equal(t, time.Minute*30, opts.PoolTimeout)
assert.Equal(t, time.Minute*31, opts.IdleTimeout)
assert.Equal(t, time.Minute*32, opts.IdleCheckFrequency)
}
func TestParseSentinelURL(t *testing.T) {
opts, err := ParseSentinelURL("redis+sentinel://:SENTINEL_PASSWORD@localhost:26379,otherhost:26479/mymaster/3?" + (&url.Values{
"slave_only": {"true"},

View file

@ -25,10 +25,12 @@ const (
maxTransactionRetries = 100
watchPollInterval = 30 * time.Second
lastVersionKey = "pomerium.last_version"
lastVersionChKey = "pomerium.last_version_ch"
recordHashKey = "pomerium.records"
changesSetKey = "pomerium.changes"
// we rely on transactions in redis, so all redis-cluster keys need to be
// on the same node. Using a `hash tag` gives us this capability.
lastVersionKey = "{pomerium}.last_version"
lastVersionChKey = "{pomerium}.last_version_ch"
recordHashKey = "{pomerium}.records"
changesSetKey = "{pomerium}.changes"
)
// custom errors
@ -40,7 +42,7 @@ var (
type Backend struct {
cfg *config
client *redis.Client
client redis.UniversalClient
onChange *signal.Signal
closeOnce sync.Once

View file

@ -91,6 +91,12 @@ func TestBackend(t *testing.T) {
}))
})
t.Run("cluster", func(t *testing.T) {
require.NoError(t, testutil.WithTestRedisCluster(func(rawURL string) error {
return handler(t, false, rawURL)
}))
})
t.Run("sentinel", func(t *testing.T) {
require.NoError(t, testutil.WithTestRedisSentinel(func(rawURL string) error {
return handler(t, false, rawURL)