diff --git a/apollo-router/src/plugins/mock_subgraphs/mod.rs b/apollo-router/src/plugins/mock_subgraphs/mod.rs index 37a37be5fa..9ac662afaa 100644 --- a/apollo-router/src/plugins/mock_subgraphs/mod.rs +++ b/apollo-router/src/plugins/mock_subgraphs/mod.rs @@ -120,6 +120,9 @@ impl PluginPrivate for MockSubgraphsPlugin { let config = config.clone(); let subgraph_schema = subgraph_schema.clone(); async move { + // WARN: REMOVE ME BEFORE MERGING! here to demonstrate the fix + dbg!(request.subgraph_request.body()); + let mut response = http::Response::builder(); let body = if let Some(config) = &config { *response.headers_mut().unwrap() = config.headers.0.clone(); diff --git a/apollo-router/tests/common.rs b/apollo-router/tests/common.rs index 9816de8e1f..dde3fcff4a 100644 --- a/apollo-router/tests/common.rs +++ b/apollo-router/tests/common.rs @@ -201,6 +201,7 @@ pub struct IntegrationTest { router_location: PathBuf, stdio_tx: tokio::sync::mpsc::Sender, stdio_rx: tokio::sync::mpsc::Receiver, + stderr_tx: tokio::sync::mpsc::Sender, apollo_otlp_metrics_rx: tokio::sync::mpsc::Receiver, collect_stdio: Option<(tokio::sync::oneshot::Sender, regex::Regex)>, _subgraphs: wiremock::MockServer, @@ -662,6 +663,16 @@ impl IntegrationTest { fs::copy(&supergraph, &test_schema_location).expect("could not write schema"); let (stdio_tx, stdio_rx) = tokio::sync::mpsc::channel(2000); + // we separate stderr and stdio (previously they were both just handled by a single + // channel) to avoid congestion in one to contend the other + + let (stderr_tx, mut stderr_rx) = tokio::sync::mpsc::channel::(2000); + // we want to continually drain stderr, not let it build up backpressure + task::spawn(async move { + while stderr_rx.recv().await.is_some() { + // we discard stderr to prevent backpressure + } + }); let collect_stdio = collect_stdio.map(|sender| { let version_line_re = regex::Regex::new("Apollo Router v[^ ]+ ").unwrap(); (sender, version_line_re) @@ -693,6 +704,7 @@ impl IntegrationTest { test_schema_location, stdio_tx, stdio_rx, + stderr_tx, apollo_otlp_metrics_rx, collect_stdio, _subgraphs: subgraphs, @@ -847,12 +859,14 @@ impl IntegrationTest { } }); - // Also read from stderr to capture error messages - let stderr_tx = self.stdio_tx.clone(); + // we separate out stderr to avoid congestion there affecting stdout; previous to this, we + // had both stdout and stderr in the same channel, allowing one's congestion to swamp the other + let stderr_tx = self.stderr_tx.clone(); task::spawn(async move { let mut lines = stderr_reader.lines(); while let Ok(Some(line)) = lines.next_line().await { - let _ = stderr_tx.send(line).await; + // try_send to never block - if the channel is full, just drop the line + let _ = stderr_tx.try_send(line); } }); self.router = Some(router);