diff --git a/.github/workflows/router-ci.yaml b/.github/workflows/router-ci.yaml index ab6ec7b26e..7b3978b486 100644 --- a/.github/workflows/router-ci.yaml +++ b/.github/workflows/router-ci.yaml @@ -187,11 +187,50 @@ jobs: --health-interval 10s --health-timeout 5s --health-retries 5 - credentials: - username: ${{secrets.DOCKER_USERNAME}} - password: ${{secrets.DOCKER_PASSWORD}} ports: - 6379:6379 + redis-0: + image: bitnamilegacy/redis-cluster:7.2 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + env: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-0 redis-1 redis-2" + REDIS_CLUSTER_REPLICAS: "0" + REDIS_CLUSTER_CREATOR: "no" + ports: + - 7001:6379 + redis-1: + image: bitnamilegacy/redis-cluster:7.2 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + env: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-0 redis-1 redis-2" + REDIS_CLUSTER_REPLICAS: "0" + REDIS_CLUSTER_CREATOR: "no" + ports: + - 7002:6379 + redis-2: + image: bitnamilegacy/redis-cluster:7.2 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + env: + ALLOW_EMPTY_PASSWORD: "yes" + REDIS_NODES: "redis-0 redis-1 redis-2" + REDIS_CLUSTER_REPLICAS: "0" + REDIS_CLUSTER_CREATOR: "yes" + ports: + - 7003:6379 kafka: image: bitnamilegacy/kafka:3.7.0 options: >- @@ -214,6 +253,45 @@ jobs: ports: - '9092:9092' steps: + - name: Wait for Redis Cluster + run: | + echo "[CHECK] Waiting for Redis Cluster to become healthy..." + cluster_containers=$(docker ps --quiet --filter "ancestor=bitnamilegacy/redis-cluster:7.2") + + success=0 + for i in {1..30}; do + if [ $i -eq 1 ]; then + echo "[INIT] Forcing cluster creation..." + # pick one container as the "creator" + creator=$(echo $cluster_containers | awk '{print $1}') + # run the cluster create command inside it + docker exec "$creator" redis-cli --cluster create redis-0:6379 redis-1:6379 redis-2:6379 --cluster-replicas 0 --cluster-yes || true + fi + + for cid in $cluster_containers; do + docker exec "$cid" redis-cli -p 6379 cluster info + if docker exec "$cid" redis-cli -p 6379 cluster info 2>/dev/null | grep -q "cluster_state:ok"; then + echo "[SUCCESS] Redis Cluster is ready (reported by $cid)" + success=1 + break 2 + fi + done + + echo "[WAITING] Cluster not ready yet (attempt $i)..." + sleep 2 + done + + if [ $success -eq 0 ]; then + echo "[ERROR] Redis Cluster did not become healthy in time" + for cid in $cluster_containers; do + echo "--- Cluster info for $cid ---" + docker exec "$cid" redis-cli -p 6379 cluster info || true + docker exec "$cid" redis-cli -p 6379 cluster nodes || true + echo "--- Logs for $cid ---" + docker logs "$cid" | tail -n 100 + done + exit 1 + fi - uses: actions/checkout@v4 - uses: ./.github/actions/go with: @@ -224,39 +302,26 @@ jobs: - name: Install dependencies working-directory: ./router-tests run: go mod download - - name: Setup Redis Cluster (for Cluster tests) - uses: vishnudxb/redis-cluster@1.0.9 - with: - master1-port: 7001 - master2-port: 7002 - master3-port: 7003 - slave1-port: 7004 - slave2-port: 7005 - slave3-port: 7006 - sleep-duration: 5 - name: Configure Redis Authentication & ACL run: | - sudo apt-get install -y redis-tools docker ps -a # Set a password for each master node - for port in 7001 7002 7003; do - redis-cli -h 127.0.0.1 -p $port ACL SETUSER cosmo on ">test" "~*" "+@all" - redis-cli -u "redis://cosmo:test@127.0.0.1:$port" ping - echo "ACL user 'cosmo' created with full access on port $port" + for cid in $(docker ps --format "{{.ID}} {{.Image}}" | grep "redis-cluster" | awk '{print $1}'); do + echo "Configuring ACLs in container $cid" + docker exec "$cid" redis-cli -p 6379 ACL SETUSER cosmo on ">test" "~*" "+@all" + docker exec "$cid" redis-cli -p 6379 ping done + cid=$(docker ps --format "{{.ID}} {{.Image}}" | grep "redis:7" | awk '{print $1}') + # Sanity checks + docker exec "$cid" redis-cli -p 6379 ping + docker exec "$cid" redis-cli -u "redis://cosmo:test@redis-0:6379" ping + docker exec "$cid" redis-cli -u "redis://cosmo:test@redis-0:6379" cluster nodes - name: Run Integration tests ${{ matrix.test_target }} working-directory: ./router-tests - run: make test test_params="-run '^Test[^(Flaky)]' --timeout=5m -p 1 --parallel 10" test_target="${{ matrix.test_target }}" + run: make test test_retry_count=0 test_params="-run '^Test[^(Flaky)]' --timeout=5m -p 1 --parallel 10" test_target="${{ matrix.test_target }}" - name: Run Flaky Integration tests ${{ matrix.test_target }} - uses: nick-fields/retry@v3 - with: - timeout_minutes: 30 - max_attempts: 5 - retry_wait_seconds: 5 - retry_on: error - command: | - cd router-tests - make test test_params="-run '^TestFlaky' --timeout=5m --parallel 1" test_target="${{ matrix.test_target }}" + working-directory: ./router-tests + run: make test test_retry_count=3 test_params="-run '^TestFlaky' --timeout=5m -p 1 --parallel 10" test_target="${{ matrix.test_target }}" image_scan: if: github.event.pull_request.head.repo.full_name == github.repository diff --git a/Makefile b/Makefile index 2017d6343b..03815cdc5f 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ setup-build-tools: go install github.com/bufbuild/buf/cmd/buf@v1.32.2 go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.34.2 go install connectrpc.com/connect/cmd/protoc-gen-connect-go@v1.16.2 - go install gotest.tools/gotestsum@v1.12.3 + go install gotest.tools/gotestsum@v1.13.0 setup-dev-tools: setup-build-tools go install github.com/amacneil/dbmate/v2@v2.6.0 diff --git a/router-tests/Makefile b/router-tests/Makefile index 5974bb03d4..a35fe2eb0f 100644 --- a/router-tests/Makefile +++ b/router-tests/Makefile @@ -1,5 +1,6 @@ SHELL := bash test_target := ./... +test_retry_count ?= 0 ifeq ($(CI),true) FORMAT := github-actions @@ -11,7 +12,7 @@ test-deps: $(MAKE) -C ../demo plugin-build-ci test: test-deps - gotestsum -f $(FORMAT) -- -ldflags=-extldflags=-Wl,-ld_classic $(test_params) -race $(test_target) + gotestsum --rerun-fails="$(test_retry_count)" --packages="$(test_target)" -f $(FORMAT) -- -ldflags=-extldflags=-Wl,-ld_classic $(test_params) -race $(test_target) update-snapshot: go test -update -race $(test_target) diff --git a/router-tests/batch_test.go b/router-tests/batch_test.go index aa3a7527a8..bde151cf63 100644 --- a/router-tests/batch_test.go +++ b/router-tests/batch_test.go @@ -562,52 +562,6 @@ func TestBatch(t *testing.T) { t.Run("Batch Tracing", func(t *testing.T) { t.Parallel() - t.Run("Verify primary root span attributes for batch request", func(t *testing.T) { - t.Parallel() - - metricReader := metric.NewManualReader() - exporter := tracetest.NewInMemoryExporter(t) - defer exporter.Reset() - - testenv.Run(t, &testenv.Config{ - TraceExporter: exporter, - MetricReader: metricReader, - BatchingConfig: config.BatchingConfig{ - Enabled: true, - MaxConcurrency: 10, - MaxEntriesPerBatch: 100, - }, - }, func(t *testing.T, xEnv *testenv.Environment) { - operations := []testenv.GraphQLRequest{ - { - Query: `query employees { employees { id } }`, - }, - { - Query: `query employees { employees { id } }`, - }, - { - Query: `query employee { employees { isAvailable } }`, - }, - } - _, err := xEnv.MakeGraphQLBatchedRequestRequest(operations, nil) - require.NoError(t, err) - - sn := exporter.GetSpans().Snapshots() - require.Len(t, sn, 29) - rootSpan := sn[len(sn)-1] - - rootSpanAttributes := rootSpan.Attributes() - require.Contains(t, rootSpanAttributes, otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain())) - require.Contains(t, rootSpanAttributes, otel.WgRouterRootSpan.Bool(true)) - require.Contains(t, rootSpanAttributes, otel.WgIsBatchingOperation.Bool(true)) - require.Contains(t, rootSpanAttributes, otel.WgBatchingOperationsCount.Int(len(operations))) - - require.Contains(t, rootSpanAttributes, otel.WgOperationHash.String("12924042114100782429")) - require.Contains(t, rootSpanAttributes, otel.WgClientName.String("unknown")) - require.Contains(t, rootSpanAttributes, otel.WgClientVersion.String("missing")) - }) - }) - t.Run("Verify all root span attributes for batch requests", func(t *testing.T) { t.Parallel() @@ -793,6 +747,61 @@ func TestBatch(t *testing.T) { } +func TestFlakyBatch(t *testing.T) { + t.Parallel() + + t.Run("Batch Tracing", func(t *testing.T) { + t.Parallel() + + t.Run("Verify primary root span attributes for batch request", func(t *testing.T) { + t.Parallel() + + metricReader := metric.NewManualReader() + exporter := tracetest.NewInMemoryExporter(t) + defer exporter.Reset() + + testenv.Run(t, &testenv.Config{ + TraceExporter: exporter, + MetricReader: metricReader, + BatchingConfig: config.BatchingConfig{ + Enabled: true, + MaxConcurrency: 10, + MaxEntriesPerBatch: 100, + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + operations := []testenv.GraphQLRequest{ + { + Query: `query employees { employees { id } }`, + }, + { + Query: `query employees { employees { id } }`, + }, + { + Query: `query employee { employees { isAvailable } }`, + }, + } + _, err := xEnv.MakeGraphQLBatchedRequestRequest(operations, nil) + require.NoError(t, err) + + sn := exporter.GetSpans().Snapshots() + require.Len(t, sn, 29) + rootSpan := sn[len(sn)-1] + + rootSpanAttributes := rootSpan.Attributes() + require.Contains(t, rootSpanAttributes, otel.WgRouterConfigVersion.String(xEnv.RouterConfigVersionMain())) + require.Contains(t, rootSpanAttributes, otel.WgRouterRootSpan.Bool(true)) + require.Contains(t, rootSpanAttributes, otel.WgIsBatchingOperation.Bool(true)) + require.Contains(t, rootSpanAttributes, otel.WgBatchingOperationsCount.Int(len(operations))) + + require.Contains(t, rootSpanAttributes, otel.WgOperationHash.String("12924042114100782429")) + require.Contains(t, rootSpanAttributes, otel.WgClientName.String("unknown")) + require.Contains(t, rootSpanAttributes, otel.WgClientVersion.String("missing")) + }) + }) + + }) +} + func getChildSpanDetails(directChildSpans []sdktrace.ReadOnlySpan) ([]string, []string) { var operationNumberAttrs = make([]string, 0, len(directChildSpans)) var retrievedSpanNames = make([]string, 0, len(directChildSpans)) diff --git a/router-tests/events/nats_events_test.go b/router-tests/events/nats_events_test.go index 9e1558db24..b94520ae6a 100644 --- a/router-tests/events/nats_events_test.go +++ b/router-tests/events/nats_events_test.go @@ -1208,187 +1208,6 @@ func TestNatsEvents(t *testing.T) { }) }) - t.Run("subscribe ws with filter", func(t *testing.T) { - t.Parallel() - - testenv.Run(t, &testenv.Config{ - RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, - EnableNats: true, - ModifyEngineExecutionConfiguration: func(engineExecutionConfiguration *config.EngineExecutionConfiguration) { - engineExecutionConfiguration.WebSocketClientReadTimeout = time.Second - }, - }, func(t *testing.T, xEnv *testenv.Environment) { - type subscriptionPayload struct { - Data struct { - FilteredEmployeeUpdated struct { - ID float64 `graphql:"id"` - Details struct { - Forename string `graphql:"forename"` - Surname string `graphql:"surname"` - } `graphql:"details"` - } `graphql:"filteredEmployeeUpdated(id: 1)"` - } `json:"data"` - } - - // conn.Close() is called in a cleanup defined in the function - conn := xEnv.InitGraphQLWebSocketConnection(nil, nil, nil) - err := conn.WriteJSON(&testenv.WebSocketMessage{ - ID: "1", - Type: "subscribe", - Payload: []byte(`{"query":"subscription { filteredEmployeeUpdated(id: 1) { id details { forename, surname } } }"}`), - }) - - require.NoError(t, err) - - xEnv.WaitForSubscriptionCount(1, NatsWaitTimeout) - - testData := map[uint32]struct{ forename, surname string }{ - 1: {forename: "Jens", surname: "Neuse"}, - 3: {forename: "Stefan", surname: "Avram"}, - 4: {forename: "Björn", surname: "Schwenzer"}, - 5: {forename: "Sergiy", surname: "Petrunin"}, - 7: {forename: "Suvij", surname: "Surya"}, - 8: {forename: "Nithin", surname: "Kumar"}, - 11: {forename: "Alexandra", surname: "Neuse"}, - } - - var msg testenv.WebSocketMessage - var payload subscriptionPayload - // This loop is used to test the filter - // It will emit 12 events, and only 7 of them should be included: - // 1, 3, 4, 5, 7, 8, and 11 - for i := uint32(1); i < 13; i++ { - err = xEnv.NatsConnectionDefault.Publish(xEnv.GetPubSubName("employeeUpdated.1"), []byte(fmt.Sprintf(`{"id":%d,"__typename":"Employee"}`, i))) - require.NoError(t, err) - err = xEnv.NatsConnectionDefault.Flush() - require.NoError(t, err) - - // Should get the message only for the events that should be included - // if some message is not filtered out, the test will fail - switch i { - case 1, 3, 4, 5, 7, 8, 11: - gErr := conn.ReadJSON(&msg) - require.NoError(t, gErr) - require.Equal(t, "1", msg.ID) - require.Equal(t, "next", msg.Type) - gErr = json.Unmarshal(msg.Payload, &payload) - require.NoError(t, gErr) - require.Equal(t, float64(i), payload.Data.FilteredEmployeeUpdated.ID) - require.Equal(t, testData[i].forename, payload.Data.FilteredEmployeeUpdated.Details.Forename) - require.Equal(t, testData[i].surname, payload.Data.FilteredEmployeeUpdated.Details.Surname) - } - } - }) - }) - - t.Run("subscribe sse with filter", func(t *testing.T) { - t.Parallel() - - testenv.Run(t, &testenv.Config{ - RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, - EnableNats: true, - }, func(t *testing.T, xEnv *testenv.Environment) { - - subscribePayload := []byte(`{"query":"subscription { filteredEmployeeUpdated(id: 1) { id details { forename surname } } }"}`) - - client := http.Client{} - req, gErr := http.NewRequest(http.MethodPost, xEnv.GraphQLRequestURL(), bytes.NewReader(subscribePayload)) - require.NoError(t, gErr) - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Accept", "text/event-stream") - req.Header.Set("Connection", "keep-alive") - req.Header.Set("Cache-Control", "no-cache") - - var clientDoCh = make(chan struct { - resp *http.Response - err error - }) - go func() { - resp, gErr := client.Do(req) - clientDoCh <- struct { - resp *http.Response - err error - }{resp, gErr} - }() - - xEnv.WaitForSubscriptionCount(1, NatsWaitTimeout) - - // Trigger the subscription via NATS - err := xEnv.NatsConnectionDefault.Publish(xEnv.GetPubSubName("employeeUpdated.1"), []byte(`{"id":1,"__typename": "Employee"}`)) - require.NoError(t, err) - - err = xEnv.NatsConnectionDefault.Flush() - require.NoError(t, err) - - var resp *http.Response - - testenv.AwaitChannelWithT(t, NatsWaitTimeout, clientDoCh, func(t *testing.T, clientDo struct { - resp *http.Response - err error - }) { - resp = clientDo.resp - require.NoError(t, clientDo.err) - }) - - require.Equal(t, http.StatusOK, resp.StatusCode) - defer resp.Body.Close() - - require.Equal(t, "text/event-stream", resp.Header.Get("Content-Type")) - require.Equal(t, "no-cache", resp.Header.Get("Cache-Control")) - require.Equal(t, "keep-alive", resp.Header.Get("Connection")) - require.Equal(t, "no", resp.Header.Get("X-Accel-Buffering")) - - reader := bufio.NewReader(resp.Body) - - testData := map[int]struct{ forename, surname string }{ - 1: {forename: "Jens", surname: "Neuse"}, - 3: {forename: "Stefan", surname: "Avram"}, - 4: {forename: "Björn", surname: "Schwenzer"}, - 5: {forename: "Sergiy", surname: "Petrunin"}, - 7: {forename: "Suvij", surname: "Surya"}, - 8: {forename: "Nithin", surname: "Kumar"}, - 11: {forename: "Alexandra", surname: "Neuse"}, - } - - eventNext, _, gErr := reader.ReadLine() - require.NoError(t, gErr) - require.Equal(t, "event: next", string(eventNext)) - data, _, gErr := reader.ReadLine() - require.NoError(t, gErr) - require.Equal(t, fmt.Sprintf("data: {\"data\":{\"filteredEmployeeUpdated\":{\"id\":%d,\"details\":{\"forename\":\"%s\",\"surname\":\"%s\"}}}}", 1, testData[1].forename, testData[1].surname), string(data)) - line, _, gErr := reader.ReadLine() - require.NoError(t, gErr) - require.Equal(t, "", string(line)) - - // This loop is used to test the filter - // It will emit 12 events, and only 7 of them should be included: - // 1, 3, 4, 5, 7, 8, and 11 - for i := 1; i < 13; i++ { - err = xEnv.NatsConnectionDefault.Publish(xEnv.GetPubSubName("employeeUpdated.1"), []byte(fmt.Sprintf(`{"id":%d,"__typename": "Employee"}`, i))) - require.NoError(t, err) - - err = xEnv.NatsConnectionDefault.Flush() - require.NoError(t, err) - - // Should get the message only for the events that should be included - // if some message is not filtered out, the test will fail - switch i { - case 1, 3, 4, 5, 7, 8, 11: - eventNext, _, gErr = reader.ReadLine() - require.NoError(t, gErr) - require.Equal(t, "event: next", string(eventNext)) - data, _, gErr = reader.ReadLine() - require.NoError(t, gErr) - require.Equal(t, fmt.Sprintf("data: {\"data\":{\"filteredEmployeeUpdated\":{\"id\":%d,\"details\":{\"forename\":\"%s\",\"surname\":\"%s\"}}}}", i, testData[i].forename, testData[i].surname), string(data)) - line, _, gErr = reader.ReadLine() - require.NoError(t, gErr) - require.Equal(t, "", string(line)) - } - } - }) - }) - t.Run("message with invalid JSON should give a specific error", func(t *testing.T) { t.Parallel() @@ -1718,3 +1537,188 @@ func TestNatsEvents(t *testing.T) { }) }) } + +func TestFlakyNatsEvents(t *testing.T) { + t.Parallel() + + t.Run("subscribe sse with filter", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + EnableNats: true, + }, func(t *testing.T, xEnv *testenv.Environment) { + + subscribePayload := []byte(`{"query":"subscription { filteredEmployeeUpdated(id: 1) { id details { forename surname } } }"}`) + + client := http.Client{} + req, gErr := http.NewRequest(http.MethodPost, xEnv.GraphQLRequestURL(), bytes.NewReader(subscribePayload)) + require.NoError(t, gErr) + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "text/event-stream") + req.Header.Set("Connection", "keep-alive") + req.Header.Set("Cache-Control", "no-cache") + + var clientDoCh = make(chan struct { + resp *http.Response + err error + }) + go func() { + resp, gErr := client.Do(req) + clientDoCh <- struct { + resp *http.Response + err error + }{resp, gErr} + }() + + xEnv.WaitForSubscriptionCount(1, NatsWaitTimeout) + + // Trigger the subscription via NATS + err := xEnv.NatsConnectionDefault.Publish(xEnv.GetPubSubName("employeeUpdated.1"), []byte(`{"id":1,"__typename": "Employee"}`)) + require.NoError(t, err) + + err = xEnv.NatsConnectionDefault.Flush() + require.NoError(t, err) + + var resp *http.Response + + testenv.AwaitChannelWithT(t, NatsWaitTimeout, clientDoCh, func(t *testing.T, clientDo struct { + resp *http.Response + err error + }) { + resp = clientDo.resp + require.NoError(t, clientDo.err) + }) + + require.Equal(t, http.StatusOK, resp.StatusCode) + defer resp.Body.Close() + + require.Equal(t, "text/event-stream", resp.Header.Get("Content-Type")) + require.Equal(t, "no-cache", resp.Header.Get("Cache-Control")) + require.Equal(t, "keep-alive", resp.Header.Get("Connection")) + require.Equal(t, "no", resp.Header.Get("X-Accel-Buffering")) + + reader := bufio.NewReader(resp.Body) + + testData := map[int]struct{ forename, surname string }{ + 1: {forename: "Jens", surname: "Neuse"}, + 3: {forename: "Stefan", surname: "Avram"}, + 4: {forename: "Björn", surname: "Schwenzer"}, + 5: {forename: "Sergiy", surname: "Petrunin"}, + 7: {forename: "Suvij", surname: "Surya"}, + 8: {forename: "Nithin", surname: "Kumar"}, + 11: {forename: "Alexandra", surname: "Neuse"}, + } + + eventNext, _, gErr := reader.ReadLine() + require.NoError(t, gErr) + require.Equal(t, "event: next", string(eventNext)) + data, _, gErr := reader.ReadLine() + require.NoError(t, gErr) + require.Equal(t, fmt.Sprintf("data: {\"data\":{\"filteredEmployeeUpdated\":{\"id\":%d,\"details\":{\"forename\":\"%s\",\"surname\":\"%s\"}}}}", 1, testData[1].forename, testData[1].surname), string(data)) + line, _, gErr := reader.ReadLine() + require.NoError(t, gErr) + require.Equal(t, "", string(line)) + + // This loop is used to test the filter + // It will emit 12 events, and only 7 of them should be included: + // 1, 3, 4, 5, 7, 8, and 11 + for i := 1; i < 13; i++ { + err = xEnv.NatsConnectionDefault.Publish(xEnv.GetPubSubName("employeeUpdated.1"), []byte(fmt.Sprintf(`{"id":%d,"__typename": "Employee"}`, i))) + require.NoError(t, err) + + err = xEnv.NatsConnectionDefault.Flush() + require.NoError(t, err) + + // Should get the message only for the events that should be included + // if some message is not filtered out, the test will fail + switch i { + case 1, 3, 4, 5, 7, 8, 11: + eventNext, _, gErr = reader.ReadLine() + require.NoError(t, gErr) + require.Equal(t, "event: next", string(eventNext)) + data, _, gErr = reader.ReadLine() + require.NoError(t, gErr) + require.Equal(t, fmt.Sprintf("data: {\"data\":{\"filteredEmployeeUpdated\":{\"id\":%d,\"details\":{\"forename\":\"%s\",\"surname\":\"%s\"}}}}", i, testData[i].forename, testData[i].surname), string(data)) + line, _, gErr = reader.ReadLine() + require.NoError(t, gErr) + require.Equal(t, "", string(line)) + } + } + }) + }) + + t.Run("subscribe ws with filter", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + EnableNats: true, + ModifyEngineExecutionConfiguration: func(engineExecutionConfiguration *config.EngineExecutionConfiguration) { + engineExecutionConfiguration.WebSocketClientReadTimeout = time.Second + }, + }, func(t *testing.T, xEnv *testenv.Environment) { + type subscriptionPayload struct { + Data struct { + FilteredEmployeeUpdated struct { + ID float64 `graphql:"id"` + Details struct { + Forename string `graphql:"forename"` + Surname string `graphql:"surname"` + } `graphql:"details"` + } `graphql:"filteredEmployeeUpdated(id: 1)"` + } `json:"data"` + } + + // conn.Close() is called in a cleanup defined in the function + conn := xEnv.InitGraphQLWebSocketConnection(nil, nil, nil) + err := conn.WriteJSON(&testenv.WebSocketMessage{ + ID: "1", + Type: "subscribe", + Payload: []byte(`{"query":"subscription { filteredEmployeeUpdated(id: 1) { id details { forename, surname } } }"}`), + }) + + require.NoError(t, err) + + xEnv.WaitForSubscriptionCount(1, NatsWaitTimeout) + + testData := map[uint32]struct{ forename, surname string }{ + 1: {forename: "Jens", surname: "Neuse"}, + 3: {forename: "Stefan", surname: "Avram"}, + 4: {forename: "Björn", surname: "Schwenzer"}, + 5: {forename: "Sergiy", surname: "Petrunin"}, + 7: {forename: "Suvij", surname: "Surya"}, + 8: {forename: "Nithin", surname: "Kumar"}, + 11: {forename: "Alexandra", surname: "Neuse"}, + } + + var msg testenv.WebSocketMessage + var payload subscriptionPayload + // This loop is used to test the filter + // It will emit 12 events, and only 7 of them should be included: + // 1, 3, 4, 5, 7, 8, and 11 + for i := uint32(1); i < 13; i++ { + err = xEnv.NatsConnectionDefault.Publish(xEnv.GetPubSubName("employeeUpdated.1"), []byte(fmt.Sprintf(`{"id":%d,"__typename":"Employee"}`, i))) + require.NoError(t, err) + err = xEnv.NatsConnectionDefault.Flush() + require.NoError(t, err) + + // Should get the message only for the events that should be included + // if some message is not filtered out, the test will fail + switch i { + case 1, 3, 4, 5, 7, 8, 11: + gErr := conn.ReadJSON(&msg) + require.NoError(t, gErr) + require.Equal(t, "1", msg.ID) + require.Equal(t, "next", msg.Type) + gErr = json.Unmarshal(msg.Payload, &payload) + require.NoError(t, gErr) + require.Equal(t, float64(i), payload.Data.FilteredEmployeeUpdated.ID) + require.Equal(t, testData[i].forename, payload.Data.FilteredEmployeeUpdated.Details.Forename) + require.Equal(t, testData[i].surname, payload.Data.FilteredEmployeeUpdated.Details.Surname) + } + } + }) + }) +} diff --git a/router-tests/ratelimit_test.go b/router-tests/ratelimit_test.go index b7d3494770..d3189c5d95 100644 --- a/router-tests/ratelimit_test.go +++ b/router-tests/ratelimit_test.go @@ -700,10 +700,11 @@ func TestRateLimit(t *testing.T) { name: "should successfully use auth from first url", clusterUrlSlice: []string{"redis://cosmo:test@localhost:7003", "redis://cosmo1:test1@localhost:7001", "redis://cosmo2:test2@localhost:7002"}, }, - { - name: "should successfully use auth from later url if no auth in first urls", - clusterUrlSlice: []string{"redis://localhost:7003", "rediss://localhost:7001", "rediss://cosmo:test@localhost:7002"}, - }, + // TODO: Was false-positive: if password is configured for the cluster default user, it fails. Investigate & fix functionality as needed. (ENG-8270) + //{ + // name: "should successfully use auth from later url if no auth in first urls", + // clusterUrlSlice: []string{"redis://localhost:7003", "rediss://localhost:7001", "rediss://cosmo:test@localhost:7002"}, + //}, { name: "should successfully work with two urls", clusterUrlSlice: []string{"redis://cosmo:test@localhost:7002", "rediss://localhost:7001"},