diff --git a/.changesets/fix_renee_router_1343.md b/.changesets/fix_renee_router_1343.md new file mode 100644 index 0000000000..0154b181ff --- /dev/null +++ b/.changesets/fix_renee_router_1343.md @@ -0,0 +1,5 @@ +### Reliably distinguish GraphQL errors and transport errors in subscriptions ([PR #7901](https://github.com/apollographql/router/pull/7901)) + +The [Multipart HTTP protocol for GraphQL Subscriptions](https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol) distinguishes between GraphQL-level errors and fatal transport-level errors. The router previously used a heuristic to determine if a given error was fatal or not, which could sometimes cause errors to be wrongly classified. For example, if a subgraph returned a GraphQL-level error for a subscription and then immediately ended the subscription, the router might propagate this as a fatal transport-level error. + +This is now fixed. Fatal transport-level errors are tagged as such when they are constructed, so the router can reliably know how to serialize errors when sending them to the client. \ No newline at end of file diff --git a/.config/nextest.toml b/.config/nextest.toml index 9e9e37d84d..d14f9f6934 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -104,8 +104,6 @@ or ( binary_id(=apollo-router::integration_tests) & test(=integration::rhai::tes or ( binary_id(=apollo-router::integration_tests) & test(=integration::subgraph_response::test_invalid_error_locations_contains_negative_one_location) ) or ( binary_id(=apollo-router::integration_tests) & test(=integration::subgraph_response::test_valid_extensions_service_for_subgraph_error) ) or ( binary_id(=apollo-router::integration_tests) & test(=integration::subgraph_response::test_valid_extensions_service_is_preserved_for_subgraph_error) ) -or ( binary_id(=apollo-router::integration_tests) & test(=integration::subscriptions::callback::test_subscription_callback_pure_error_payload) ) -or ( binary_id(=apollo-router::integration_tests) & test(=integration::subscriptions::ws_passthrough::test_subscription_ws_passthrough_pure_error_payload) ) or ( binary_id(=apollo-router::integration_tests) & test(=integration::telemetry::datadog::test_basic) ) or ( binary_id(=apollo-router::integration_tests) & test(=integration::telemetry::datadog::test_priority_sampling_no_parent_propagated) ) or ( binary_id(=apollo-router::integration_tests) & test(=integration::telemetry::datadog::test_resource_mapping_default) ) diff --git a/apollo-router/src/protocols/multipart.rs b/apollo-router/src/protocols/multipart.rs index 4ccce664d2..c8c7331da9 100644 --- a/apollo-router/src/protocols/multipart.rs +++ b/apollo-router/src/protocols/multipart.rs @@ -12,6 +12,7 @@ use tokio_stream::once; use tokio_stream::wrappers::IntervalStream; use crate::graphql; +use crate::services::SUBSCRIPTION_ERROR_EXTENSION_KEY; #[cfg(test)] const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(10); @@ -115,27 +116,39 @@ impl Stream for Multipart { match self.mode { ProtocolMode::Subscription => { - let resp = SubscriptionPayload { - errors: if is_still_open { - Vec::new() - } else { - response.errors.drain(..).collect() - }, - payload: match response.data { - None | Some(Value::Null) if response.extensions.is_empty() => { - None - } - _ => (*response).into(), - }, - }; - - // Gracefully closed at the server side - if !is_still_open && resp.payload.is_none() && resp.errors.is_empty() { + let is_transport_error = + response.extensions.remove(SUBSCRIPTION_ERROR_EXTENSION_KEY) + == Some(true.into()); + // Magic empty response (that we create internally) means the connection was gracefully closed at the server side + if !is_still_open + && response.data.is_none() + && response.errors.is_empty() + && response.extensions.is_empty() + { self.is_terminated = true; return Poll::Ready(Some(Ok(Bytes::from_static(&b"--\r\n"[..])))); - } else { - serde_json::to_writer(&mut buf, &resp)?; } + + let response = if is_transport_error { + SubscriptionPayload { + errors: std::mem::take(&mut response.errors), + payload: match response.data { + None | Some(Value::Null) + if response.extensions.is_empty() => + { + None + } + _ => (*response).into(), + }, + } + } else { + SubscriptionPayload { + errors: Vec::new(), + payload: (*response).into(), + } + }; + + serde_json::to_writer(&mut buf, &response)?; } ProtocolMode::Defer => { serde_json::to_writer(&mut buf, &response)?; diff --git a/apollo-router/src/query_planner/execution.rs b/apollo-router/src/query_planner/execution.rs index 838ac56e5a..294ebc9bab 100644 --- a/apollo-router/src/query_planner/execution.rs +++ b/apollo-router/src/query_planner/execution.rs @@ -56,12 +56,17 @@ impl QueryPlan { &self, context: &'a Context, service_factory: &'a Arc, + // The original supergraph request is used to populate variable values and for plugin + // features like propagating headers or subgraph telemetry based on supergraph request + // values. supergraph_request: &'a Arc>, schema: &'a Arc, subgraph_schemas: &'a Arc, + // Sender for additional responses past the first one (@defer, @stream, subscriptions) sender: mpsc::Sender, subscription_handle: Option, subscription_config: &'a Option, + // Query plan execution builds up a JSON result value, use this as the initial data. initial_value: Option, ) -> Response { let root = Path::empty(); diff --git a/apollo-router/src/services/supergraph/service.rs b/apollo-router/src/services/supergraph/service.rs index 688005dca9..fe9c176d33 100644 --- a/apollo-router/src/services/supergraph/service.rs +++ b/apollo-router/src/services/supergraph/service.rs @@ -87,8 +87,11 @@ use crate::spec::Schema; use crate::spec::operation_limits::OperationLimits; pub(crate) const FIRST_EVENT_CONTEXT_KEY: &str = "apollo::supergraph::first_event"; +pub(crate) const SUBSCRIPTION_ERROR_EXTENSION_KEY: &str = "apollo::subscriptions::fatal_error"; const SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_CONFIG_RELOAD"; const SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE: &str = "SUBSCRIPTION_SCHEMA_RELOAD"; +const SUBSCRIPTION_JWT_EXPIRED_EXTENSION_CODE: &str = "SUBSCRIPTION_JWT_EXPIRED"; +const SUBSCRIPTION_EXECUTION_ERROR_EXTENSION_CODE: &str = "SUBSCRIPTION_EXECUTION_ERROR"; /// An [`IndexMap`] of available plugins. pub(crate) type Plugins = IndexMap>; @@ -464,6 +467,19 @@ pub struct SubscriptionTaskParams { pub(crate) stream_rx: ReceiverStream, } +fn subscription_fatal_error(message: impl Into, extension_code: &str) -> Response { + Response::builder() + .subscribed(false) + .extension(SUBSCRIPTION_ERROR_EXTENSION_KEY, true) + .error( + graphql::Error::builder() + .message(message) + .extension_code(extension_code) + .build(), + ) + .build() +} + #[allow(clippy::too_many_arguments)] async fn subscription_task( execution_service: execution::BoxCloneService, @@ -498,16 +514,10 @@ async fn subscription_task( }), _ => { let _ = sender - .send( - graphql::Response::builder() - .error( - graphql::Error::builder() - .message("cannot execute the subscription event") - .extension_code("SUBSCRIPTION_EXECUTION_ERROR") - .build(), - ) - .build(), - ) + .send(subscription_fatal_error( + "cannot execute the subscription event", + SUBSCRIPTION_EXECUTION_ERROR_EXTENSION_CODE, + )) .await; return; } @@ -564,23 +574,14 @@ async fn subscription_task( break; } _ = &mut timeout => { - let response = Response::builder() - .subscribed(false) - .error( - crate::error::Error::builder() - .message("subscription closed because the JWT has expired") - .extension_code("SUBSCRIPTION_JWT_EXPIRED") - .build(), - ) - .build(); - let _ = sender.send(response).await; + let _ = sender.send(subscription_fatal_error("subscription closed because the JWT has expired", SUBSCRIPTION_JWT_EXPIRED_EXTENSION_CODE)).await; break; }, message = receiver.next() => { match message { Some(mut val) => { val.created_at = Some(Instant::now()); - let res = dispatch_event(&supergraph_req, execution_service.clone(), query_plan.as_ref(), context.clone(), val, sender.clone()) + let res = dispatch_subscription_event(&supergraph_req, execution_service.clone(), query_plan.as_ref(), context.clone(), val, sender.clone()) .instrument(tracing::info_span!(SUBSCRIPTION_EVENT_SPAN_NAME, graphql.operation.name = %operation_name, otel.kind = "INTERNAL", @@ -588,7 +589,7 @@ async fn subscription_task( apollo_private.duration_ns = field::Empty,) ).await; if let Err(err) = res { - tracing::error!("cannot send the subscription to the client: {err:?}"); + tracing::error!("cannot send the subscription to the client: {err:?}"); break; } } @@ -597,32 +598,12 @@ async fn subscription_task( } Some(_new_configuration) = configuration_updated_rx.next() => { let _ = sender - .send( - Response::builder() - .subscribed(false) - .error( - graphql::Error::builder() - .message("subscription has been closed due to a configuration reload") - .extension_code(SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE) - .build(), - ) - .build(), - ) + .send(subscription_fatal_error("subscription has been closed due to a configuration reload", SUBSCRIPTION_CONFIG_RELOAD_EXTENSION_CODE)) .await; } Some(_new_schema) = schema_updated_rx.next() => { let _ = sender - .send( - Response::builder() - .subscribed(false) - .error( - graphql::Error::builder() - .message("subscription has been closed due to a schema reload") - .extension_code(SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE) - .build(), - ) - .build(), - ) + .send(subscription_fatal_error("subscription has been closed due to a schema reload", SUBSCRIPTION_SCHEMA_RELOAD_EXTENSION_CODE)) .await; break; @@ -636,7 +617,7 @@ async fn subscription_task( } } -async fn dispatch_event( +async fn dispatch_subscription_event( supergraph_req: &SupergraphRequest, execution_service: execution::BoxCloneService, query_plan: Option<&Arc>, @@ -666,16 +647,10 @@ async fn dispatch_event( Err(err) => { tracing::error!("cannot execute the subscription event: {err:?}"); let _ = sender - .send( - graphql::Response::builder() - .error( - graphql::Error::builder() - .message("cannot execute the subscription event") - .extension_code("SUBSCRIPTION_EXECUTION_ERROR") - .build(), - ) - .build(), - ) + .send(subscription_fatal_error( + "cannot execute the subscription event", + SUBSCRIPTION_EXECUTION_ERROR_EXTENSION_CODE, + )) .await; return Ok(()); } diff --git a/apollo-router/src/services/supergraph/snapshots/apollo_router__services__supergraph__tests__subscription_callback_schema_reload-3.snap b/apollo-router/src/services/supergraph/snapshots/apollo_router__services__supergraph__tests__subscription_callback_schema_reload-3.snap index 1aa194fb05..c5d49aaedf 100644 --- a/apollo-router/src/services/supergraph/snapshots/apollo_router__services__supergraph__tests__subscription_callback_schema_reload-3.snap +++ b/apollo-router/src/services/supergraph/snapshots/apollo_router__services__supergraph__tests__subscription_callback_schema_reload-3.snap @@ -1,6 +1,7 @@ --- source: apollo-router/src/services/supergraph/tests.rs expression: "tokio::time::timeout(Duration::from_secs(1),\nstream.next_response()).await.unwrap().unwrap()" +snapshot_kind: text --- { "data": null, @@ -11,5 +12,8 @@ expression: "tokio::time::timeout(Duration::from_secs(1),\nstream.next_response( "code": "SUBSCRIPTION_SCHEMA_RELOAD" } } - ] + ], + "extensions": { + "apollo::subscriptions::fatal_error": true + } } diff --git a/apollo-router/tests/integration/subscriptions/ws_passthrough.rs b/apollo-router/tests/integration/subscriptions/ws_passthrough.rs index 3354686dd5..5afe78dcef 100644 --- a/apollo-router/tests/integration/subscriptions/ws_passthrough.rs +++ b/apollo-router/tests/integration/subscriptions/ws_passthrough.rs @@ -17,106 +17,106 @@ use crate::integration::subscriptions::verify_subscription_events; /// Creates an expected subscription event payload for a schema reload fn create_expected_schema_reload_payload() -> serde_json::Value { serde_json::json!({ - "payload": null, - "errors": [ - { - "message": "subscription has been closed due to a schema reload", - "extensions": { - "code": "SUBSCRIPTION_SCHEMA_RELOAD" + "payload": null, + "errors": [ + { + "message": "subscription has been closed due to a schema reload", + "extensions": { + "code": "SUBSCRIPTION_SCHEMA_RELOAD" + } } - } - ] + ] }) } /// Creates an expected subscription event payload for a configuration reload fn create_expected_config_reload_payload() -> serde_json::Value { serde_json::json!({ - "payload": null, - "errors": [ - { - "message": "subscription has been closed due to a configuration reload", - "extensions": { - "code": "SUBSCRIPTION_CONFIG_RELOAD" + "payload": null, + "errors": [ + { + "message": "subscription has been closed due to a configuration reload", + "extensions": { + "code": "SUBSCRIPTION_CONFIG_RELOAD" + } } - } - ] + ] }) } /// Creates an expected subscription event payload for the given user number fn create_expected_user_payload(user_num: u32) -> serde_json::Value { serde_json::json!({ - "payload": { - "data": { - "userWasCreated": { - "name": format!("User {}", user_num), - "reviews": [{"body": format!("Review {} from user {}", user_num, user_num)}] + "payload": { + "data": { + "userWasCreated": { + "name": format!("User {}", user_num), + "reviews": [{"body": format!("Review {} from user {}", user_num, user_num)}] + } } } - } }) } /// Creates an expected subscription event payload with null userWasCreated (for empty/error payloads) fn create_expected_null_payload() -> serde_json::Value { serde_json::json!({ - "payload": { - "data": { - "userWasCreated": null + "payload": { + "data": { + "userWasCreated": null + } } - } }) } /// Creates an expected subscription event payload for a user with missing reviews field (becomes null) fn create_expected_user_payload_missing_reviews(user_num: u32) -> serde_json::Value { serde_json::json!({ - "payload": { - "data": { - "userWasCreated": { - "name": format!("User {}", user_num), - "reviews": null // Missing reviews field gets transformed to null + "payload": { + "data": { + "userWasCreated": { + "name": format!("User {}", user_num), + "reviews": null // Missing reviews field gets transformed to null + } } } - } }) } /// Creates an expected subscription event payload for a user with missing reviews field (becomes null) and error fn create_expected_partial_error_payload(user_num: u32) -> serde_json::Value { serde_json::json!({ - "payload": { - "data": { - "userWasCreated": { - "name": format!("User {}", user_num), - "reviews": null // Missing reviews field gets transformed to null - } - }, - "errors": [ - { - "message": "Internal error handling deferred response", - "extensions": { - "code": "INTERNAL_ERROR" + "payload": { + "data": { + "userWasCreated": { + "name": format!("User {}", user_num), + "reviews": null // Missing reviews field gets transformed to null } - } - ] - } + }, + "errors": [ + { + "message": "Internal error handling deferred response", + "extensions": { + "code": "INTERNAL_ERROR" + } + } + ] + } }) } /// Creates an expected subscription event payload for a user with missing reviews field (becomes null) and error fn create_expected_error_payload() -> serde_json::Value { serde_json::json!({ - "payload": { - "data": { - "userWasCreated": null - } - }, - "errors": [{ - "message": "Internal error handling deferred response", - "extensions": {"code": "INTERNAL_ERROR"} - }] + "payload": { + "data": { + "userWasCreated": null + }, + "errors": [{ + "message": "Internal error handling deferred response", + "extensions": {"code": "INTERNAL_ERROR"} + }] + }, }) } @@ -130,27 +130,27 @@ fn create_initial_empty_response() -> serde_json::Value { /// Creates a GraphQL data payload for a user (sent to mock server) fn create_user_data_payload(user_num: u32) -> serde_json::Value { serde_json::json!({ - "data": { - "userWasCreated": { - "name": format!("User {}", user_num), - "reviews": [{ - "body": format!("Review {} from user {}", user_num, user_num) - }] + "data": { + "userWasCreated": { + "name": format!("User {}", user_num), + "reviews": [{ + "body": format!("Review {} from user {}", user_num, user_num) + }] + } } - } }) } /// Creates a GraphQL data payload with missing reviews field (sent to mock server) fn create_user_data_payload_missing_reviews(user_num: u32) -> serde_json::Value { serde_json::json!({ - "data": { - "userWasCreated": { - "name": format!("User {}", user_num) - // Missing reviews field to test error handling - } - }, - "errors": [] + "data": { + "userWasCreated": { + "name": format!("User {}", user_num) + // Missing reviews field to test error handling + } + }, + "errors": [] }) } @@ -164,36 +164,36 @@ fn create_empty_data_payload() -> serde_json::Value { /// Creates an expected error response payload (sent to mock server) fn create_partial_error_payload(user_num: u32) -> serde_json::Value { serde_json::json!({ - "data": { - "userWasCreated": { - "name": format!("User {}", user_num), - } - }, - "errors": [ - { - "message": "Internal error handling deferred response", - "extensions": { - "code": "INTERNAL_ERROR" + "data": { + "userWasCreated": { + "name": format!("User {}", user_num), } - } - ] + }, + "errors": [ + { + "message": "Internal error handling deferred response", + "extensions": { + "code": "INTERNAL_ERROR" + } + } + ] }) } /// Creates an expected error response payload (sent to mock server) fn create_error_payload() -> serde_json::Value { serde_json::json!({ - "data": { - "userWasCreated": null - }, - "errors": [ - { - "message": "Internal error handling deferred response", - "extensions": { - "code": "INTERNAL_ERROR" + "data": { + "userWasCreated": null + }, + "errors": [ + { + "message": "Internal error handling deferred response", + "extensions": { + "code": "INTERNAL_ERROR" + } } - } - ] + ] }) } @@ -420,7 +420,6 @@ async fn test_subscription_ws_passthrough_error_payload() -> Result<(), BoxError // We have disabled this test because this test is failing for reasons that are understood, but are now preventing us from doing other fixes. We will ensure this is fixed by tracking this in the attached ticket as a follow up on its own PR. // The bug is basically an inconsistency in the way we're returning an error, sometimes it's consider as a critical error, sometimes not. -#[ignore = "ROUTER-1343"] #[tokio::test(flavor = "multi_thread")] async fn test_subscription_ws_passthrough_pure_error_payload() -> Result<(), BoxError> { if !graph_os_enabled() { @@ -505,7 +504,6 @@ async fn test_subscription_ws_passthrough_pure_error_payload() -> Result<(), Box // We have disabled this test because this test is failing for reasons that are understood, but are now preventing us from doing other fixes. We will ensure this is fixed by tracking this in the attached ticket as a follow up on its own PR. // The bug is basically an inconsistency in the way we're returning an error, sometimes it's consider as a critical error, sometimes not. -#[ignore = "ROUTER-1343"] #[tokio::test(flavor = "multi_thread")] async fn test_subscription_ws_passthrough_pure_error_payload_with_coprocessor() -> Result<(), BoxError> {