diff --git a/router-tests/events/kafka_events_test.go b/router-tests/events/kafka_events_test.go index 37e54109a8..472ce27574 100644 --- a/router-tests/events/kafka_events_test.go +++ b/router-tests/events/kafka_events_test.go @@ -946,6 +946,23 @@ func TestKafkaEvents(t *testing.T) { }) }) + t.Run("mutate returns correct typename", func(t *testing.T) { + t.Parallel() + + topics := []string{"employeeUpdated"} + + testenv.Run(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsKafkaJSONTemplate, + EnableKafka: true, + }, func(t *testing.T, xEnv *testenv.Environment) { + events.KafkaEnsureTopicExists(t, xEnv, KafkaWaitTimeout, topics...) + resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `mutation { updateEmployeeMyKafka(employeeID: 3, update: {name: "name test"}) { __typename success } }`, + }) + require.JSONEq(t, `{"data":{"updateEmployeeMyKafka":{"__typename":"edfs__PublishResult","success":true}}}`, resOne.Body) + }) + }) + t.Run("kafka startup and shutdown with wrong broker should not stop router from starting indefinitely", func(t *testing.T) { t.Parallel() diff --git a/router-tests/events/nats_events_test.go b/router-tests/events/nats_events_test.go index 12c82371d3..5d3b2ec2cf 100644 --- a/router-tests/events/nats_events_test.go +++ b/router-tests/events/nats_events_test.go @@ -1014,7 +1014,7 @@ func TestNatsEvents(t *testing.T) { }) }) - t.Run("publish", func(t *testing.T) { + t.Run("mutate", func(t *testing.T) { t.Parallel() testenv.Run(t, &testenv.Config{ @@ -1064,6 +1064,31 @@ func TestNatsEvents(t *testing.T) { }) }) + t.Run("mutate returns correct typename", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + EnableNats: true, + }, func(t *testing.T, xEnv *testenv.Environment) { + sub, err := xEnv.NatsConnectionDefault.SubscribeSync(xEnv.GetPubSubName("employeeUpdatedMyNats.3")) + require.NoError(t, err) + require.NoError(t, xEnv.NatsConnectionDefault.Flush()) + + t.Cleanup(func() { _ = sub.Unsubscribe() }) + + resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `mutation UpdateEmployeeNats($update: UpdateEmployeeInput!) { + updateEmployeeMyNats(id: 3, update: $update) {__typename success} + }`, + Variables: json.RawMessage(`{"update":{"name":"Stefan Avramovic","email":"avramovic@wundergraph.com"}}`), + }) + + // Send a query to receive the response from the NATS message + require.Equal(t, `{"data":{"updateEmployeeMyNats":{"__typename":"edfs__PublishResult","success":true}}}`, resOne.Body) + }) + }) + t.Run("subscribe with stream and consumer", func(t *testing.T) { t.Parallel() diff --git a/router-tests/events/redis_events_test.go b/router-tests/events/redis_events_test.go index 2980ae61d5..715467ee98 100644 --- a/router-tests/events/redis_events_test.go +++ b/router-tests/events/redis_events_test.go @@ -936,6 +936,22 @@ func TestRedisEvents(t *testing.T) { } }) }) + + t.Run("mutate returns correct typename", func(t *testing.T) { + t.Parallel() + + testenv.Run(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsRedisJSONTemplate, + EnableRedis: true, + NoRetryClient: true, + }, func(t *testing.T, xEnv *testenv.Environment) { + // send a mutation to trigger the first subscription + resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ + Query: `mutation { updateEmployeeMyRedis(id: 3, update: {name: "name test"}) { __typename success } }`, + }) + require.JSONEq(t, `{"data":{"updateEmployeeMyRedis":{"__typename":"edfs__PublishResult","success":true}}}`, resOne.Body) + }) + }) } func TestRedisClusterEvents(t *testing.T) { diff --git a/router-tests/modules/stream_publish_test.go b/router-tests/modules/stream_publish_test.go index 35d9e16766..bc5187d400 100644 --- a/router-tests/modules/stream_publish_test.go +++ b/router-tests/modules/stream_publish_test.go @@ -120,9 +120,9 @@ func TestPublishHook(t *testing.T) { }, func(t *testing.T, xEnv *testenv.Environment) { events.KafkaEnsureTopicExists(t, xEnv, time.Second, "employeeUpdated") resOne := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{ - Query: `mutation { updateEmployeeMyKafka(employeeID: 3, update: {name: "name test"}) { success } }`, + Query: `mutation { updateEmployeeMyKafka(employeeID: 3, update: {name: "name test"}) { __typename success } }`, }) - require.JSONEq(t, `{"data":{"updateEmployeeMyKafka":{"success":true}}}`, resOne.Body) + require.JSONEq(t, `{"data":{"updateEmployeeMyKafka":{"__typename": "edfs__PublishResult", "success":true}}}`, resOne.Body) assert.Equal(t, int32(1), customModule.HookCallCount.Load()) }) diff --git a/router/pkg/pubsub/kafka/engine_datasource.go b/router/pkg/pubsub/kafka/engine_datasource.go index d3ebb1fc5c..80b952efd9 100644 --- a/router/pkg/pubsub/kafka/engine_datasource.go +++ b/router/pkg/pubsub/kafka/engine_datasource.go @@ -265,10 +265,10 @@ func (s *PublishDataSource) Load(ctx context.Context, input []byte, out *bytes.B if err := s.pubSub.Publish(ctx, publishData.PublishEventConfiguration(), []datasource.StreamEvent{&Event{&publishData.Event}}); err != nil { // err will not be returned but only logged inside PubSubProvider.Publish to avoid a "unable to fetch from subgraph" error - _, errWrite := io.WriteString(out, `{"success": false}`) + _, errWrite := io.WriteString(out, `{"__typename": "edfs__PublishResult", "success": false}`) return errWrite } - _, errWrite := io.WriteString(out, `{"success": true}`) + _, errWrite := io.WriteString(out, `{"__typename": "edfs__PublishResult", "success": true}`) if errWrite != nil { return errWrite } diff --git a/router/pkg/pubsub/kafka/engine_datasource_factory_test.go b/router/pkg/pubsub/kafka/engine_datasource_factory_test.go index 5ceab4ae69..8a60a8fdb1 100644 --- a/router/pkg/pubsub/kafka/engine_datasource_factory_test.go +++ b/router/pkg/pubsub/kafka/engine_datasource_factory_test.go @@ -65,7 +65,7 @@ func TestEngineDataSourceFactoryWithMockAdapter(t *testing.T) { out := &bytes.Buffer{} err = ds.Load(context.Background(), []byte(input), out) require.NoError(t, err) - require.Equal(t, `{"success": true}`, out.String()) + require.Equal(t, `{"__typename": "edfs__PublishResult", "success": true}`, out.String()) } // TestEngineDataSourceFactory_GetResolveDataSource_WrongType tests the EngineDataSourceFactory with a mocked adapter diff --git a/router/pkg/pubsub/kafka/engine_datasource_test.go b/router/pkg/pubsub/kafka/engine_datasource_test.go index 5fb5808173..1dc805535d 100644 --- a/router/pkg/pubsub/kafka/engine_datasource_test.go +++ b/router/pkg/pubsub/kafka/engine_datasource_test.go @@ -108,7 +108,7 @@ func TestKafkaPublishDataSource_Load(t *testing.T) { })).Return(nil) }, expectError: false, - expectedOutput: `{"success": true}`, + expectedOutput: `{"__typename": "edfs__PublishResult", "success": true}`, expectPublished: true, }, { @@ -118,7 +118,7 @@ func TestKafkaPublishDataSource_Load(t *testing.T) { m.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("publish error")) }, expectError: false, // The Load method doesn't return the publish error directly - expectedOutput: `{"success": false}`, + expectedOutput: `{"__typename": "edfs__PublishResult", "success": false}`, expectPublished: true, }, { diff --git a/router/pkg/pubsub/nats/engine_datasource.go b/router/pkg/pubsub/nats/engine_datasource.go index 809f2aad43..b6855b09e3 100644 --- a/router/pkg/pubsub/nats/engine_datasource.go +++ b/router/pkg/pubsub/nats/engine_datasource.go @@ -246,10 +246,10 @@ func (s *NatsPublishDataSource) Load(ctx context.Context, input []byte, out *byt if err := s.pubSub.Publish(ctx, publishData.PublishEventConfiguration(), []datasource.StreamEvent{&Event{evt: &publishData.Event}}); err != nil { // err will not be returned but only logged inside PubSubProvider.Publish to avoid a "unable to fetch from subgraph" error - _, errWrite := io.WriteString(out, `{"success": false}`) + _, errWrite := io.WriteString(out, `{"__typename": "edfs__PublishResult", "success": false}`) return errWrite } - _, err := io.WriteString(out, `{"success": true}`) + _, err := io.WriteString(out, `{"__typename": "edfs__PublishResult", "success": true}`) return err } diff --git a/router/pkg/pubsub/nats/engine_datasource_factory_test.go b/router/pkg/pubsub/nats/engine_datasource_factory_test.go index d6017404a7..0c45ced262 100644 --- a/router/pkg/pubsub/nats/engine_datasource_factory_test.go +++ b/router/pkg/pubsub/nats/engine_datasource_factory_test.go @@ -71,7 +71,7 @@ func TestEngineDataSourceFactoryWithMockAdapter(t *testing.T) { out := &bytes.Buffer{} err = ds.Load(context.Background(), []byte(input), out) require.NoError(t, err) - require.Equal(t, `{"success": true}`, out.String()) + require.Equal(t, `{"__typename": "edfs__PublishResult", "success": true}`, out.String()) } func TestEngineDataSourceFactory_GetResolveDataSource_WrongType(t *testing.T) { diff --git a/router/pkg/pubsub/nats/engine_datasource_test.go b/router/pkg/pubsub/nats/engine_datasource_test.go index 1f0818d305..10b41f8e3b 100644 --- a/router/pkg/pubsub/nats/engine_datasource_test.go +++ b/router/pkg/pubsub/nats/engine_datasource_test.go @@ -90,7 +90,7 @@ func TestNatsPublishDataSource_Load(t *testing.T) { })).Return(nil) }, expectError: false, - expectedOutput: `{"success": true}`, + expectedOutput: `{"__typename": "edfs__PublishResult", "success": true}`, expectPublished: true, }, { @@ -100,7 +100,7 @@ func TestNatsPublishDataSource_Load(t *testing.T) { m.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("publish error")) }, expectError: false, // The Load method doesn't return the publish error directly - expectedOutput: `{"success": false}`, + expectedOutput: `{"__typename": "edfs__PublishResult", "success": false}`, expectPublished: true, }, { diff --git a/router/pkg/pubsub/redis/engine_datasource.go b/router/pkg/pubsub/redis/engine_datasource.go index 41151df9f0..fe7956faef 100644 --- a/router/pkg/pubsub/redis/engine_datasource.go +++ b/router/pkg/pubsub/redis/engine_datasource.go @@ -228,10 +228,10 @@ func (s *PublishDataSource) Load(ctx context.Context, input []byte, out *bytes.B if err := s.pubSub.Publish(ctx, publishData.PublishEventConfiguration(), []datasource.StreamEvent{&Event{evt: &publishData.Event}}); err != nil { // err will not be returned but only logged inside PubSubProvider.Publish to avoid a "unable to fetch from subgraph" error - _, errWrite := io.WriteString(out, `{"success": false}`) + _, errWrite := io.WriteString(out, `{"__typename": "edfs__PublishResult", "success": false}`) return errWrite } - _, err := io.WriteString(out, `{"success": true}`) + _, err := io.WriteString(out, `{"__typename": "edfs__PublishResult", "success": true}`) return err } diff --git a/router/pkg/pubsub/redis/engine_datasource_factory_test.go b/router/pkg/pubsub/redis/engine_datasource_factory_test.go index 7dc4ade017..4f5a694a31 100644 --- a/router/pkg/pubsub/redis/engine_datasource_factory_test.go +++ b/router/pkg/pubsub/redis/engine_datasource_factory_test.go @@ -65,7 +65,7 @@ func TestEngineDataSourceFactoryWithMockAdapter(t *testing.T) { out := &bytes.Buffer{} err = ds.Load(context.Background(), []byte(input), out) require.NoError(t, err) - require.Equal(t, `{"success": true}`, out.String()) + require.Equal(t, `{"__typename": "edfs__PublishResult", "success": true}`, out.String()) } // TestEngineDataSourceFactory_GetResolveDataSource_WrongType tests the EngineDataSourceFactory with a mocked adapter diff --git a/router/pkg/pubsub/redis/engine_datasource_test.go b/router/pkg/pubsub/redis/engine_datasource_test.go index cc59d240f3..12205032ae 100644 --- a/router/pkg/pubsub/redis/engine_datasource_test.go +++ b/router/pkg/pubsub/redis/engine_datasource_test.go @@ -88,7 +88,7 @@ func TestRedisPublishDataSource_Load(t *testing.T) { })).Return(nil) }, expectError: false, - expectedOutput: `{"success": true}`, + expectedOutput: `{"__typename": "edfs__PublishResult", "success": true}`, expectPublished: true, }, { @@ -98,7 +98,7 @@ func TestRedisPublishDataSource_Load(t *testing.T) { m.On("Publish", mock.Anything, mock.Anything, mock.Anything).Return(errors.New("publish error")) }, expectError: false, // The Load method doesn't return the publish error directly - expectedOutput: `{"success": false}`, + expectedOutput: `{"__typename": "edfs__PublishResult", "success": false}`, expectPublished: true, }, {