Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apollo-router/src/plugins/mock_subgraphs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ impl PluginPrivate for MockSubgraphsPlugin {
let config = config.clone();
let subgraph_schema = subgraph_schema.clone();
async move {
// WARN: REMOVE ME BEFORE MERGING! here to demonstrate the fix
dbg!(request.subgraph_request.body());

let mut response = http::Response::builder();
let body = if let Some(config) = &config {
*response.headers_mut().unwrap() = config.headers.0.clone();
Expand Down
20 changes: 17 additions & 3 deletions apollo-router/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ pub struct IntegrationTest {
router_location: PathBuf,
stdio_tx: tokio::sync::mpsc::Sender<String>,
stdio_rx: tokio::sync::mpsc::Receiver<String>,
stderr_tx: tokio::sync::mpsc::Sender<String>,
apollo_otlp_metrics_rx: tokio::sync::mpsc::Receiver<ExportMetricsServiceRequest>,
collect_stdio: Option<(tokio::sync::oneshot::Sender<String>, regex::Regex)>,
_subgraphs: wiremock::MockServer,
Expand Down Expand Up @@ -662,6 +663,16 @@ impl IntegrationTest {
fs::copy(&supergraph, &test_schema_location).expect("could not write schema");

let (stdio_tx, stdio_rx) = tokio::sync::mpsc::channel(2000);
// we separate stderr and stdio (previously they were both just handled by a single
// channel) to avoid congestion in one to contend the other

let (stderr_tx, mut stderr_rx) = tokio::sync::mpsc::channel::<String>(2000);
// we want to continually drain stderr, not let it build up backpressure
task::spawn(async move {
while stderr_rx.recv().await.is_some() {
// we discard stderr to prevent backpressure
}
});
let collect_stdio = collect_stdio.map(|sender| {
let version_line_re = regex::Regex::new("Apollo Router v[^ ]+ ").unwrap();
(sender, version_line_re)
Expand Down Expand Up @@ -693,6 +704,7 @@ impl IntegrationTest {
test_schema_location,
stdio_tx,
stdio_rx,
stderr_tx,
apollo_otlp_metrics_rx,
collect_stdio,
_subgraphs: subgraphs,
Expand Down Expand Up @@ -847,12 +859,14 @@ impl IntegrationTest {
}
});

// Also read from stderr to capture error messages
let stderr_tx = self.stdio_tx.clone();
// we separate out stderr to avoid congestion there affecting stdout; previous to this, we
// had both stdout and stderr in the same channel, allowing one's congestion to swamp the other
let stderr_tx = self.stderr_tx.clone();
task::spawn(async move {
let mut lines = stderr_reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let _ = stderr_tx.send(line).await;
// try_send to never block - if the channel is full, just drop the line
let _ = stderr_tx.try_send(line);
}
});
self.router = Some(router);
Expand Down