Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions apollo-router/src/plugins/connectors/handle_responses.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::sync::Arc;

use apollo_compiler::validation::Valid;
use apollo_compiler::Schema;
use apollo_federation::sources::connect::ApplyTo;
use apollo_federation::sources::connect::Connector;
use parking_lot::Mutex;
use serde_json_bytes::ByteString;
use serde_json_bytes::Value;

Expand Down Expand Up @@ -35,7 +38,7 @@ pub(crate) enum HandleResponseError {
pub(crate) async fn handle_responses(
responses: Vec<http::Response<RouterBody>>,
connector: &Connector,
debug: &mut Option<ConnectorContext>,
debug: &Option<Arc<Mutex<ConnectorContext>>>,
_schema: &Valid<Schema>, // TODO for future apply_with_selection
) -> Result<Response, HandleResponseError> {
use HandleResponseError::*;
Expand All @@ -58,8 +61,8 @@ pub(crate) async fn handle_responses(

if parts.status.is_success() {
let Ok(json_data) = serde_json::from_slice::<Value>(body) else {
if let Some(ref mut debug) = debug {
debug.push_invalid_response(&parts, body);
if let Some(debug) = debug {
debug.lock().push_invalid_response(&parts, body);
}
return Err(InvalidResponseBody(
"couldn't deserialize response body".into(),
Expand All @@ -77,8 +80,8 @@ pub(crate) async fn handle_responses(
&response_key.inputs().merge(connector.config.as_ref()),
);

if let Some(ref mut debug) = debug {
debug.push_response(
if let Some(ref debug) = debug {
debug.lock().push_response(
&parts,
&json_data,
Some(SelectionData {
Expand Down Expand Up @@ -169,13 +172,13 @@ pub(crate) async fn handle_responses(
_ => {}
};

if let Some(ref mut debug) = debug {
if let Some(ref debug) = debug {
match serde_json::from_slice(body) {
Ok(json_data) => {
debug.push_response(&parts, &json_data, None);
debug.lock().push_response(&parts, &json_data, None);
}
Err(_) => {
debug.push_invalid_response(&parts, body);
debug.lock().push_invalid_response(&parts, body);
}
}
}
Expand Down Expand Up @@ -300,10 +303,9 @@ mod tests {

let schema = Schema::parse_and_validate("type Query { hello: String }", "./").unwrap();

let res =
super::handle_responses(vec![response1, response2], &connector, &mut None, &schema)
.await
.unwrap();
let res = super::handle_responses(vec![response1, response2], &connector, &None, &schema)
.await
.unwrap();

assert_debug_snapshot!(res, @r###"
Response {
Expand Down Expand Up @@ -400,10 +402,9 @@ mod tests {
)
.unwrap();

let res =
super::handle_responses(vec![response1, response2], &connector, &mut None, &schema)
.await
.unwrap();
let res = super::handle_responses(vec![response1, response2], &connector, &None, &schema)
.await
.unwrap();

assert_debug_snapshot!(res, @r###"
Response {
Expand Down Expand Up @@ -506,10 +507,9 @@ mod tests {
)
.unwrap();

let res =
super::handle_responses(vec![response1, response2], &connector, &mut None, &schema)
.await
.unwrap();
let res = super::handle_responses(vec![response1, response2], &connector, &None, &schema)
.await
.unwrap();

assert_debug_snapshot!(res, @r###"
Response {
Expand Down Expand Up @@ -628,7 +628,7 @@ mod tests {
let res = super::handle_responses(
vec![response1, response2, response3],
&connector,
&mut None,
&None,
&schema,
)
.await
Expand Down
7 changes: 4 additions & 3 deletions apollo-router/src/plugins/connectors/http_json_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use http::HeaderMap;
use http::HeaderName;
use http::HeaderValue;
use lazy_static::lazy_static;
use parking_lot::Mutex;
use serde_json_bytes::json;
use serde_json_bytes::ByteString;
use serde_json_bytes::Map;
Expand Down Expand Up @@ -67,7 +68,7 @@ pub(crate) fn make_request(
transport: &HttpJsonTransport,
inputs: IndexMap<String, Value>,
original_request: &connect::Request,
debug: &mut Option<ConnectorContext>,
debug: &Option<Arc<Mutex<ConnectorContext>>>,
) -> Result<http::Request<RouterBody>, HttpJsonTransportError> {
let uri = make_uri(
transport.source_url.as_ref(),
Expand Down Expand Up @@ -100,8 +101,8 @@ pub(crate) fn make_request(
&transport.headers,
);

if let Some(ref mut debug) = debug {
debug.push_request(
if let Some(debug) = debug {
debug.lock().push_request(
&request,
json_body.as_ref(),
transport.body.as_ref().map(|body| SelectionData {
Expand Down
9 changes: 6 additions & 3 deletions apollo-router/src/plugins/connectors/make_requests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::sync::Arc;

use apollo_compiler::collections::IndexMap;
use apollo_compiler::executable::Selection;
use apollo_federation::sources::connect::Connector;
use apollo_federation::sources::connect::CustomConfiguration;
use apollo_federation::sources::connect::EntityResolver;
use itertools::Itertools;
use parking_lot::Mutex;
use serde_json_bytes::json;
use serde_json_bytes::ByteString;
use serde_json_bytes::Map;
Expand Down Expand Up @@ -131,7 +134,7 @@ pub(crate) enum ResponseTypeName {
pub(crate) fn make_requests(
request: connect::Request,
connector: &Connector,
debug: &mut Option<ConnectorContext>,
debug: &Option<Arc<Mutex<ConnectorContext>>>,
) -> Result<Vec<(http::Request<RouterBody>, ResponseKey)>, MakeRequestError> {
let request_params = match connector.entity_resolver {
Some(EntityResolver::Explicit) => entities_from_request(&request),
Expand All @@ -146,7 +149,7 @@ fn request_params_to_requests(
connector: &Connector,
request_params: Vec<ResponseKey>,
original_request: &connect::Request,
debug: &mut Option<ConnectorContext>,
debug: &Option<Arc<Mutex<ConnectorContext>>>,
) -> Result<Vec<(http::Request<RouterBody>, ResponseKey)>, MakeRequestError> {
let mut results = vec![];

Expand Down Expand Up @@ -1280,7 +1283,7 @@ mod tests {
config: Default::default(),
};

let requests = super::make_requests(req, &connector, &mut None).unwrap();
let requests = super::make_requests(req, &connector, &None).unwrap();

assert_debug_snapshot!(requests, @r###"
[
Expand Down
25 changes: 16 additions & 9 deletions apollo-router/src/plugins/connectors/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::sync::Arc;

use apollo_federation::sources::connect::ApplyToError;
use bytes::Bytes;
use futures::future::ready;
use futures::stream::once;
use futures::StreamExt;
use http::HeaderValue;
use itertools::Itertools;
use parking_lot::Mutex;
use serde::Deserialize;
use serde::Serialize;
use serde_json_bytes::json;
Expand Down Expand Up @@ -57,7 +60,9 @@ impl Plugin for Connectors {
== Some(&HeaderValue::from_static("true"));
if is_enabled {
req.context.extensions().with_lock(|mut lock| {
lock.insert::<ConnectorContext>(ConnectorContext::default());
lock.insert::<Arc<Mutex<ConnectorContext>>>(Arc::new(Mutex::new(
ConnectorContext::default(),
)));
});
}

Expand All @@ -69,19 +74,21 @@ impl Plugin for Connectors {
res = match res {
Ok(mut res) => {
if is_enabled {
if let Some(debug) = res
.context
.extensions()
.with_lock(|mut lock| lock.remove::<ConnectorContext>())
if let Some(debug) =
res.context.extensions().with_lock(|mut lock| {
lock.remove::<Arc<Mutex<ConnectorContext>>>()
})
{
let (parts, stream) = res.response.into_parts();
let (mut first, rest) = stream.into_future().await;

if let Some(first) = &mut first {
first.extensions.insert(
"apolloConnectorsDebugging",
json!({"version": "1", "data": debug.serialize() }),
);
if let Some(inner) = Arc::into_inner(debug) {
first.extensions.insert(
"apolloConnectorsDebugging",
json!({"version": "1", "data": inner.into_inner().serialize() }),
);
}
}
res.response = http::Response::from_parts(
parts,
Expand Down
22 changes: 6 additions & 16 deletions apollo-router/src/services/connector_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use apollo_federation::sources::connect::Connector;
use futures::future::BoxFuture;
use indexmap::IndexMap;
use opentelemetry::Key;
use parking_lot::Mutex;
use tower::BoxError;
use tower::ServiceExt;
use tracing::Instrument;
Expand Down Expand Up @@ -125,21 +126,18 @@ async fn execute(
schema: &Valid<apollo_compiler::Schema>,
) -> Result<ConnectResponse, BoxError> {
let context = request.context.clone();
let context2 = context.clone();
let original_subgraph_name = connector.id.subgraph_name.to_string();

let mut debug = context
let debug = context
.extensions()
.with_lock(|mut lock| lock.remove::<ConnectorContext>());
.with_lock(|lock| lock.get::<Arc<Mutex<ConnectorContext>>>().cloned());

let requests = make_requests(request, connector, &mut debug).map_err(BoxError::from)?;
let requests = make_requests(request, connector, &debug).map_err(BoxError::from)?;

let tasks = requests.into_iter().map(move |(req, key)| {
let context = context.clone();
let original_subgraph_name = original_subgraph_name.clone();
async move {
let context = context.clone();

let client = http_client_factory.create(&original_subgraph_name);
let req = HttpRequest {
http_request: req,
Expand All @@ -158,17 +156,9 @@ async fn execute(
.await
.map_err(BoxError::from)?;

let result = handle_responses(responses, connector, &mut debug, schema)
handle_responses(responses, connector, &debug, schema)
.await
.map_err(BoxError::from);

if let Some(debug) = debug {
context2
.extensions()
.with_lock(|mut lock| lock.insert::<ConnectorContext>(debug));
}

result
.map_err(BoxError::from)
}

#[derive(Clone)]
Expand Down