Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions router/core/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,9 +832,13 @@ func (r *Router) bootstrap(ctx context.Context) error {
if r.Config.rateLimit != nil && r.Config.rateLimit.Enabled {
var err error
r.redisClient, err = rd.NewRedisCloser(&rd.RedisCloserOptions{
URLs: r.Config.rateLimit.Storage.URLs,
ClusterEnabled: r.Config.rateLimit.Storage.ClusterEnabled,
Logger: r.logger,
URLs: r.Config.rateLimit.Storage.URLs,
ClusterEnabled: r.Config.rateLimit.Storage.ClusterEnabled,
SentinelEnabled: r.Config.rateLimit.Storage.SentinelEnabled,
MasterName: r.Config.rateLimit.Storage.MasterName,
SentinelAddrs: r.Config.rateLimit.Storage.SentinelAddrs,
SentinelPassword: r.Config.rateLimit.Storage.SentinelPassword,
Logger: r.logger,
})
if err != nil {
return fmt.Errorf("failed to create redis client: %w", err)
Expand Down
157 changes: 118 additions & 39 deletions router/internal/rediscloser/rediscloser.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,60 +21,139 @@ type RedisCloserOptions struct {
URLs []string
ClusterEnabled bool
Password string

// Redis Sentinel configuration
SentinelEnabled bool
MasterName string
SentinelAddrs []string
SentinelPassword string
}

func NewRedisCloser(opts *RedisCloserOptions) (RDCloser, error) {
if len(opts.URLs) == 0 {
return nil, fmt.Errorf("no redis URLs provided")
if opts == nil {
return nil, fmt.Errorf("nil RedisCloserOptions")
}
if opts.Logger == nil {
opts.Logger = zap.NewNop()
}
if err := validateRedisConfig(opts); err != nil {
return nil, err
}

var rdb RDCloser
// If provided, we create a cluster client
if opts.ClusterEnabled {
opts.Logger.Info("Detected that redis is running in cluster mode.")
switch {
case opts.SentinelEnabled:
return createSentinelClient(opts)
case opts.ClusterEnabled:
return createClusterClient(opts)
default:
return createDirectClient(opts)
}
}

// Parse the first URL to get the cluster options. We assume that the first URL provided is the primary URL
// and append further URLs as secondary addr params to the URL, as required by the go-redis library.
// e.g. redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791
parsedUrl, err := url.Parse(opts.URLs[0])
if err != nil {
return nil, fmt.Errorf("failed to parse the redis connection url: %w", err)
// validateRedisConfig validates the Redis configuration options
func validateRedisConfig(opts *RedisCloserOptions) error {
switch {
case opts.SentinelEnabled:
if opts.MasterName == "" {
return fmt.Errorf("master_name is required when sentinel_enabled is true")
}

// This operates on the URL query, and if there are more urls, appends them to the addr param
addClusterUrlsToQuery(opts, parsedUrl)
// Parse the cluster URL using the library method, to pick up all of the options encoded
clusterOps, err := redis.ParseClusterURL(parsedUrl.String())

if err != nil {
return nil, fmt.Errorf("failed to parse the redis connection url into ops: %w", err)
if len(opts.SentinelAddrs) == 0 {
return fmt.Errorf("sentinel_addrs is required when sentinel_enabled is true")
}
if opts.Password != "" {
// If they explicitly provide a password, assume that it's overwriting the URL password or that none was
// provided in the URL
clusterOps.Password = opts.Password
if opts.ClusterEnabled {
return fmt.Errorf("cannot enable both sentinel_enabled and cluster_enabled")
}

rdb = redis.NewClusterClient(clusterOps)
} else {
urlEncodedOpts, err := redis.ParseURL(opts.URLs[0])
if err != nil {
return nil, fmt.Errorf("failed to parse the redis connection url: %w", err)
case opts.ClusterEnabled:
if len(opts.URLs) == 0 {
return fmt.Errorf("urls is required when cluster_enabled is true")
}
if opts.Password != "" {
// If they explicitly provide a password, assume that it's overwriting the URL password or that none was
// provided in the URL
urlEncodedOpts.Password = opts.Password
default:
if len(opts.URLs) == 0 {
return fmt.Errorf("urls is required for direct Redis")
}
rdb = redis.NewClient(urlEncodedOpts)
}
return nil
}

if isClusterClient(rdb) {
opts.Logger.Warn("Detected that redis is running in cluster mode. You may encounter issues as a result")
}
// createSentinelClient creates a Redis sentinel client
func createSentinelClient(opts *RedisCloserOptions) (RDCloser, error) {
opts.Logger.Info("Creating Redis client in sentinel mode.",
zap.String("master_name", opts.MasterName),
zap.Int("sentinel_count", len(opts.SentinelAddrs)))

rdb := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: opts.MasterName,
SentinelAddrs: opts.SentinelAddrs,
SentinelPassword: opts.SentinelPassword,
Password: opts.Password,
})

if isFunctioning, err := IsFunctioningClient(rdb); !isFunctioning {
_ = rdb.Close()
return rdb, fmt.Errorf("failed to create a functioning Redis sentinel client: %w", err)
}

return rdb, nil
}

// createClusterClient creates a Redis cluster client
func createClusterClient(opts *RedisCloserOptions) (RDCloser, error) {
opts.Logger.Info("Creating Redis client in cluster mode.")

// Parse the first URL to get the cluster options. We assume that the first URL provided is the primary URL
// and append further URLs as secondary addr params to the URL, as required by the go-redis library.
// e.g. redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791
parsedUrl, err := url.Parse(opts.URLs[0])
if err != nil {
return nil, fmt.Errorf("failed to parse the redis connection url: %w", err)
}

// This operates on the URL query, and if there are more urls, appends them to the addr param
addClusterUrlsToQuery(opts, parsedUrl)
// Parse the cluster URL using the library method, to pick up all of the options encoded
clusterOps, err := redis.ParseClusterURL(parsedUrl.String())

if err != nil {
return nil, fmt.Errorf("failed to parse the redis connection url into ops: %w", err)
}
if opts.Password != "" {
// If they explicitly provide a password, assume that it's overwriting the URL password or that none was
// provided in the URL
clusterOps.Password = opts.Password
}

rdb := redis.NewClusterClient(clusterOps)

if isFunctioning, err := IsFunctioningClient(rdb); !isFunctioning {
_ = rdb.Close()
return rdb, fmt.Errorf("failed to create a functioning Redis cluster client: %w", err)
}

return rdb, nil
}

// createDirectClient creates a direct Redis client (single instance or master-slave without automatic failover)
func createDirectClient(opts *RedisCloserOptions) (RDCloser, error) {
opts.Logger.Info("Creating Redis client in direct mode.")

urlEncodedOpts, err := redis.ParseURL(opts.URLs[0])
if err != nil {
return nil, fmt.Errorf("failed to parse the redis connection url: %w", err)
}
if opts.Password != "" {
// If they explicitly provide a password, assume that it's overwriting the URL password or that none was
// provided in the URL
urlEncodedOpts.Password = opts.Password
}
rdb := redis.NewClient(urlEncodedOpts)

if isClusterClient(rdb) {
opts.Logger.Warn("Detected that redis is running in cluster mode. You may encounter issues as a result")
}

if isFunctioning, err := IsFunctioningClient(rdb); !isFunctioning {
return rdb, fmt.Errorf("failed to create a functioning redis client with the provided URLs: %w", err)
_ = rdb.Close()
return rdb, fmt.Errorf("failed to create a functioning Redis direct client: %w", err)
}

return rdb, nil
Expand Down
180 changes: 178 additions & 2 deletions router/internal/rediscloser/rediscloser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestRedisCloser(t *testing.T) {
})

require.Error(t, err)
require.ErrorContains(t, err, "no redis URLs provided")
require.ErrorContains(t, err, "urls is required for direct Redis")
})

t.Run("Creates default client for normal redis", func(t *testing.T) {
Expand Down Expand Up @@ -62,6 +62,182 @@ func TestRedisCloser(t *testing.T) {
})

require.Error(t, err)
require.ErrorContains(t, err, "failed to create a functioning redis client")
require.ErrorContains(t, err, "failed to create a functioning Redis direct client")
})
}

func TestRedisCloser_SentinelMode(t *testing.T) {
t.Parallel()

t.Run("validates sentinel configuration", func(t *testing.T) {
testCases := []struct {
name string
opts *RedisCloserOptions
expectedErr string
}{
{
name: "missing master name",
opts: &RedisCloserOptions{
Logger: zaptest.NewLogger(t),
SentinelEnabled: true,
SentinelAddrs: []string{"127.0.0.1:26379"},
},
expectedErr: "master_name is required when sentinel_enabled is true",
},
{
name: "missing sentinel addresses",
opts: &RedisCloserOptions{
Logger: zaptest.NewLogger(t),
SentinelEnabled: true,
MasterName: "mymaster",
},
expectedErr: "sentinel_addrs is required when sentinel_enabled is true",
},
{
name: "sentinel and cluster both enabled",
opts: &RedisCloserOptions{
Logger: zaptest.NewLogger(t),
SentinelEnabled: true,
ClusterEnabled: true,
MasterName: "mymaster",
SentinelAddrs: []string{"127.0.0.1:26379"},
},
expectedErr: "cannot enable both sentinel_enabled and cluster_enabled",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, err := NewRedisCloser(tc.opts)
require.Error(t, err)
require.ErrorContains(t, err, tc.expectedErr)
})
}
})

t.Run("creates sentinel client with valid config", func(t *testing.T) {
// Note: This test will fail to connect since we don't have a real sentinel,
// but it validates the configuration parsing and client creation logic
_, err := NewRedisCloser(&RedisCloserOptions{
Logger: zaptest.NewLogger(t),
SentinelEnabled: true,
MasterName: "mymaster",
SentinelAddrs: []string{"127.0.0.1:26379", "127.0.0.1:26380"},
SentinelPassword: "sentinel_pass",
Password: "redis_pass",
})

// We expect this to fail with a connection error since no real sentinel is running
require.Error(t, err)
require.ErrorContains(t, err, "failed to create a functioning Redis sentinel client")
})

t.Run("handles single sentinel address", func(t *testing.T) {
_, err := NewRedisCloser(&RedisCloserOptions{
Logger: zaptest.NewLogger(t),
SentinelEnabled: true,
MasterName: "mymaster",
SentinelAddrs: []string{"127.0.0.1:26379"},
})

// We expect this to fail with a connection error since no real sentinel is running
require.Error(t, err)
require.ErrorContains(t, err, "failed to create a functioning Redis sentinel client")
})
}

func TestValidateRedisConfig(t *testing.T) {
t.Parallel()

testCases := []struct {
name string
opts *RedisCloserOptions
expectError bool
errorMsg string
}{
{
name: "valid direct config",
opts: &RedisCloserOptions{
URLs: []string{"redis://localhost:6379"},
},
expectError: false,
},
{
name: "valid cluster config",
opts: &RedisCloserOptions{
ClusterEnabled: true,
URLs: []string{"redis://localhost:6379"},
},
expectError: false,
},
{
name: "valid sentinel config",
opts: &RedisCloserOptions{
SentinelEnabled: true,
MasterName: "mymaster",
SentinelAddrs: []string{"127.0.0.1:26379"},
},
expectError: false,
},
{
name: "missing URLs for direct",
opts: &RedisCloserOptions{
URLs: []string{},
},
expectError: true,
errorMsg: "urls is required for direct Redis",
},
{
name: "missing URLs for cluster",
opts: &RedisCloserOptions{
ClusterEnabled: true,
URLs: []string{},
},
expectError: true,
errorMsg: "urls is required when cluster_enabled is true",
},
{
name: "sentinel without master name",
opts: &RedisCloserOptions{
SentinelEnabled: true,
SentinelAddrs: []string{"127.0.0.1:26379"},
},
expectError: true,
errorMsg: "master_name is required when sentinel_enabled is true",
},
{
name: "sentinel without addresses",
opts: &RedisCloserOptions{
SentinelEnabled: true,
MasterName: "mymaster",
SentinelAddrs: []string{},
},
expectError: true,
errorMsg: "sentinel_addrs is required when sentinel_enabled is true",
},
{
name: "both sentinel and cluster enabled",
opts: &RedisCloserOptions{
SentinelEnabled: true,
ClusterEnabled: true,
MasterName: "mymaster",
SentinelAddrs: []string{"127.0.0.1:26379"},
URLs: []string{"redis://localhost:6379"},
},
expectError: true,
errorMsg: "cannot enable both sentinel_enabled and cluster_enabled",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := validateRedisConfig(tc.opts)
if tc.expectError {
require.Error(t, err)
require.ErrorContains(t, err, tc.errorMsg)
} else {
require.NoError(t, err)
}
})
}
}
Loading