From f0c1810a274f91ff2b10f3c8e18a707c1a5d6b1a Mon Sep 17 00:00:00 2001 From: "y.li" Date: Thu, 9 Oct 2025 14:35:47 +0800 Subject: [PATCH 1/4] feat(redis): add sentinel mode support to RedisCloser --- router/core/router.go | 11 +- router/internal/rediscloser/rediscloser.go | 148 +++++++++++---- .../internal/rediscloser/rediscloser_test.go | 176 ++++++++++++++++++ router/pkg/config/config.go | 6 + router/pkg/config/config.schema.json | 23 ++- 5 files changed, 320 insertions(+), 44 deletions(-) diff --git a/router/core/router.go b/router/core/router.go index 30fd7a08c0..9fb64ab278 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -1,4 +1,5 @@ package core +package core import ( "context" @@ -832,9 +833,13 @@ func (r *Router) bootstrap(ctx context.Context) error { if r.Config.rateLimit != nil && r.Config.rateLimit.Enabled { var err error r.redisClient, err = rd.NewRedisCloser(&rd.RedisCloserOptions{ - URLs: r.Config.rateLimit.Storage.URLs, - ClusterEnabled: r.Config.rateLimit.Storage.ClusterEnabled, - Logger: r.logger, + URLs: r.Config.rateLimit.Storage.URLs, + ClusterEnabled: r.Config.rateLimit.Storage.ClusterEnabled, + SentinelEnabled: r.Config.rateLimit.Storage.SentinelEnabled, + MasterName: r.Config.rateLimit.Storage.MasterName, + SentinelAddrs: r.Config.rateLimit.Storage.SentinelAddrs, + SentinelPassword: r.Config.rateLimit.Storage.SentinelPassword, + Logger: r.logger, }) if err != nil { return fmt.Errorf("failed to create redis client: %w", err) diff --git a/router/internal/rediscloser/rediscloser.go b/router/internal/rediscloser/rediscloser.go index 2bbc7c3459..fb5af945a6 100644 --- a/router/internal/rediscloser/rediscloser.go +++ b/router/internal/rediscloser/rediscloser.go @@ -21,60 +21,130 @@ type RedisCloserOptions struct { URLs []string ClusterEnabled bool Password string + + // Redis Sentinel configuration + SentinelEnabled bool + MasterName string + SentinelAddrs []string + SentinelPassword string } func NewRedisCloser(opts *RedisCloserOptions) (RDCloser, error) { - if len(opts.URLs) == 0 { - return nil, fmt.Errorf("no redis URLs provided") + if err := validateRedisConfig(opts); err != nil { + return nil, err } - var rdb RDCloser - // If provided, we create a cluster client - if opts.ClusterEnabled { - opts.Logger.Info("Detected that redis is running in cluster mode.") + switch { + case opts.SentinelEnabled: + return createSentinelClient(opts) + case opts.ClusterEnabled: + return createClusterClient(opts) + default: + return createStandaloneClient(opts) + } +} - // Parse the first URL to get the cluster options. We assume that the first URL provided is the primary URL - // and append further URLs as secondary addr params to the URL, as required by the go-redis library. - // e.g. redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791 - parsedUrl, err := url.Parse(opts.URLs[0]) - if err != nil { - return nil, fmt.Errorf("failed to parse the redis connection url: %w", err) +// validateRedisConfig validates the Redis configuration options +func validateRedisConfig(opts *RedisCloserOptions) error { + switch { + case opts.SentinelEnabled: + if opts.MasterName == "" { + return fmt.Errorf("master_name is required when sentinel_enabled is true") } - - // This operates on the URL query, and if there are more urls, appends them to the addr param - addClusterUrlsToQuery(opts, parsedUrl) - // Parse the cluster URL using the library method, to pick up all of the options encoded - clusterOps, err := redis.ParseClusterURL(parsedUrl.String()) - - if err != nil { - return nil, fmt.Errorf("failed to parse the redis connection url into ops: %w", err) + if len(opts.SentinelAddrs) == 0 { + return fmt.Errorf("sentinel_addrs is required when sentinel_enabled is true") } - if opts.Password != "" { - // If they explicitly provide a password, assume that it's overwriting the URL password or that none was - // provided in the URL - clusterOps.Password = opts.Password + if opts.ClusterEnabled { + return fmt.Errorf("cannot enable both sentinel_enabled and cluster_enabled") } - - rdb = redis.NewClusterClient(clusterOps) - } else { - urlEncodedOpts, err := redis.ParseURL(opts.URLs[0]) - if err != nil { - return nil, fmt.Errorf("failed to parse the redis connection url: %w", err) + case opts.ClusterEnabled: + if len(opts.URLs) == 0 { + return fmt.Errorf("urls is required when cluster_enabled is true") } - if opts.Password != "" { - // If they explicitly provide a password, assume that it's overwriting the URL password or that none was - // provided in the URL - urlEncodedOpts.Password = opts.Password + default: + if len(opts.URLs) == 0 { + return fmt.Errorf("urls is required for standalone Redis") } - rdb = redis.NewClient(urlEncodedOpts) + } + return nil +} - if isClusterClient(rdb) { - opts.Logger.Warn("Detected that redis is running in cluster mode. You may encounter issues as a result") - } +// createSentinelClient creates a Redis sentinel client +func createSentinelClient(opts *RedisCloserOptions) (RDCloser, error) { + opts.Logger.Info("Creating Redis client in sentinel mode.", + zap.String("master_name", opts.MasterName), + zap.Int("sentinel_count", len(opts.SentinelAddrs))) + + rdb := redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: opts.MasterName, + SentinelAddrs: opts.SentinelAddrs, + SentinelPassword: opts.SentinelPassword, + Password: opts.Password, + }) + + if isFunctioning, err := IsFunctioningClient(rdb); !isFunctioning { + return rdb, fmt.Errorf("failed to create a functioning Redis sentinel client: %w", err) + } + + return rdb, nil +} + +// createClusterClient creates a Redis cluster client +func createClusterClient(opts *RedisCloserOptions) (RDCloser, error) { + opts.Logger.Info("Creating Redis client in cluster mode.") + + // Parse the first URL to get the cluster options. We assume that the first URL provided is the primary URL + // and append further URLs as secondary addr params to the URL, as required by the go-redis library. + // e.g. redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791 + parsedUrl, err := url.Parse(opts.URLs[0]) + if err != nil { + return nil, fmt.Errorf("failed to parse the redis connection url: %w", err) + } + + // This operates on the URL query, and if there are more urls, appends them to the addr param + addClusterUrlsToQuery(opts, parsedUrl) + // Parse the cluster URL using the library method, to pick up all of the options encoded + clusterOps, err := redis.ParseClusterURL(parsedUrl.String()) + + if err != nil { + return nil, fmt.Errorf("failed to parse the redis connection url into ops: %w", err) + } + if opts.Password != "" { + // If they explicitly provide a password, assume that it's overwriting the URL password or that none was + // provided in the URL + clusterOps.Password = opts.Password + } + + rdb := redis.NewClusterClient(clusterOps) + + if isFunctioning, err := IsFunctioningClient(rdb); !isFunctioning { + return rdb, fmt.Errorf("failed to create a functioning Redis cluster client: %w", err) + } + + return rdb, nil +} + +// createStandaloneClient creates a standalone Redis client +func createStandaloneClient(opts *RedisCloserOptions) (RDCloser, error) { + opts.Logger.Info("Creating Redis client in standalone mode.") + + urlEncodedOpts, err := redis.ParseURL(opts.URLs[0]) + if err != nil { + return nil, fmt.Errorf("failed to parse the redis connection url: %w", err) + } + if opts.Password != "" { + // If they explicitly provide a password, assume that it's overwriting the URL password or that none was + // provided in the URL + urlEncodedOpts.Password = opts.Password + } + rdb := redis.NewClient(urlEncodedOpts) + + if isClusterClient(rdb) { + opts.Logger.Warn("Detected that redis is running in cluster mode. You may encounter issues as a result") } if isFunctioning, err := IsFunctioningClient(rdb); !isFunctioning { - return rdb, fmt.Errorf("failed to create a functioning redis client with the provided URLs: %w", err) + return rdb, fmt.Errorf("failed to create a functioning Redis standalone client: %w", err) } return rdb, nil diff --git a/router/internal/rediscloser/rediscloser_test.go b/router/internal/rediscloser/rediscloser_test.go index 79da15f237..a68137ed11 100644 --- a/router/internal/rediscloser/rediscloser_test.go +++ b/router/internal/rediscloser/rediscloser_test.go @@ -65,3 +65,179 @@ func TestRedisCloser(t *testing.T) { require.ErrorContains(t, err, "failed to create a functioning redis client") }) } + +func TestRedisCloser_SentinelMode(t *testing.T) { + t.Parallel() + + t.Run("validates sentinel configuration", func(t *testing.T) { + testCases := []struct { + name string + opts *RedisCloserOptions + expectedErr string + }{ + { + name: "missing master name", + opts: &RedisCloserOptions{ + Logger: zaptest.NewLogger(t), + SentinelEnabled: true, + SentinelAddrs: []string{"127.0.0.1:26379"}, + }, + expectedErr: "master_name is required when sentinel_enabled is true", + }, + { + name: "missing sentinel addresses", + opts: &RedisCloserOptions{ + Logger: zaptest.NewLogger(t), + SentinelEnabled: true, + MasterName: "mymaster", + }, + expectedErr: "sentinel_addrs is required when sentinel_enabled is true", + }, + { + name: "sentinel and cluster both enabled", + opts: &RedisCloserOptions{ + Logger: zaptest.NewLogger(t), + SentinelEnabled: true, + ClusterEnabled: true, + MasterName: "mymaster", + SentinelAddrs: []string{"127.0.0.1:26379"}, + }, + expectedErr: "cannot enable both sentinel_enabled and cluster_enabled", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, err := NewRedisCloser(tc.opts) + require.Error(t, err) + require.ErrorContains(t, err, tc.expectedErr) + }) + } + }) + + t.Run("creates sentinel client with valid config", func(t *testing.T) { + // Note: This test will fail to connect since we don't have a real sentinel, + // but it validates the configuration parsing and client creation logic + _, err := NewRedisCloser(&RedisCloserOptions{ + Logger: zaptest.NewLogger(t), + SentinelEnabled: true, + MasterName: "mymaster", + SentinelAddrs: []string{"127.0.0.1:26379", "127.0.0.1:26380"}, + SentinelPassword: "sentinel_pass", + Password: "redis_pass", + }) + + // We expect this to fail with a connection error since no real sentinel is running + require.Error(t, err) + require.ErrorContains(t, err, "failed to create a functioning Redis sentinel client") + }) + + t.Run("handles single sentinel address", func(t *testing.T) { + _, err := NewRedisCloser(&RedisCloserOptions{ + Logger: zaptest.NewLogger(t), + SentinelEnabled: true, + MasterName: "mymaster", + SentinelAddrs: []string{"127.0.0.1:26379"}, + }) + + // We expect this to fail with a connection error since no real sentinel is running + require.Error(t, err) + require.ErrorContains(t, err, "failed to create a functioning Redis sentinel client") + }) +} + +func TestValidateRedisConfig(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + opts *RedisCloserOptions + expectError bool + errorMsg string + }{ + { + name: "valid standalone config", + opts: &RedisCloserOptions{ + URLs: []string{"redis://localhost:6379"}, + }, + expectError: false, + }, + { + name: "valid cluster config", + opts: &RedisCloserOptions{ + ClusterEnabled: true, + URLs: []string{"redis://localhost:6379"}, + }, + expectError: false, + }, + { + name: "valid sentinel config", + opts: &RedisCloserOptions{ + SentinelEnabled: true, + MasterName: "mymaster", + SentinelAddrs: []string{"127.0.0.1:26379"}, + }, + expectError: false, + }, + { + name: "missing URLs for standalone", + opts: &RedisCloserOptions{ + URLs: []string{}, + }, + expectError: true, + errorMsg: "urls is required for standalone Redis", + }, + { + name: "missing URLs for cluster", + opts: &RedisCloserOptions{ + ClusterEnabled: true, + URLs: []string{}, + }, + expectError: true, + errorMsg: "urls is required when cluster_enabled is true", + }, + { + name: "sentinel without master name", + opts: &RedisCloserOptions{ + SentinelEnabled: true, + SentinelAddrs: []string{"127.0.0.1:26379"}, + }, + expectError: true, + errorMsg: "master_name is required when sentinel_enabled is true", + }, + { + name: "sentinel without addresses", + opts: &RedisCloserOptions{ + SentinelEnabled: true, + MasterName: "mymaster", + SentinelAddrs: []string{}, + }, + expectError: true, + errorMsg: "sentinel_addrs is required when sentinel_enabled is true", + }, + { + name: "both sentinel and cluster enabled", + opts: &RedisCloserOptions{ + SentinelEnabled: true, + ClusterEnabled: true, + MasterName: "mymaster", + SentinelAddrs: []string{"127.0.0.1:26379"}, + URLs: []string{"redis://localhost:6379"}, + }, + expectError: true, + errorMsg: "cannot enable both sentinel_enabled and cluster_enabled", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := validateRedisConfig(tc.opts) + if tc.expectError { + require.Error(t, err) + require.ErrorContains(t, err, tc.errorMsg) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index a4de546a96..b054b2b2b3 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -535,6 +535,12 @@ type RedisConfiguration struct { URLs []string `yaml:"urls,omitempty" env:"RATE_LIMIT_REDIS_URLS"` ClusterEnabled bool `yaml:"cluster_enabled,omitempty" envDefault:"false" env:"RATE_LIMIT_REDIS_CLUSTER_ENABLED"` KeyPrefix string `yaml:"key_prefix,omitempty" envDefault:"cosmo_rate_limit" env:"RATE_LIMIT_REDIS_KEY_PREFIX"` + + // Redis Sentinel configuration for high availability + SentinelEnabled bool `yaml:"sentinel_enabled,omitempty" envDefault:"false" env:"RATE_LIMIT_REDIS_SENTINEL_ENABLED"` + MasterName string `yaml:"master_name,omitempty" env:"RATE_LIMIT_REDIS_MASTER_NAME"` + SentinelAddrs []string `yaml:"sentinel_addrs,omitempty" env:"RATE_LIMIT_REDIS_SENTINEL_ADDRS"` + SentinelPassword string `yaml:"sentinel_password,omitempty" env:"RATE_LIMIT_REDIS_SENTINEL_PASSWORD"` } type RateLimitSimpleStrategy struct { diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index d07f2bd0d0..4e4ee1ad2d 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -1907,7 +1907,6 @@ "storage": { "type": "object", "additionalProperties": false, - "required": ["urls"], "properties": { "cluster_enabled": { "type": "boolean", @@ -1916,7 +1915,7 @@ }, "urls": { "type": "array", - "description": "The Redis connection URLs. The values are specified as a string with the format 'scheme://host:port', with optional auth and options added in to the URL. If cluster is enabled, will use them to instantiate a cluster connection.", + "description": "The Redis connection URLs. The values are specified as a string with the format 'scheme://host:port', with optional auth and options added in to the URL. If cluster is enabled, will use them to instantiate a cluster connection. Required for standalone and cluster modes.", "default": [], "items": { "type": "string" @@ -1926,6 +1925,26 @@ "type": "string", "description": "The prefix of the keys used to store the rate limit data.", "default": "cosmo_rate_limit" + }, + "sentinel_enabled": { + "type": "boolean", + "description": "Enable Redis Sentinel mode for high availability. Cannot be used together with cluster_enabled.", + "default": false + }, + "master_name": { + "type": "string", + "description": "The name of the Redis master as configured in Sentinel. Required when sentinel_enabled is true." + }, + "sentinel_addrs": { + "type": "array", + "description": "List of Redis Sentinel addresses in the format 'host:port'. Required when sentinel_enabled is true.", + "items": { + "type": "string" + } + }, + "sentinel_password": { + "type": "string", + "description": "Password for authenticating with Redis Sentinel nodes, if required." } } }, From fceaf8a2d697651a30a2e7196dd4fe4a5ef55d18 Mon Sep 17 00:00:00 2001 From: "y.li" Date: Thu, 9 Oct 2025 14:45:19 +0800 Subject: [PATCH 2/4] fix: naming --- router/core/router.go | 1 - router/internal/rediscloser/rediscloser.go | 12 ++++++------ router/internal/rediscloser/rediscloser_test.go | 10 +++++----- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/router/core/router.go b/router/core/router.go index 9fb64ab278..0dd1fca4cd 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -1,5 +1,4 @@ package core -package core import ( "context" diff --git a/router/internal/rediscloser/rediscloser.go b/router/internal/rediscloser/rediscloser.go index fb5af945a6..210739995c 100644 --- a/router/internal/rediscloser/rediscloser.go +++ b/router/internal/rediscloser/rediscloser.go @@ -40,7 +40,7 @@ func NewRedisCloser(opts *RedisCloserOptions) (RDCloser, error) { case opts.ClusterEnabled: return createClusterClient(opts) default: - return createStandaloneClient(opts) + return createDirectClient(opts) } } @@ -63,7 +63,7 @@ func validateRedisConfig(opts *RedisCloserOptions) error { } default: if len(opts.URLs) == 0 { - return fmt.Errorf("urls is required for standalone Redis") + return fmt.Errorf("urls is required for direct Redis") } } return nil @@ -124,9 +124,9 @@ func createClusterClient(opts *RedisCloserOptions) (RDCloser, error) { return rdb, nil } -// createStandaloneClient creates a standalone Redis client -func createStandaloneClient(opts *RedisCloserOptions) (RDCloser, error) { - opts.Logger.Info("Creating Redis client in standalone mode.") +// createDirectClient creates a direct Redis client (single instance or master-slave without automatic failover) +func createDirectClient(opts *RedisCloserOptions) (RDCloser, error) { + opts.Logger.Info("Creating Redis client in direct mode.") urlEncodedOpts, err := redis.ParseURL(opts.URLs[0]) if err != nil { @@ -144,7 +144,7 @@ func createStandaloneClient(opts *RedisCloserOptions) (RDCloser, error) { } if isFunctioning, err := IsFunctioningClient(rdb); !isFunctioning { - return rdb, fmt.Errorf("failed to create a functioning Redis standalone client: %w", err) + return rdb, fmt.Errorf("failed to create a functioning Redis direct client: %w", err) } return rdb, nil diff --git a/router/internal/rediscloser/rediscloser_test.go b/router/internal/rediscloser/rediscloser_test.go index a68137ed11..e567158525 100644 --- a/router/internal/rediscloser/rediscloser_test.go +++ b/router/internal/rediscloser/rediscloser_test.go @@ -18,7 +18,7 @@ func TestRedisCloser(t *testing.T) { }) require.Error(t, err) - require.ErrorContains(t, err, "no redis URLs provided") + require.ErrorContains(t, err, "urls is required for direct Redis") }) t.Run("Creates default client for normal redis", func(t *testing.T) { @@ -62,7 +62,7 @@ func TestRedisCloser(t *testing.T) { }) require.Error(t, err) - require.ErrorContains(t, err, "failed to create a functioning redis client") + require.ErrorContains(t, err, "failed to create a functioning Redis direct client") }) } @@ -156,7 +156,7 @@ func TestValidateRedisConfig(t *testing.T) { errorMsg string }{ { - name: "valid standalone config", + name: "valid direct config", opts: &RedisCloserOptions{ URLs: []string{"redis://localhost:6379"}, }, @@ -180,12 +180,12 @@ func TestValidateRedisConfig(t *testing.T) { expectError: false, }, { - name: "missing URLs for standalone", + name: "missing URLs for direct", opts: &RedisCloserOptions{ URLs: []string{}, }, expectError: true, - errorMsg: "urls is required for standalone Redis", + errorMsg: "urls is required for direct Redis", }, { name: "missing URLs for cluster", From 10302c495187f9ac6e5921f1e004ea6ea60017b3 Mon Sep 17 00:00:00 2001 From: "y.li" Date: Thu, 9 Oct 2025 17:05:32 +0800 Subject: [PATCH 3/4] fix: add check logic --- router/internal/rediscloser/rediscloser.go | 8 ++++++++ router/pkg/config/config.schema.json | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/router/internal/rediscloser/rediscloser.go b/router/internal/rediscloser/rediscloser.go index 210739995c..ac586916b2 100644 --- a/router/internal/rediscloser/rediscloser.go +++ b/router/internal/rediscloser/rediscloser.go @@ -30,6 +30,12 @@ type RedisCloserOptions struct { } func NewRedisCloser(opts *RedisCloserOptions) (RDCloser, error) { + if opts == nil { + return nil, fmt.Errorf("nil RedisCloserOptions") + } + if opts.Logger == nil { + opts.Logger = zap.NewNop() + } if err := validateRedisConfig(opts); err != nil { return nil, err } @@ -83,6 +89,7 @@ func createSentinelClient(opts *RedisCloserOptions) (RDCloser, error) { }) if isFunctioning, err := IsFunctioningClient(rdb); !isFunctioning { + _ = rdb.Close() return rdb, fmt.Errorf("failed to create a functioning Redis sentinel client: %w", err) } @@ -118,6 +125,7 @@ func createClusterClient(opts *RedisCloserOptions) (RDCloser, error) { rdb := redis.NewClusterClient(clusterOps) if isFunctioning, err := IsFunctioningClient(rdb); !isFunctioning { + _ = rdb.Close() return rdb, fmt.Errorf("failed to create a functioning Redis cluster client: %w", err) } diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 4e4ee1ad2d..8395f32800 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -1915,7 +1915,7 @@ }, "urls": { "type": "array", - "description": "The Redis connection URLs. The values are specified as a string with the format 'scheme://host:port', with optional auth and options added in to the URL. If cluster is enabled, will use them to instantiate a cluster connection. Required for standalone and cluster modes.", + "description": "The Redis connection URLs. The values are specified as a string with the format 'scheme://host:port', with optional auth and options added in to the URL. If cluster is enabled, will use them to instantiate a cluster connection. Required for direct and cluster modes.", "default": [], "items": { "type": "string" From 7bc51c6721ad1121529aa904b29f3433648a953a Mon Sep 17 00:00:00 2001 From: "y.li" Date: Thu, 9 Oct 2025 17:13:03 +0800 Subject: [PATCH 4/4] fix: close rdb if init fail --- router/internal/rediscloser/rediscloser.go | 1 + 1 file changed, 1 insertion(+) diff --git a/router/internal/rediscloser/rediscloser.go b/router/internal/rediscloser/rediscloser.go index ac586916b2..e6852e9e10 100644 --- a/router/internal/rediscloser/rediscloser.go +++ b/router/internal/rediscloser/rediscloser.go @@ -152,6 +152,7 @@ func createDirectClient(opts *RedisCloserOptions) (RDCloser, error) { } if isFunctioning, err := IsFunctioningClient(rdb); !isFunctioning { + _ = rdb.Close() return rdb, fmt.Errorf("failed to create a functioning Redis direct client: %w", err) }