diff --git a/.github/workflows/router-ci.yaml b/.github/workflows/router-ci.yaml index 7ae5bc390c..113a5cfdaf 100644 --- a/.github/workflows/router-ci.yaml +++ b/.github/workflows/router-ci.yaml @@ -156,6 +156,16 @@ jobs: with: cache-dependency-path: | router-tests/go.sum + - name: Setup Redis Cluster (for Cluster tests) + uses: vishnudxb/redis-cluster@1.0.9 + with: + master1-port: 7000 + master2-port: 7001 + master3-port: 7002 + slave1-port: 7003 + slave2-port: 7004 + slave3-port: 7005 + sleep-duration: 5 - uses: nick-fields/retry@v3 with: timeout_minutes: 30 diff --git a/docker-compose.yml b/docker-compose.yml index 473b6d39fb..116f93e850 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -243,6 +243,55 @@ services: profiles: - dev +# 3 node minimum for a cluster, per redis documentation + redis-cluster-node-0: + image: redis:${DC_REDIS_VERSION:-7.2.4}-alpine + command: redis-server /usr/local/etc/redis/redis.conf + networks: + - primary + ports: + - "7000:6379" + volumes: + - ./docker/redis/redis-cluster.conf:/usr/local/etc/redis/redis.conf + profiles: + - dev + + redis-cluster-node-1: + image: redis:${DC_REDIS_VERSION:-7.2.4}-alpine + command: redis-server /usr/local/etc/redis/redis.conf + networks: + - primary + ports: + - "7001:6379" + volumes: + - ./docker/redis/redis-cluster.conf:/usr/local/etc/redis/redis.conf + profiles: + - dev + + redis-cluster-node-2: + image: redis:${DC_REDIS_VERSION:-7.2.4}-alpine + command: redis-server /usr/local/etc/redis/redis.conf + networks: + - primary + ports: + - "7002:6379" + volumes: + - ./docker/redis/redis-cluster.conf:/usr/local/etc/redis/redis.conf + profiles: + - dev + + redis-cluster-configure: + image: redis:${DC_REDIS_VERSION:-7.2.4}-alpine + command: /usr/local/etc/redis/redis-cluster-create.sh + networks: + - primary + depends_on: + - redis-cluster-node-0 + - redis-cluster-node-1 + - redis-cluster-node-2 + volumes: + - ./docker/redis/:/usr/local/etc/redis/ + kafka: image: bitnami/kafka:3.7.0 ports: @@ -279,3 +328,7 @@ volumes: minio: redis: redis-slave: + redis-cluster-configure: + redis-cluster-node-0: + redis-cluster-node-1: + redis-cluster-node-2: diff --git a/docker/redis/redis-cluster-create.sh b/docker/redis/redis-cluster-create.sh new file mode 100755 index 0000000000..f9ca4fda67 --- /dev/null +++ b/docker/redis/redis-cluster-create.sh @@ -0,0 +1,13 @@ +# wait for the docker-compose depends_on to spin up the redis nodes usually takes this long +sleep 10 + +node_0_ip=$(getent hosts redis-cluster-node-0 | awk '{ print $1 }') +node_1_ip=$(getent hosts redis-cluster-node-1 | awk '{ print $1 }') +node_2_ip=$(getent hosts redis-cluster-node-2 | awk '{ print $1 }') + + +redis-cli --cluster create \ + $node_0_ip:6379 \ + $node_1_ip:6379 \ + $node_2_ip:6379 \ + --cluster-replicas 0 --cluster-yes \ No newline at end of file diff --git a/docker/redis/redis-cluster.conf b/docker/redis/redis-cluster.conf new file mode 100644 index 0000000000..53d443eb7c --- /dev/null +++ b/docker/redis/redis-cluster.conf @@ -0,0 +1,9 @@ +cluster-enabled yes +cluster-config-file nodes.conf +cluster-node-timeout 5000 +bind 0.0.0.0 +protected-mode no +maxmemory 100mb +maxmemory-policy noeviction +save 60 1 +appendonly no \ No newline at end of file diff --git a/router-tests/automatic_persisted_queries_test.go b/router-tests/automatic_persisted_queries_test.go index 2a7fb7acd4..0f7717e65e 100644 --- a/router-tests/automatic_persisted_queries_test.go +++ b/router-tests/automatic_persisted_queries_test.go @@ -164,6 +164,7 @@ func TestAutomaticPersistedQueries(t *testing.T) { redisLocalUrl = "localhost:6379" redisUrl = fmt.Sprintf("redis://%s", redisLocalUrl) redisPassword = "test" + client = redis.NewClient(&redis.Options{Addr: redisLocalUrl, Password: redisPassword}) ) t.Parallel() @@ -172,7 +173,6 @@ func TestAutomaticPersistedQueries(t *testing.T) { key := uuid.New().String() t.Cleanup(func() { - client := redis.NewClient(&redis.Options{Addr: redisLocalUrl, Password: redisPassword}) del := client.Del(context.Background(), key) require.NoError(t, del.Err()) }) @@ -180,10 +180,10 @@ func TestAutomaticPersistedQueries(t *testing.T) { testenv.Run(t, &testenv.Config{ RouterOptions: []core.Option{ core.WithStorageProviders(config.StorageProviders{ - Redis: []config.BaseStorageProvider{ + Redis: []config.RedisStorageProvider{ { - URL: redisUrl, - ID: "redis", + URLs: []string{redisUrl}, + ID: "redis", }, }})}, ApqConfig: config.AutomaticPersistedQueriesConfig{ @@ -206,7 +206,6 @@ func TestAutomaticPersistedQueries(t *testing.T) { key := uuid.New().String() t.Cleanup(func() { - client := redis.NewClient(&redis.Options{Addr: redisLocalUrl, Password: redisPassword}) del := client.Del(context.Background(), key) require.NoError(t, del.Err()) }) @@ -214,10 +213,10 @@ func TestAutomaticPersistedQueries(t *testing.T) { testenv.Run(t, &testenv.Config{ RouterOptions: []core.Option{ core.WithStorageProviders(config.StorageProviders{ - Redis: []config.BaseStorageProvider{ + Redis: []config.RedisStorageProvider{ { - URL: redisUrl, - ID: "redis", + URLs: []string{redisUrl}, + ID: "redis", }, }})}, ApqConfig: config.AutomaticPersistedQueriesConfig{ @@ -264,7 +263,6 @@ func TestAutomaticPersistedQueries(t *testing.T) { key := uuid.New().String() t.Cleanup(func() { - client := redis.NewClient(&redis.Options{Addr: redisLocalUrl, Password: redisPassword}) del := client.Del(context.Background(), key) require.NoError(t, del.Err()) }) @@ -272,10 +270,10 @@ func TestAutomaticPersistedQueries(t *testing.T) { testenv.Run(t, &testenv.Config{ RouterOptions: []core.Option{ core.WithStorageProviders(config.StorageProviders{ - Redis: []config.BaseStorageProvider{ + Redis: []config.RedisStorageProvider{ { - URL: redisUrl, - ID: "redis", + URLs: []string{redisUrl}, + ID: "redis", }, }})}, ApqConfig: config.AutomaticPersistedQueriesConfig{ @@ -314,7 +312,6 @@ func TestAutomaticPersistedQueries(t *testing.T) { key := uuid.New().String() t.Cleanup(func() { - client := redis.NewClient(&redis.Options{Addr: redisLocalUrl, Password: redisPassword}) del := client.Del(context.Background(), key) require.NoError(t, del.Err()) }) @@ -322,10 +319,10 @@ func TestAutomaticPersistedQueries(t *testing.T) { testenv.Run(t, &testenv.Config{ RouterOptions: []core.Option{ core.WithStorageProviders(config.StorageProviders{ - Redis: []config.BaseStorageProvider{ + Redis: []config.RedisStorageProvider{ { - URL: redisUrl, - ID: "redis", + URLs: []string{redisUrl}, + ID: "redis", }, }})}, ApqConfig: config.AutomaticPersistedQueriesConfig{ @@ -366,6 +363,70 @@ func TestAutomaticPersistedQueries(t *testing.T) { require.Equal(t, `{"data":{"__typename":"Query"}}`, res3.Body) }) }) + + t.Run("works with cluster mode", func(t *testing.T) { + t.Parallel() + + clusterUrls := []string{"localhost:7000", "localhost:7001"} + clusterClient := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: clusterUrls, + Password: redisPassword, + }) + + key := uuid.New().String() + t.Cleanup(func() { + del := clusterClient.Del(context.Background(), key) + require.NoError(t, del.Err()) + }) + + testenv.Run(t, &testenv.Config{ + RouterOptions: []core.Option{ + core.WithStorageProviders(config.StorageProviders{ + Redis: []config.RedisStorageProvider{ + { + ClusterEnabled: true, + URLs: clusterUrls, + ID: "redis", + }, + }})}, + ApqConfig: config.AutomaticPersistedQueriesConfig{ + Enabled: true, + Storage: config.AutomaticPersistedQueriesStorageConfig{ + ProviderID: "redis", + ObjectPrefix: key, + }, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + header := make(http.Header) + header.Add("graphql-client-name", "my-client") + res0 := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Extensions: []byte(`{"persistedQuery": {"version": 1, "sha256Hash": "ecf4edb46db40b5132295c0291d62fb65d6759a9eedfa4d5d612dd5ec54a6b38"}}`), + Header: header, + }) + require.Equal(t, `{"errors":[{"message":"PersistedQueryNotFound","extensions":{"code":"PERSISTED_QUERY_NOT_FOUND"}}]}`, res0.Body) + + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `{__typename}`, + Extensions: []byte(`{"persistedQuery": {"version": 1, "sha256Hash": "ecf4edb46db40b5132295c0291d62fb65d6759a9eedfa4d5d612dd5ec54a6b38"}}`), + Header: header, + }) + require.Equal(t, `{"data":{"__typename":"Query"}}`, res.Body) + + res2 := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Extensions: []byte(`{"persistedQuery": {"version": 1, "sha256Hash": "ecf4edb46db40b5132295c0291d62fb65d6759a9eedfa4d5d612dd5ec54a6b38"}}`), + Header: header, + }) + require.Equal(t, `{"data":{"__typename":"Query"}}`, res2.Body) + + header2 := make(http.Header) + header2.Add("graphql-client-name", "not-my-client") + res3 := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Extensions: []byte(`{"persistedQuery": {"version": 1, "sha256Hash": "ecf4edb46db40b5132295c0291d62fb65d6759a9eedfa4d5d612dd5ec54a6b38"}}`), + Header: header2, + }) + require.Equal(t, `{"data":{"__typename":"Query"}}`, res3.Body) + }) + }) }) } diff --git a/router-tests/ratelimit_test.go b/router-tests/ratelimit_test.go index 6c2586e71b..2abaa2860a 100644 --- a/router-tests/ratelimit_test.go +++ b/router-tests/ratelimit_test.go @@ -52,7 +52,7 @@ func TestRateLimit(t *testing.T) { core.WithRateLimitConfig(&config.RateLimitConfiguration{ Enabled: false, Storage: config.RedisConfiguration{ - Url: "redis://localhost:1", + URLs: []string{"redis://localhost:1"}, KeyPrefix: "non", }, }), @@ -86,7 +86,7 @@ func TestRateLimit(t *testing.T) { RejectExceedingRequests: false, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -121,7 +121,7 @@ func TestRateLimit(t *testing.T) { RejectExceedingRequests: false, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -162,7 +162,7 @@ func TestRateLimit(t *testing.T) { RejectExceedingRequests: false, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -225,7 +225,7 @@ func TestRateLimit(t *testing.T) { RejectExceedingRequests: false, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, KeySuffixExpression: "request.header.Get('X-Forwarded-For')", @@ -282,7 +282,7 @@ func TestRateLimit(t *testing.T) { RejectExceedingRequests: false, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -325,7 +325,7 @@ func TestRateLimit(t *testing.T) { RejectExceedingRequests: false, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -380,7 +380,7 @@ func TestRateLimit(t *testing.T) { RejectExceedingRequests: false, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -414,7 +414,7 @@ func TestRateLimit(t *testing.T) { RejectExceedingRequests: false, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -449,7 +449,7 @@ func TestRateLimit(t *testing.T) { RejectStatusCode: http.StatusOK, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -487,7 +487,7 @@ func TestRateLimit(t *testing.T) { RejectStatusCode: http.StatusTooManyRequests, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -524,7 +524,7 @@ func TestRateLimit(t *testing.T) { HideStatsFromResponseExtension: true, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -560,7 +560,7 @@ func TestRateLimit(t *testing.T) { HideStatsFromResponseExtension: true, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -606,7 +606,7 @@ func TestRateLimit(t *testing.T) { HideStatsFromResponseExtension: true, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -653,7 +653,7 @@ func TestRateLimit(t *testing.T) { HideStatsFromResponseExtension: true, }, Storage: config.RedisConfiguration{ - Url: "redis://localhost:6379", + URLs: []string{"redis://localhost:6379"}, KeyPrefix: key, }, Debug: true, @@ -673,6 +673,145 @@ func TestRateLimit(t *testing.T) { require.Equal(t, `{"errors":[{"message":"Rate limit exceeded"}],"data":null}`, res.Body) }) }) + t.Run("Cluster Mode", func(t *testing.T) { + var ( + clusterUrlSlice = []string{"localhost:7000", "localhost:7001", "localhost:7002"} + password = "test" + ) + + t.Run("enabled - below limit", func(t *testing.T) { + t.Parallel() + + key := uuid.New().String() + t.Cleanup(func() { + client := redis.NewClusterClient(&redis.ClusterOptions{Addrs: clusterUrlSlice, Password: password}) + del := client.Del(context.Background(), key) + require.NoError(t, del.Err()) + }) + testenv.Run(t, &testenv.Config{ + RouterOptions: []core.Option{ + core.WithRateLimitConfig(&config.RateLimitConfiguration{ + Enabled: true, + Strategy: "simple", + SimpleStrategy: config.RateLimitSimpleStrategy{ + Rate: 1, + Burst: 1, + Period: time.Second * 2, + RejectExceedingRequests: false, + }, + Storage: config.RedisConfiguration{ + ClusterEnabled: true, + URLs: clusterUrlSlice, + KeyPrefix: key, + }, + Debug: true, + }), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query ($n:Int!) { employee(id:$n) { id details { forename surname } } }`, + Variables: json.RawMessage(`{"n":1}`), + }) + require.Equal(t, fmt.Sprintf(`{"data":{"employee":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}},"extensions":{"rateLimit":{"key":"%s","requestRate":1,"remaining":0,"retryAfterMs":1234,"resetAfterMs":1234}}}`, key), res.Body) + }) + }) + t.Run("enabled - header key", func(t *testing.T) { + t.Parallel() + + key := uuid.New().String() + t.Cleanup(func() { + client := redis.NewClusterClient(&redis.ClusterOptions{Addrs: clusterUrlSlice, Password: password}) + del := client.Del(context.Background(), fmt.Sprintf("%s:localhost", key)) + require.NoError(t, del.Err()) + }) + testenv.Run(t, &testenv.Config{ + RouterOptions: []core.Option{ + core.WithRateLimitConfig(&config.RateLimitConfiguration{ + Enabled: true, + Strategy: "simple", + SimpleStrategy: config.RateLimitSimpleStrategy{ + Rate: 1, + Burst: 1, + Period: time.Second * 2, + RejectExceedingRequests: false, + }, + Storage: config.RedisConfiguration{ + ClusterEnabled: true, + URLs: clusterUrlSlice, + KeyPrefix: key, + }, + Debug: true, + KeySuffixExpression: "request.header.Get('X-Forwarded-For')", + }), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res, err := xEnv.MakeGraphQLRequestWithHeaders(testenv.GraphQLRequest{ + Query: `query ($n:Int!) { employee(id:$n) { id details { forename surname } } }`, + Variables: json.RawMessage(`{"n":1}`), + }, map[string]string{ + "X-Forwarded-For": "localhost", + }) + require.NoError(t, err) + require.Equal(t, fmt.Sprintf(`{"data":{"employee":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}},"extensions":{"rateLimit":{"key":"%s:localhost","requestRate":1,"remaining":0,"retryAfterMs":1234,"resetAfterMs":1234}}}`, key), res.Body) + }) + }) + t.Run("enabled - above limit", func(t *testing.T) { + t.Parallel() + + key := uuid.New().String() + t.Cleanup(func() { + client := redis.NewClusterClient(&redis.ClusterOptions{Addrs: clusterUrlSlice, Password: password}) + del := client.Del(context.Background(), key) + require.NoError(t, del.Err()) + }) + testenv.Run(t, &testenv.Config{ + RouterOptions: []core.Option{ + core.WithRateLimitConfig(&config.RateLimitConfiguration{ + Enabled: true, + Strategy: "simple", + SimpleStrategy: config.RateLimitSimpleStrategy{ + Rate: 2, + Burst: 2, + Period: time.Second * 2, + RejectExceedingRequests: false, + }, + Storage: config.RedisConfiguration{ + ClusterEnabled: true, + URLs: clusterUrlSlice, + KeyPrefix: key, + }, + Debug: true, + }), + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query ($n:Int!) { employee(id:$n) { id details { forename surname } } }`, + Variables: json.RawMessage(`{"n":1}`), + }) + require.Equal(t, fmt.Sprintf(`{"data":{"employee":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}},"extensions":{"rateLimit":{"key":"%s","requestRate":1,"remaining":1,"retryAfterMs":1234,"resetAfterMs":1234}}}`, key), res.Body) + res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query ($n:Int!) { employee(id:$n) { id details { forename surname } } }`, + Variables: json.RawMessage(`{"n":1}`), + }) + require.Equal(t, fmt.Sprintf(`{"data":{"employee":{"id":1,"details":{"forename":"Jens","surname":"Neuse"}}},"extensions":{"rateLimit":{"key":"%s","requestRate":1,"remaining":0,"retryAfterMs":1234,"resetAfterMs":1234}}}`, key), res.Body) + res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query ($n:Int!) { employee(id:$n) { id details { forename surname } } }`, + Variables: json.RawMessage(`{"n":1}`), + }) + require.Equal(t, fmt.Sprintf(`{"errors":[{"message":"Rate limit exceeded for Subgraph 'employees'."}],"data":{"employee":null},"extensions":{"rateLimit":{"key":"%s","requestRate":1,"remaining":0,"retryAfterMs":1234,"resetAfterMs":1234}}}`, key), res.Body) + res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query ($n:Int!) { employee(id:$n) { id details { forename surname } } }`, + Variables: json.RawMessage(`{"n":1}`), + }) + require.Equal(t, fmt.Sprintf(`{"errors":[{"message":"Rate limit exceeded for Subgraph 'employees'."}],"data":{"employee":null},"extensions":{"rateLimit":{"key":"%s","requestRate":1,"remaining":0,"retryAfterMs":1234,"resetAfterMs":1234}}}`, key), res.Body) + res = xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `query ($n:Int!) { employee(id:$n) { id details { forename surname } } }`, + Variables: json.RawMessage(`{"n":1}`), + }) + require.Equal(t, fmt.Sprintf(`{"errors":[{"message":"Rate limit exceeded for Subgraph 'employees'."}],"data":{"employee":null},"extensions":{"rateLimit":{"key":"%s","requestRate":1,"remaining":0,"retryAfterMs":1234,"resetAfterMs":1234}}}`, key), res.Body) + }) + }) + }) } const ( diff --git a/router/core/ratelimiter.go b/router/core/ratelimiter.go index c76fc4fb8e..2e2b0d2268 100644 --- a/router/core/ratelimiter.go +++ b/router/core/ratelimiter.go @@ -6,12 +6,12 @@ import ( "encoding/json" "errors" "fmt" + rd "github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/redis" "io" "sync" "github.com/expr-lang/expr/vm" "github.com/go-redis/redis_rate/v10" - "github.com/redis/go-redis/v9" "github.com/wundergraph/cosmo/router/internal/expr" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" ) @@ -21,7 +21,7 @@ var ( ) type CosmoRateLimiterOptions struct { - RedisClient *redis.Client + RedisClient rd.RDCloser Debug bool RejectStatusCode int @@ -50,7 +50,7 @@ func NewCosmoRateLimiter(opts *CosmoRateLimiterOptions) (rl *CosmoRateLimiter, e } type CosmoRateLimiter struct { - client *redis.Client + client rd.RDCloser limiter *redis_rate.Limiter debug bool diff --git a/router/core/router.go b/router/core/router.go index 4971893f05..268de1230b 100644 --- a/router/core/router.go +++ b/router/core/router.go @@ -6,6 +6,7 @@ import ( "crypto/x509" "errors" "fmt" + rd "github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/redis" "net" "net/http" "net/url" @@ -16,7 +17,6 @@ import ( "connectrpc.com/connect" "github.com/mitchellh/mapstructure" "github.com/nats-io/nuid" - "github.com/redis/go-redis/v9" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/propagation" @@ -196,7 +196,7 @@ type ( fileUploadConfig *config.FileUpload accessController *AccessController retryOptions retrytransport.RetryOptions - redisClient *redis.Client + redisClient rd.RDCloser processStartTime time.Time developmentMode bool healthcheck health.Checker @@ -840,12 +840,16 @@ func (r *Router) bootstrap(ctx context.Context) error { } if r.Config.rateLimit != nil && r.Config.rateLimit.Enabled { - options, err := redis.ParseURL(r.Config.rateLimit.Storage.Url) + var err error + r.redisClient, err = rd.NewRedisCloser(&rd.RedisCloserOptions{ + URLs: r.Config.rateLimit.Storage.URLs, + ClusterEnabled: r.Config.rateLimit.Storage.ClusterEnabled, + Logger: r.logger, + }) if err != nil { - return fmt.Errorf("failed to parse the redis connection url: %w", err) + return fmt.Errorf("failed to create redis client: %w", err) } - r.redisClient = redis.NewClient(options) } if r.metricConfig.OpenTelemetry.EngineStats.Enabled() || r.metricConfig.Prometheus.EngineStats.Enabled() || r.engineExecutionConfiguration.Debug.ReportWebSocketConnections { @@ -903,7 +907,7 @@ func (r *Router) bootstrap(ctx context.Context) error { func (r *Router) buildClients() error { s3Providers := map[string]config.S3StorageProvider{} cdnProviders := map[string]config.BaseStorageProvider{} - redisProviders := map[string]config.BaseStorageProvider{} + redisProviders := map[string]config.RedisStorageProvider{} for _, provider := range r.storageProviders.S3 { if _, ok := s3Providers[provider.ID]; ok { diff --git a/router/go.mod b/router/go.mod index 0b7d3c3a0c..bed8a0432d 100644 --- a/router/go.mod +++ b/router/go.mod @@ -60,6 +60,7 @@ require ( github.com/KimMachineGun/automemlimit v0.6.1 github.com/MicahParks/jwkset v0.5.19 github.com/MicahParks/keyfunc/v3 v3.3.5 + github.com/alicebob/miniredis/v2 v2.34.0 github.com/bep/debounce v1.2.1 github.com/caarlos0/env/v11 v11.1.0 github.com/dgraph-io/ristretto/v2 v2.1.0 @@ -79,6 +80,7 @@ require ( require ( github.com/99designs/gqlgen v0.17.49 // indirect + github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect @@ -132,6 +134,7 @@ require ( github.com/tklauser/numcpus v0.6.1 // indirect github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect github.com/vektah/gqlparser/v2 v2.5.16 // indirect + github.com/yuin/gopher-lua v1.1.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 // indirect go.opentelemetry.io/proto/otlp v1.1.0 // indirect diff --git a/router/go.sum b/router/go.sum index 5dabb1661f..73166346b5 100644 --- a/router/go.sum +++ b/router/go.sum @@ -11,6 +11,10 @@ github.com/MicahParks/keyfunc/v3 v3.3.5 h1:7ceAJLUAldnoueHDNzF8Bx06oVcQ5CfJnYwNt github.com/MicahParks/keyfunc/v3 v3.3.5/go.mod h1:SdCCyMJn/bYqWDvARspC6nCT8Sk74MjuAY22C7dCST8= github.com/agnivade/levenshtein v1.1.1 h1:QY8M92nrzkmr798gCo3kmMyqXFzdQVpxLlGPRBij0P8= github.com/agnivade/levenshtein v1.1.1/go.mod h1:veldBMzWxcCG2ZvUTKD2kJNRdCk5hVbJomOvKkmgYbo= +github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE= +github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.34.0 h1:mBFWMaJSNL9RwdGRyEDoAAv8OQc5UlEhLDQggTglU/0= +github.com/alicebob/miniredis/v2 v2.34.0/go.mod h1:kWShP4b58T1CW0Y5dViCd5ztzrDqRWqM3nksiyXk5s8= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= @@ -277,6 +281,8 @@ github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083 h1:8/D7f8gKxTB github.com/wundergraph/astjson v0.0.0-20250106123708-be463c97e083/go.mod h1:eOTL6acwctsN4F3b7YE+eE2t8zcJ/doLm9sZzsxxxrE= github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.145 h1:3JuBmRux6YB/UZgh6COvgLXzQhMIsdHV7A02NsYdAVE= github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.145/go.mod h1:B7eV0Qh8Lop9QzIOQcsvKp3S0ejfC6mgyWoJnI917yQ= +github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M= +github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24= diff --git a/router/internal/persistedoperation/apq/redis.go b/router/internal/persistedoperation/apq/redis.go index 6911e07d8b..8ca2b67735 100644 --- a/router/internal/persistedoperation/apq/redis.go +++ b/router/internal/persistedoperation/apq/redis.go @@ -3,8 +3,8 @@ package apq import ( "context" "errors" - "fmt" "github.com/redis/go-redis/v9" + "github.com/wundergraph/cosmo/router/internal/persistedoperation/operationstorage/redis" "github.com/wundergraph/cosmo/router/pkg/config" "go.uber.org/zap" "time" @@ -12,14 +12,14 @@ import ( type RedisOptions struct { Logger *zap.Logger - StorageConfig *config.BaseStorageProvider + StorageConfig *config.RedisStorageProvider ApqConfig *config.AutomaticPersistedQueriesConfig Prefix string } type redisClient struct { logger *zap.Logger - client *redis.Client + client rd.RDCloser prefix string } @@ -28,20 +28,19 @@ func NewRedisClient(opts *RedisOptions) (KVClient, error) { return nil, errors.New("storage config is nil") } - options, err := redis.ParseURL(opts.StorageConfig.URL) - if err != nil { - return nil, fmt.Errorf("failed to parse the redis connection url: %w", err) - } - - innerClient := redis.NewClient(options) + rdb, err := rd.NewRedisCloser(&rd.RedisCloserOptions{ + Logger: opts.Logger, + URLs: opts.StorageConfig.URLs, + ClusterEnabled: opts.StorageConfig.ClusterEnabled, + }) rclient := &redisClient{ logger: opts.Logger, - client: innerClient, + client: rdb, prefix: opts.Prefix, } - return rclient, nil + return rclient, err } func (r *redisClient) Get(ctx context.Context, operationHash string) ([]byte, error) { diff --git a/router/internal/persistedoperation/operationstorage/redis/rdcloser.go b/router/internal/persistedoperation/operationstorage/redis/rdcloser.go new file mode 100644 index 0000000000..e7029d6e61 --- /dev/null +++ b/router/internal/persistedoperation/operationstorage/redis/rdcloser.go @@ -0,0 +1,84 @@ +package rd + +import ( + "context" + "fmt" + "github.com/redis/go-redis/v9" + "go.uber.org/zap" + "io" + "strings" +) + +// RDCloser is an interface that combines the redis.Cmdable and io.Closer interfaces, ensuring that we can close the +// client connection. +type RDCloser interface { + redis.Cmdable + io.Closer +} + +type RedisCloserOptions struct { + Logger *zap.Logger + URLs []string + ClusterEnabled bool + Password string +} + +func NewRedisCloser(opts *RedisCloserOptions) (RDCloser, error) { + if len(opts.URLs) == 0 { + return nil, fmt.Errorf("no redis URLs provided") + } + + var rdb RDCloser + // If provided, prefer cluster URLs to single URL + if opts.ClusterEnabled { + opts.Logger.Info("Detected that redis is running in cluster mode.") + strippedUrls := []string{} + for _, url := range opts.URLs { + strippedUrls = append(strippedUrls, strings.ReplaceAll(url, "redis://", "")) + } + rdb = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: strippedUrls, + Password: opts.Password, + }) + } else { + options, err := redis.ParseURL(opts.URLs[0]) + if err != nil { + return nil, fmt.Errorf("failed to parse the redis connection url: %w", err) + } + options.Password = opts.Password + rdb = redis.NewClient(options) + + if isClusterClient(rdb) { + opts.Logger.Warn("Detected that redis is running in cluster mode. You may encounter issues as a result") + } + } + + if !IsFunctioningClient(rdb) { + return rdb, fmt.Errorf("failed to create a functioning redis client") + } + + return rdb, nil +} + +func IsFunctioningClient(rdb RDCloser) bool { + if rdb == nil { + return false + } + + res, err := rdb.Ping(context.Background()).Result() + return err == nil && res == "PONG" +} + +func isClusterClient(rdb RDCloser) bool { + if rdb == nil { + return false + } + + info, err := rdb.Info(context.Background(), "cluster").Result() + if err != nil { + return false + } + + // Check if the response indicates cluster mode + return strings.Contains(info, "cluster_enabled:1") +} diff --git a/router/internal/persistedoperation/operationstorage/redis/rdcloser_test.go b/router/internal/persistedoperation/operationstorage/redis/rdcloser_test.go new file mode 100644 index 0000000000..3390f5ae5d --- /dev/null +++ b/router/internal/persistedoperation/operationstorage/redis/rdcloser_test.go @@ -0,0 +1,46 @@ +package rd + +import ( + "fmt" + "github.com/alicebob/miniredis/v2" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "testing" +) + +func TestRedisCloser(t *testing.T) { + t.Parallel() + + t.Run("fails if no urls provided", func(t *testing.T) { + _, err := NewRedisCloser(&RedisCloserOptions{ + Logger: zaptest.NewLogger(t), + }) + + require.Error(t, err) + require.ErrorContains(t, err, "no redis URLs provided") + }) + + t.Run("Creates default client for normal redis", func(t *testing.T) { + mr := miniredis.RunT(t) + + cl, err := NewRedisCloser(&RedisCloserOptions{ + Logger: zaptest.NewLogger(t), + URLs: []string{fmt.Sprintf("redis://%s", mr.Addr())}, + }) + + require.NoError(t, err) + require.NotNil(t, cl) + require.True(t, IsFunctioningClient(cl)) + require.False(t, isClusterClient(cl)) + }) + + t.Run("Unresponsive redis fails", func(t *testing.T) { + _, err := NewRedisCloser(&RedisCloserOptions{ + Logger: zaptest.NewLogger(t), + URLs: []string{"redis://localhost:7000"}, + }) + + require.Error(t, err) + require.ErrorContains(t, err, "failed to create a functioning redis client") + }) +} diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 2322a14bfb..c3c92a4752 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -445,8 +445,9 @@ type RateLimitErrorExtensionCode struct { } type RedisConfiguration struct { - Url string `yaml:"url,omitempty" envDefault:"redis://localhost:6379" env:"RATE_LIMIT_REDIS_URL"` - KeyPrefix string `yaml:"key_prefix,omitempty" envDefault:"cosmo_rate_limit" env:"RATE_LIMIT_REDIS_KEY_PREFIX"` + URLs []string `yaml:"urls,omitempty" env:"RATE_LIMIT_REDIS_URLS"` + ClusterEnabled bool `yaml:"cluster_enabled,omitempty" envDefault:"false" env:"RATE_LIMIT_REDIS_CLUSTER_ENABLED"` + KeyPrefix string `yaml:"key_prefix,omitempty" envDefault:"cosmo_rate_limit" env:"RATE_LIMIT_REDIS_KEY_PREFIX"` } type RateLimitSimpleStrategy struct { @@ -619,9 +620,9 @@ type SubgraphErrorPropagationConfiguration struct { } type StorageProviders struct { - S3 []S3StorageProvider `yaml:"s3,omitempty"` - CDN []BaseStorageProvider `yaml:"cdn,omitempty"` - Redis []BaseStorageProvider `yaml:"redis,omitempty"` + S3 []S3StorageProvider `yaml:"s3,omitempty"` + CDN []BaseStorageProvider `yaml:"cdn,omitempty"` + Redis []RedisStorageProvider `yaml:"redis,omitempty"` } type PersistedOperationsStorageConfig struct { @@ -650,8 +651,9 @@ type BaseStorageProvider struct { } type RedisStorageProvider struct { - ID string `yaml:"id,omitempty" env:"STORAGE_PROVIDER_REDIS_ID"` - URL string `yaml:"url,omitempty" env:"STORAGE_PROVIDER_REDIS_URL"` + ID string `yaml:"id,omitempty" env:"STORAGE_PROVIDER_REDIS_ID"` + URLs []string `yaml:"urls,omitempty" env:"STORAGE_PROVIDER_REDIS_URLS"` + ClusterEnabled bool `yaml:"cluster_enabled,omitempty" envDefault:"false" env:"STORAGE_PROVIDER_REDIS_CLUSTER_ENABLED"` } type PersistedOperationsCDNProvider struct { diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index a9c2b35163..ab51a335d9 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -56,16 +56,25 @@ "type": "array", "items": { "type": "object", - "required": ["url", "id"], + "required": ["urls", "id"], "additionalProperties": false, "properties": { "id": { "type": "string", "description": "The ID of the redis provider. The ID is used to identify the storage provider in the configuration." }, - "url": { - "type": "string", - "description": "The URL of the redis storage. The URL is specified as a string with the format 'scheme://host:port, with auth included if necessary'." + "urls": { + "type": "array", + "description": "The cluster connection URLs. The values are specified as a string with the format 'scheme://host:port'. If cluster enabled, will use these urls for a cluster connection. If cluster is not enabled, we will just select the first URL provided.", + "default": [], + "items": { + "type": "string" + } + }, + "cluster_enabled": { + "type": "boolean", + "description": "Enable Redis Cluster connection, using the supplied URLs.", + "default": false } } } @@ -1603,12 +1612,20 @@ "storage": { "type": "object", "additionalProperties": false, + "required": ["urls"], "properties": { - "url": { - "type": "string", - "description": "The connection URL. The value is specified as a string with the format 'scheme://host:port'.", - "default": "redis://localhost:6379", - "format": "url" + "cluster_enabled": { + "type": "boolean", + "description": "Enable Redis Cluster connection, using the supplied URLs.", + "default": false + }, + "urls": { + "type": "array", + "description": "The Redis connection URLs. The values are specified as a string with the format 'scheme://host:port'. If cluster is enabled, will use them to instantiate a cluster connection. f", + "default": [], + "items": { + "type": "string" + } }, "key_prefix": { "type": "string", diff --git a/router/pkg/config/fixtures/full.yaml b/router/pkg/config/fixtures/full.yaml index c543ab8568..9e2b8c1a9e 100644 --- a/router/pkg/config/fixtures/full.yaml +++ b/router/pkg/config/fixtures/full.yaml @@ -282,7 +282,10 @@ rate_limit: enabled: true strategy: "simple" storage: - url: "redis://:test@localhost:6379" + cluster_enabled: true + urls: + - "test@localhost:8000" + - "test2@localhost:8001" key_prefix: "cosmo_rate_limit" simple_strategy: rate: 60 @@ -335,7 +338,10 @@ storage_providers: secure: false redis: - id: "my_redis" - url: "redis://:test@localhost:6379" + cluster_enabled: false + urls: + - "test@localhost:8000" + - "test2@localhost:8001" security: complexity_calculation_cache: diff --git a/router/pkg/config/testdata/config_defaults.json b/router/pkg/config/testdata/config_defaults.json index 3d353b379b..af0135d47d 100644 --- a/router/pkg/config/testdata/config_defaults.json +++ b/router/pkg/config/testdata/config_defaults.json @@ -203,7 +203,8 @@ "HideStatsFromResponseExtension": false }, "Storage": { - "Url": "redis://localhost:6379", + "URLs": null, + "ClusterEnabled": false, "KeyPrefix": "cosmo_rate_limit" }, "Debug": false, diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index e99cb1719b..cd91fba4e5 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -399,7 +399,11 @@ "HideStatsFromResponseExtension": false }, "Storage": { - "Url": "redis://:test@localhost:6379", + "URLs": [ + "test@localhost:8000", + "test2@localhost:8001" + ], + "ClusterEnabled": true, "KeyPrefix": "cosmo_rate_limit" }, "Debug": false, @@ -620,7 +624,11 @@ "Redis": [ { "ID": "my_redis", - "URL": "redis://:test@localhost:6379" + "URLs": [ + "test@localhost:8000", + "test2@localhost:8001" + ], + "ClusterEnabled": false } ] },