From 42f3e1022dc6391cd9b7b868b13a2e0d00012087 Mon Sep 17 00:00:00 2001 From: Matthew Hawkins Date: Fri, 2 Aug 2024 17:36:58 -0600 Subject: [PATCH 1/3] Fix race condition when adding debug data to connector context --- .../plugins/connectors/handle_responses.rs | 94 +++++++++++-------- .../plugins/connectors/http_json_transport.rs | 27 +++--- .../src/plugins/connectors/make_requests.rs | 8 +- .../src/services/connector_service.rs | 21 +---- 4 files changed, 76 insertions(+), 74 deletions(-) diff --git a/apollo-router/src/plugins/connectors/handle_responses.rs b/apollo-router/src/plugins/connectors/handle_responses.rs index 26933107d3..38e9f5ac29 100644 --- a/apollo-router/src/plugins/connectors/handle_responses.rs +++ b/apollo-router/src/plugins/connectors/handle_responses.rs @@ -12,6 +12,7 @@ use crate::plugins::connectors::plugin::ConnectorContext; use crate::plugins::connectors::plugin::SelectionData; use crate::services::connect::Response; use crate::services::router::body::RouterBody; +use crate::Context; const ENTITIES: &str = "_entities"; const TYPENAME: &str = "__typename"; @@ -35,7 +36,7 @@ pub(crate) enum HandleResponseError { pub(crate) async fn handle_responses( responses: Vec>, connector: &Connector, - debug: &mut Option, + context: Context, _schema: &Valid, // TODO for future apply_with_selection ) -> Result { use HandleResponseError::*; @@ -58,9 +59,11 @@ pub(crate) async fn handle_responses( if parts.status.is_success() { let Ok(json_data) = serde_json::from_slice::(body) else { - if let Some(ref mut debug) = debug { - debug.push_invalid_response(&parts, body); - } + context.extensions().with_lock(|mut lock| { + if let Some(ref mut debug) = lock.get_mut::() { + debug.push_invalid_response(&parts, body); + } + }); return Err(InvalidResponseBody( "couldn't deserialize response body".into(), )); @@ -77,18 +80,20 @@ pub(crate) async fn handle_responses( &response_key.inputs().merge(connector.config.as_ref()), ); - if let Some(ref mut debug) = debug { - debug.push_response( - &parts, - &json_data, - Some(SelectionData { - source: connector.selection.to_string(), - transformed: transformed_selection.to_string(), - result: res.clone(), - errors: apply_to_errors, - }), - ); - } + context.extensions().with_lock(|mut lock| { + if let Some(ref mut debug) = lock.get_mut::() { + debug.push_response( + &parts, + &json_data, + Some(SelectionData { + source: connector.selection.to_string(), + transformed: transformed_selection.to_string(), + result: res.clone(), + errors: apply_to_errors, + }), + ); + } + }); res.unwrap_or_else(|| Value::Null) }; @@ -169,16 +174,18 @@ pub(crate) async fn handle_responses( _ => {} }; - if let Some(ref mut debug) = debug { - match serde_json::from_slice(body) { - Ok(json_data) => { - debug.push_response(&parts, &json_data, None); - } - Err(_) => { - debug.push_invalid_response(&parts, body); + context.extensions().with_lock(|mut lock| { + if let Some(ref mut debug) = lock.get_mut::() { + match serde_json::from_slice(body) { + Ok(json_data) => { + debug.push_response(&parts, &json_data, None); + } + Err(_) => { + debug.push_invalid_response(&parts, body); + } } } - } + }); errors.push( graphql::Error::builder() @@ -248,6 +255,7 @@ mod tests { use crate::plugins::connectors::make_requests::ResponseKey; use crate::plugins::connectors::make_requests::ResponseTypeName; + use crate::Context; #[tokio::test] async fn test_handle_responses_root_fields() { @@ -300,10 +308,14 @@ mod tests { let schema = Schema::parse_and_validate("type Query { hello: String }", "./").unwrap(); - let res = - super::handle_responses(vec![response1, response2], &connector, &mut None, &schema) - .await - .unwrap(); + let res = super::handle_responses( + vec![response1, response2], + &connector, + Context::default(), + &schema, + ) + .await + .unwrap(); assert_debug_snapshot!(res, @r###" Response { @@ -400,10 +412,14 @@ mod tests { ) .unwrap(); - let res = - super::handle_responses(vec![response1, response2], &connector, &mut None, &schema) - .await - .unwrap(); + let res = super::handle_responses( + vec![response1, response2], + &connector, + Context::default(), + &schema, + ) + .await + .unwrap(); assert_debug_snapshot!(res, @r###" Response { @@ -506,10 +522,14 @@ mod tests { ) .unwrap(); - let res = - super::handle_responses(vec![response1, response2], &connector, &mut None, &schema) - .await - .unwrap(); + let res = super::handle_responses( + vec![response1, response2], + &connector, + Context::default(), + &schema, + ) + .await + .unwrap(); assert_debug_snapshot!(res, @r###" Response { @@ -628,7 +648,7 @@ mod tests { let res = super::handle_responses( vec![response1, response2, response3], &connector, - &mut None, + Context::default(), &schema, ) .await diff --git a/apollo-router/src/plugins/connectors/http_json_transport.rs b/apollo-router/src/plugins/connectors/http_json_transport.rs index fa89af160e..4b58edfd60 100644 --- a/apollo-router/src/plugins/connectors/http_json_transport.rs +++ b/apollo-router/src/plugins/connectors/http_json_transport.rs @@ -67,7 +67,6 @@ pub(crate) fn make_request( transport: &HttpJsonTransport, inputs: IndexMap, original_request: &connect::Request, - debug: &mut Option, ) -> Result, HttpJsonTransportError> { let uri = make_uri( transport.source_url.as_ref(), @@ -100,18 +99,20 @@ pub(crate) fn make_request( &transport.headers, ); - if let Some(ref mut debug) = debug { - debug.push_request( - &request, - json_body.as_ref(), - transport.body.as_ref().map(|body| SelectionData { - source: body.to_string(), - transformed: body.to_string(), - result: json_body.clone(), - errors: apply_to_errors, - }), - ); - } + original_request.context.extensions().with_lock(|mut lock| { + if let Some(debug) = lock.get_mut::() { + debug.push_request( + &request, + json_body.as_ref(), + transport.body.as_ref().map(|body| SelectionData { + source: body.to_string(), + transformed: body.to_string(), + result: json_body.clone(), + errors: apply_to_errors, + }), + ); + } + }); Ok(request) } diff --git a/apollo-router/src/plugins/connectors/make_requests.rs b/apollo-router/src/plugins/connectors/make_requests.rs index c30c7688e3..35f263d354 100644 --- a/apollo-router/src/plugins/connectors/make_requests.rs +++ b/apollo-router/src/plugins/connectors/make_requests.rs @@ -11,7 +11,6 @@ use serde_json_bytes::Value; use super::http_json_transport::make_request; use super::http_json_transport::HttpJsonTransportError; -use super::plugin::ConnectorContext; use crate::services::connect; use crate::services::router::body::RouterBody; @@ -131,7 +130,6 @@ pub(crate) enum ResponseTypeName { pub(crate) fn make_requests( request: connect::Request, connector: &Connector, - debug: &mut Option, ) -> Result, ResponseKey)>, MakeRequestError> { let request_params = match connector.entity_resolver { Some(EntityResolver::Explicit) => entities_from_request(&request), @@ -139,14 +137,13 @@ pub(crate) fn make_requests( None => root_fields(&request), }?; - request_params_to_requests(connector, request_params, &request, debug) + request_params_to_requests(connector, request_params, &request) } fn request_params_to_requests( connector: &Connector, request_params: Vec, original_request: &connect::Request, - debug: &mut Option, ) -> Result, ResponseKey)>, MakeRequestError> { let mut results = vec![]; @@ -155,7 +152,6 @@ fn request_params_to_requests( &connector.transport, response_key.inputs().merge(connector.config.as_ref()), original_request, - debug, )?; results.push((request, response_key)); @@ -1280,7 +1276,7 @@ mod tests { config: Default::default(), }; - let requests = super::make_requests(req, &connector, &mut None).unwrap(); + let requests = super::make_requests(req, &connector).unwrap(); assert_debug_snapshot!(requests, @r###" [ diff --git a/apollo-router/src/services/connector_service.rs b/apollo-router/src/services/connector_service.rs index 0eaaa0a3ed..1759b5d4f2 100644 --- a/apollo-router/src/services/connector_service.rs +++ b/apollo-router/src/services/connector_service.rs @@ -18,7 +18,6 @@ use super::http::HttpRequest; use super::new_service::ServiceFactory; use crate::plugins::connectors::handle_responses::handle_responses; use crate::plugins::connectors::make_requests::make_requests; -use crate::plugins::connectors::plugin::ConnectorContext; use crate::plugins::connectors::tracing::CONNECTOR_TYPE_HTTP; use crate::plugins::connectors::tracing::CONNECT_SPAN_NAME; use crate::plugins::subscription::SubscriptionConfig; @@ -128,18 +127,12 @@ async fn execute( let context2 = context.clone(); let original_subgraph_name = connector.id.subgraph_name.to_string(); - let mut debug = context - .extensions() - .with_lock(|mut lock| lock.remove::()); - - let requests = make_requests(request, connector, &mut debug).map_err(BoxError::from)?; + let requests = make_requests(request, connector).map_err(BoxError::from)?; let tasks = requests.into_iter().map(move |(req, key)| { let context = context.clone(); let original_subgraph_name = original_subgraph_name.clone(); async move { - let context = context.clone(); - let client = http_client_factory.create(&original_subgraph_name); let req = HttpRequest { http_request: req, @@ -158,17 +151,9 @@ async fn execute( .await .map_err(BoxError::from)?; - let result = handle_responses(responses, connector, &mut debug, schema) + handle_responses(responses, connector, context2, schema) .await - .map_err(BoxError::from); - - if let Some(debug) = debug { - context2 - .extensions() - .with_lock(|mut lock| lock.insert::(debug)); - } - - result + .map_err(BoxError::from) } #[derive(Clone)] From 3744132b65cd767924724d92d38c598d7094e8bd Mon Sep 17 00:00:00 2001 From: Matthew Hawkins Date: Mon, 5 Aug 2024 15:44:54 -0600 Subject: [PATCH 2/3] Access connector context through Arc> --- .../plugins/connectors/handle_responses.rs | 94 ++++++++----------- .../plugins/connectors/http_json_transport.rs | 28 +++--- .../src/plugins/connectors/make_requests.rs | 11 ++- .../src/plugins/connectors/plugin.rs | 11 ++- .../src/services/connector_service.rs | 11 ++- 5 files changed, 76 insertions(+), 79 deletions(-) diff --git a/apollo-router/src/plugins/connectors/handle_responses.rs b/apollo-router/src/plugins/connectors/handle_responses.rs index 38e9f5ac29..7d3ccf9582 100644 --- a/apollo-router/src/plugins/connectors/handle_responses.rs +++ b/apollo-router/src/plugins/connectors/handle_responses.rs @@ -1,7 +1,10 @@ +use std::sync::Arc; + use apollo_compiler::validation::Valid; use apollo_compiler::Schema; use apollo_federation::sources::connect::ApplyTo; use apollo_federation::sources::connect::Connector; +use parking_lot::Mutex; use serde_json_bytes::ByteString; use serde_json_bytes::Value; @@ -12,7 +15,6 @@ use crate::plugins::connectors::plugin::ConnectorContext; use crate::plugins::connectors::plugin::SelectionData; use crate::services::connect::Response; use crate::services::router::body::RouterBody; -use crate::Context; const ENTITIES: &str = "_entities"; const TYPENAME: &str = "__typename"; @@ -36,7 +38,7 @@ pub(crate) enum HandleResponseError { pub(crate) async fn handle_responses( responses: Vec>, connector: &Connector, - context: Context, + debug: &Option>>, _schema: &Valid, // TODO for future apply_with_selection ) -> Result { use HandleResponseError::*; @@ -59,11 +61,9 @@ pub(crate) async fn handle_responses( if parts.status.is_success() { let Ok(json_data) = serde_json::from_slice::(body) else { - context.extensions().with_lock(|mut lock| { - if let Some(ref mut debug) = lock.get_mut::() { - debug.push_invalid_response(&parts, body); - } - }); + if let Some(debug) = debug { + debug.lock().push_invalid_response(&parts, body); + } return Err(InvalidResponseBody( "couldn't deserialize response body".into(), )); @@ -80,20 +80,18 @@ pub(crate) async fn handle_responses( &response_key.inputs().merge(connector.config.as_ref()), ); - context.extensions().with_lock(|mut lock| { - if let Some(ref mut debug) = lock.get_mut::() { - debug.push_response( - &parts, - &json_data, - Some(SelectionData { - source: connector.selection.to_string(), - transformed: transformed_selection.to_string(), - result: res.clone(), - errors: apply_to_errors, - }), - ); - } - }); + if let Some(ref debug) = debug { + debug.lock().push_response( + &parts, + &json_data, + Some(SelectionData { + source: connector.selection.to_string(), + transformed: transformed_selection.to_string(), + result: res.clone(), + errors: apply_to_errors, + }), + ); + } res.unwrap_or_else(|| Value::Null) }; @@ -174,18 +172,16 @@ pub(crate) async fn handle_responses( _ => {} }; - context.extensions().with_lock(|mut lock| { - if let Some(ref mut debug) = lock.get_mut::() { - match serde_json::from_slice(body) { - Ok(json_data) => { - debug.push_response(&parts, &json_data, None); - } - Err(_) => { - debug.push_invalid_response(&parts, body); - } + if let Some(ref debug) = debug { + match serde_json::from_slice(body) { + Ok(json_data) => { + debug.lock().push_response(&parts, &json_data, None); + } + Err(_) => { + debug.lock().push_invalid_response(&parts, body); } } - }); + } errors.push( graphql::Error::builder() @@ -255,7 +251,6 @@ mod tests { use crate::plugins::connectors::make_requests::ResponseKey; use crate::plugins::connectors::make_requests::ResponseTypeName; - use crate::Context; #[tokio::test] async fn test_handle_responses_root_fields() { @@ -308,14 +303,9 @@ mod tests { let schema = Schema::parse_and_validate("type Query { hello: String }", "./").unwrap(); - let res = super::handle_responses( - vec![response1, response2], - &connector, - Context::default(), - &schema, - ) - .await - .unwrap(); + let res = super::handle_responses(vec![response1, response2], &connector, &None, &schema) + .await + .unwrap(); assert_debug_snapshot!(res, @r###" Response { @@ -412,14 +402,9 @@ mod tests { ) .unwrap(); - let res = super::handle_responses( - vec![response1, response2], - &connector, - Context::default(), - &schema, - ) - .await - .unwrap(); + let res = super::handle_responses(vec![response1, response2], &connector, &None, &schema) + .await + .unwrap(); assert_debug_snapshot!(res, @r###" Response { @@ -522,14 +507,9 @@ mod tests { ) .unwrap(); - let res = super::handle_responses( - vec![response1, response2], - &connector, - Context::default(), - &schema, - ) - .await - .unwrap(); + let res = super::handle_responses(vec![response1, response2], &connector, &None, &schema) + .await + .unwrap(); assert_debug_snapshot!(res, @r###" Response { @@ -648,7 +628,7 @@ mod tests { let res = super::handle_responses( vec![response1, response2, response3], &connector, - Context::default(), + &None, &schema, ) .await diff --git a/apollo-router/src/plugins/connectors/http_json_transport.rs b/apollo-router/src/plugins/connectors/http_json_transport.rs index 4b58edfd60..4501ff3c6d 100644 --- a/apollo-router/src/plugins/connectors/http_json_transport.rs +++ b/apollo-router/src/plugins/connectors/http_json_transport.rs @@ -24,6 +24,7 @@ use http::HeaderMap; use http::HeaderName; use http::HeaderValue; use lazy_static::lazy_static; +use parking_lot::Mutex; use serde_json_bytes::json; use serde_json_bytes::ByteString; use serde_json_bytes::Map; @@ -67,6 +68,7 @@ pub(crate) fn make_request( transport: &HttpJsonTransport, inputs: IndexMap, original_request: &connect::Request, + debug: &Option>>, ) -> Result, HttpJsonTransportError> { let uri = make_uri( transport.source_url.as_ref(), @@ -99,20 +101,18 @@ pub(crate) fn make_request( &transport.headers, ); - original_request.context.extensions().with_lock(|mut lock| { - if let Some(debug) = lock.get_mut::() { - debug.push_request( - &request, - json_body.as_ref(), - transport.body.as_ref().map(|body| SelectionData { - source: body.to_string(), - transformed: body.to_string(), - result: json_body.clone(), - errors: apply_to_errors, - }), - ); - } - }); + if let Some(debug) = debug { + debug.lock().push_request( + &request, + json_body.as_ref(), + transport.body.as_ref().map(|body| SelectionData { + source: body.to_string(), + transformed: body.to_string(), + result: json_body.clone(), + errors: apply_to_errors, + }), + ); + } Ok(request) } diff --git a/apollo-router/src/plugins/connectors/make_requests.rs b/apollo-router/src/plugins/connectors/make_requests.rs index 35f263d354..607363e959 100644 --- a/apollo-router/src/plugins/connectors/make_requests.rs +++ b/apollo-router/src/plugins/connectors/make_requests.rs @@ -1,9 +1,12 @@ +use std::sync::Arc; + use apollo_compiler::collections::IndexMap; use apollo_compiler::executable::Selection; use apollo_federation::sources::connect::Connector; use apollo_federation::sources::connect::CustomConfiguration; use apollo_federation::sources::connect::EntityResolver; use itertools::Itertools; +use parking_lot::Mutex; use serde_json_bytes::json; use serde_json_bytes::ByteString; use serde_json_bytes::Map; @@ -11,6 +14,7 @@ use serde_json_bytes::Value; use super::http_json_transport::make_request; use super::http_json_transport::HttpJsonTransportError; +use super::plugin::ConnectorContext; use crate::services::connect; use crate::services::router::body::RouterBody; @@ -130,6 +134,7 @@ pub(crate) enum ResponseTypeName { pub(crate) fn make_requests( request: connect::Request, connector: &Connector, + debug: &Option>>, ) -> Result, ResponseKey)>, MakeRequestError> { let request_params = match connector.entity_resolver { Some(EntityResolver::Explicit) => entities_from_request(&request), @@ -137,13 +142,14 @@ pub(crate) fn make_requests( None => root_fields(&request), }?; - request_params_to_requests(connector, request_params, &request) + request_params_to_requests(connector, request_params, &request, debug) } fn request_params_to_requests( connector: &Connector, request_params: Vec, original_request: &connect::Request, + debug: &Option>>, ) -> Result, ResponseKey)>, MakeRequestError> { let mut results = vec![]; @@ -152,6 +158,7 @@ fn request_params_to_requests( &connector.transport, response_key.inputs().merge(connector.config.as_ref()), original_request, + debug, )?; results.push((request, response_key)); @@ -1276,7 +1283,7 @@ mod tests { config: Default::default(), }; - let requests = super::make_requests(req, &connector).unwrap(); + let requests = super::make_requests(req, &connector, &None).unwrap(); assert_debug_snapshot!(requests, @r###" [ diff --git a/apollo-router/src/plugins/connectors/plugin.rs b/apollo-router/src/plugins/connectors/plugin.rs index 3593d0f5c5..043483874b 100644 --- a/apollo-router/src/plugins/connectors/plugin.rs +++ b/apollo-router/src/plugins/connectors/plugin.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use apollo_federation::sources::connect::ApplyToError; use bytes::Bytes; use futures::future::ready; @@ -5,6 +7,7 @@ use futures::stream::once; use futures::StreamExt; use http::HeaderValue; use itertools::Itertools; +use parking_lot::Mutex; use serde::Deserialize; use serde::Serialize; use serde_json_bytes::json; @@ -57,7 +60,9 @@ impl Plugin for Connectors { == Some(&HeaderValue::from_static("true")); if is_enabled { req.context.extensions().with_lock(|mut lock| { - lock.insert::(ConnectorContext::default()); + lock.insert::>>(Arc::new(Mutex::new( + ConnectorContext::default(), + ))); }); } @@ -72,7 +77,7 @@ impl Plugin for Connectors { if let Some(debug) = res .context .extensions() - .with_lock(|mut lock| lock.remove::()) + .with_lock(|mut lock| lock.remove::>>()) { let (parts, stream) = res.response.into_parts(); let (mut first, rest) = stream.into_future().await; @@ -80,7 +85,7 @@ impl Plugin for Connectors { if let Some(first) = &mut first { first.extensions.insert( "apolloConnectorsDebugging", - json!({"version": "1", "data": debug.serialize() }), + json!({"version": "1", "data": debug.lock().clone().serialize() }), ); } res.response = http::Response::from_parts( diff --git a/apollo-router/src/services/connector_service.rs b/apollo-router/src/services/connector_service.rs index 1759b5d4f2..ee88ae08e3 100644 --- a/apollo-router/src/services/connector_service.rs +++ b/apollo-router/src/services/connector_service.rs @@ -8,6 +8,7 @@ use apollo_federation::sources::connect::Connector; use futures::future::BoxFuture; use indexmap::IndexMap; use opentelemetry::Key; +use parking_lot::Mutex; use tower::BoxError; use tower::ServiceExt; use tracing::Instrument; @@ -18,6 +19,7 @@ use super::http::HttpRequest; use super::new_service::ServiceFactory; use crate::plugins::connectors::handle_responses::handle_responses; use crate::plugins::connectors::make_requests::make_requests; +use crate::plugins::connectors::plugin::ConnectorContext; use crate::plugins::connectors::tracing::CONNECTOR_TYPE_HTTP; use crate::plugins::connectors::tracing::CONNECT_SPAN_NAME; use crate::plugins::subscription::SubscriptionConfig; @@ -124,10 +126,13 @@ async fn execute( schema: &Valid, ) -> Result { let context = request.context.clone(); - let context2 = context.clone(); let original_subgraph_name = connector.id.subgraph_name.to_string(); - let requests = make_requests(request, connector).map_err(BoxError::from)?; + let debug = context + .extensions() + .with_lock(|lock| lock.get::>>().cloned()); + + let requests = make_requests(request, connector, &debug).map_err(BoxError::from)?; let tasks = requests.into_iter().map(move |(req, key)| { let context = context.clone(); @@ -151,7 +156,7 @@ async fn execute( .await .map_err(BoxError::from)?; - handle_responses(responses, connector, context2, schema) + handle_responses(responses, connector, &debug, schema) .await .map_err(BoxError::from) } From 8e2118dd6b2bb5b482b9c080a828b4d13e69bef8 Mon Sep 17 00:00:00 2001 From: Matthew Hawkins Date: Tue, 6 Aug 2024 09:41:26 -0600 Subject: [PATCH 3/3] Remove clone of ConnectorContext --- apollo-router/src/plugins/connectors/plugin.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/apollo-router/src/plugins/connectors/plugin.rs b/apollo-router/src/plugins/connectors/plugin.rs index 043483874b..c3abfd0cb4 100644 --- a/apollo-router/src/plugins/connectors/plugin.rs +++ b/apollo-router/src/plugins/connectors/plugin.rs @@ -74,19 +74,21 @@ impl Plugin for Connectors { res = match res { Ok(mut res) => { if is_enabled { - if let Some(debug) = res - .context - .extensions() - .with_lock(|mut lock| lock.remove::>>()) + if let Some(debug) = + res.context.extensions().with_lock(|mut lock| { + lock.remove::>>() + }) { let (parts, stream) = res.response.into_parts(); let (mut first, rest) = stream.into_future().await; if let Some(first) = &mut first { - first.extensions.insert( - "apolloConnectorsDebugging", - json!({"version": "1", "data": debug.lock().clone().serialize() }), - ); + if let Some(inner) = Arc::into_inner(debug) { + first.extensions.insert( + "apolloConnectorsDebugging", + json!({"version": "1", "data": inner.into_inner().serialize() }), + ); + } } res.response = http::Response::from_parts( parts,