diff --git a/Cargo.lock b/Cargo.lock index ae1452961bd8..11b369fb16d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3466,6 +3466,7 @@ dependencies = [ "tempfile", "thiserror 1.0.69", "tokio", + "tokio-stream", "tracing", "tracing-appender", "tracing-subscriber", diff --git a/crates/goose-mcp/Cargo.toml b/crates/goose-mcp/Cargo.toml index 6bcf1e716f12..e469b952ea3e 100644 --- a/crates/goose-mcp/Cargo.toml +++ b/crates/goose-mcp/Cargo.toml @@ -16,6 +16,7 @@ mcp-server = { path = "../mcp-server" } rmcp = { workspace = true } anyhow = "1.0.94" tokio = { version = "1", features = ["full"] } +tokio-stream = { version = "0.1", features = ["io-util"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-appender = "0.2" diff --git a/crates/goose-mcp/src/developer/mod.rs b/crates/goose-mcp/src/developer/mod.rs index 7bea5c5a045c..817f372ffe20 100644 --- a/crates/goose-mcp/src/developer/mod.rs +++ b/crates/goose-mcp/src/developer/mod.rs @@ -21,6 +21,7 @@ use tokio::{ process::Command, sync::mpsc, }; +use tokio_stream::{wrappers::SplitStream, StreamExt as _}; use url::Url; use include_dir::{include_dir, Dir}; @@ -641,83 +642,44 @@ impl DeveloperRouter { .spawn() .map_err(|e| ToolError::ExecutionError(e.to_string()))?; - let stdout = child.stdout.take().unwrap(); - let stderr = child.stderr.take().unwrap(); - - let mut stdout_reader = BufReader::new(stdout); - let mut stderr_reader = BufReader::new(stderr); + let stdout = BufReader::new(child.stdout.take().unwrap()); + let stderr = BufReader::new(child.stderr.take().unwrap()); let output_task = tokio::spawn(async move { let mut combined_output = String::new(); - let mut stdout_buf = Vec::new(); - let mut stderr_buf = Vec::new(); - - let mut stdout_done = false; - let mut stderr_done = false; - - loop { - tokio::select! { - n = stdout_reader.read_until(b'\n', &mut stdout_buf), if !stdout_done => { - if n? == 0 { - stdout_done = true; - } else { - let line = String::from_utf8_lossy(&stdout_buf); - - notifier.try_send(JsonRpcMessage::Notification(JsonRpcNotification { - jsonrpc: JsonRpcVersion2_0, - notification: Notification { - method: "notifications/message".to_string(), - params: object!({ - "level": "info", - "data": { - "type": "shell", - "stream": "stdout", - "output": line.to_string(), - } - }), - extensions: Default::default(), + // We have the individual two streams above, now merge them into one unified stream of + // an enum. ref https://blog.yoshuawuyts.com/futures-concurrency-3 + let stdout = SplitStream::new(stdout.split(b'\n')).map(|v| ("stdout", v)); + let stderr = SplitStream::new(stderr.split(b'\n')).map(|v| ("stderr", v)); + let mut merged = stdout.merge(stderr); + + while let Some((key, line)) = merged.next().await { + let mut line = line?; + // Re-add this as clients expect it + line.push(b'\n'); + // Here we always convert to UTF-8 so agents don't have to deal with corrupted output + let line = String::from_utf8_lossy(&line); + + combined_output.push_str(&line); + + notifier + .try_send(JsonRpcMessage::Notification(JsonRpcNotification { + jsonrpc: JsonRpcVersion2_0, + notification: Notification { + method: "notifications/message".to_string(), + params: object!({ + "level": "info", + "data": { + "type": "shell", + "stream": key, + "output": line, } - })).ok(); - - combined_output.push_str(&line); - stdout_buf.clear(); - } - } - - n = stderr_reader.read_until(b'\n', &mut stderr_buf), if !stderr_done => { - if n? == 0 { - stderr_done = true; - } else { - let line = String::from_utf8_lossy(&stderr_buf); - - notifier.try_send(JsonRpcMessage::Notification(JsonRpcNotification { - jsonrpc: JsonRpcVersion2_0, - notification: Notification { - method: "notifications/message".to_string(), - params: object!({ - "level": "info", - "data": { - "type": "shell", - "stream": "stderr", - "output": line.to_string(), - } - }), - extensions: Default::default(), - } - })).ok(); - - combined_output.push_str(&line); - stderr_buf.clear(); - } - } - - else => break, - } - - if stdout_done && stderr_done { - break; - } + }), + extensions: Default::default(), + }, + })) + .ok(); } Ok::<_, std::io::Error>(combined_output) }); @@ -3472,4 +3434,45 @@ mod tests { assert_eq!(result.0, ""); assert_eq!(result.1, ""); } + + #[tokio::test] + #[serial] + async fn test_shell_output_without_trailing_newline() { + let temp_dir = tempfile::tempdir().unwrap(); + std::env::set_current_dir(&temp_dir).unwrap(); + + let router = get_router().await; + + // Test command that outputs content without a trailing newline + let command = if cfg!(windows) { + "echo|set /p=\"Content without newline\"" + } else { + "printf 'Content without newline'" + }; + + let result = router + .call_tool("shell", json!({ "command": command }), dummy_sender()) + .await + .unwrap(); + + // Find the assistant content (which contains the full output) + let assistant_content = result + .iter() + .find(|c| { + c.audience() + .is_some_and(|roles| roles.contains(&Role::Assistant)) + }) + .unwrap() + .as_text() + .unwrap(); + + // The output should contain the content even without a trailing newline + assert!( + assistant_content.text.contains("Content without newline"), + "Output should contain content even without trailing newline, but got: {}", + assistant_content.text + ); + + temp_dir.close().unwrap(); + } }