Skip to content
Closed
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 36 additions & 0 deletions crates/goose-cli/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use goose::agents::{Agent, SessionConfig};
use goose::config::Config;
use goose::providers::pricing::initialize_pricing_cache;
use goose::session;
use goose_mcp::FilePidTracker;
use input::InputResult;
use mcp_core::handler::ToolError;
use rmcp::model::PromptMessage;
Expand Down Expand Up @@ -1316,6 +1317,14 @@ impl Session {
"The existing call to {} was interrupted. How would you like to proceed?",
last_tool_name
);

// If this was a shell command that might have left processes running, clean them up
if last_tool_name == "developer__shell" {
tokio::spawn(async {
cleanup_shell_processes().await;
});
}

self.push_message(Message::assistant().with_text(&prompt));

// No need for description update here
Expand Down Expand Up @@ -1624,6 +1633,33 @@ impl Session {
}
}

/// Cleanup function to kill tracked subprocess PIDs when shell commands are interrupted.
async fn cleanup_shell_processes() {
let file_tracker = FilePidTracker::new();
let tracked_pids = file_tracker.get_all_pids();

if tracked_pids.is_empty() {
return;
}
if cfg!(windows) {
// On Windows, we can use taskkill to terminate processes by PID
for pid in tracked_pids {
let _ = tokio::process::Command::new("taskkill")
.args(&["/F", "/PID", &pid.to_string()])
.output()
.await;
}
} else {
// On Unix-like systems, we can use kill to terminate processes by PID
for pid in tracked_pids {
let _ = tokio::process::Command::new("kill")
.args(&["-TERM", &pid.to_string()])
.output()
.await;
}
}
}

fn get_reasoner() -> Result<Arc<dyn Provider>, anyhow::Error> {
use goose::model::ModelConfig;
use goose::providers::create;
Expand Down
1 change: 1 addition & 0 deletions crates/goose-mcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ hyper = "1"
serde_with = "3"
which = "6.0"
glob = "0.3"
uuid = { version = "1.0", features = ["v4"] }


[dev-dependencies]
Expand Down
153 changes: 111 additions & 42 deletions crates/goose-mcp/src/developer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::{
io::{Cursor, Read},
path::{Path, PathBuf},
pin::Pin,
sync::{Arc, Mutex},
};
use tokio::{
io::{AsyncBufReadExt, BufReader},
Expand All @@ -23,6 +24,7 @@ use tokio::{
};
use tokio_stream::{wrappers::SplitStream, StreamExt as _};
use url::Url;
use uuid::Uuid;

use include_dir::{include_dir, Dir};
use mcp_core::{
Expand All @@ -41,13 +43,13 @@ use rmcp::object;

use self::editor_models::{create_editor_model, EditorModel};
use self::shell::{expand_path, get_shell_config, is_absolute_path, normalize_line_endings};
use crate::file_pid_tracker::FilePidTracker;

use ignore::gitignore::{Gitignore, GitignoreBuilder};
use indoc::indoc;
use std::process::Stdio;
use std::sync::{Arc, Mutex};
use xcap::{Monitor, Window};

use ignore::gitignore::{Gitignore, GitignoreBuilder};

#[derive(Debug, Serialize, Deserialize)]
pub struct PromptTemplate {
pub id: String,
Expand Down Expand Up @@ -631,64 +633,130 @@ impl DeveloperRouter {
// Get platform-specific shell configuration
let shell_config = get_shell_config();

// Execute the command using platform-specific shell
// Execute the command using shell with better process cleanup
let wrapped_command = if cfg!(windows) {
command.to_string()
} else {
format!("setsid bash -c '{}'", command)
};

let mut child = Command::new(&shell_config.executable)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stdin(Stdio::null())
.kill_on_drop(true)
.args(&shell_config.args)
.arg(command)
.arg(&wrapped_command)
.spawn()
.map_err(|e| ToolError::ExecutionError(e.to_string()))?;

let stdout = BufReader::new(child.stdout.take().unwrap());
let stderr = BufReader::new(child.stderr.take().unwrap());
// Store the child PID for cleanup - generate a unique execution ID
let execution_id = format!("exec_{}", Uuid::new_v4().simple());

let child_pid = child.id();

// Store the PID globally for cleanup if cancellation occurs
if let Some(pid) = child_pid {
let file_tracker = FilePidTracker::new();
file_tracker.register_process(execution_id.clone(), pid, command.to_string());
}

let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();

let mut stdout_reader = BufReader::new(stdout);
let mut stderr_reader = BufReader::new(stderr);

let output_task = tokio::spawn(async move {
let mut combined_output = String::new();

// We have the individual two streams above, now merge them into one unified stream of
// an enum. ref https://blog.yoshuawuyts.com/futures-concurrency-3
let stdout = SplitStream::new(stdout.split(b'\n')).map(|v| ("stdout", v));
let stderr = SplitStream::new(stderr.split(b'\n')).map(|v| ("stderr", v));
let mut merged = stdout.merge(stderr);

while let Some((key, line)) = merged.next().await {
let mut line = line?;
// Re-add this as clients expect it
line.push(b'\n');
// Here we always convert to UTF-8 so agents don't have to deal with corrupted output
let line = String::from_utf8_lossy(&line);

combined_output.push_str(&line);

notifier
.try_send(JsonRpcMessage::Notification(JsonRpcNotification {
jsonrpc: JsonRpcVersion2_0,
notification: Notification {
method: "notifications/message".to_string(),
params: object!({
"level": "info",
"data": {
"type": "shell",
"stream": key,
"output": line,
let mut stdout_buf = Vec::new();
let mut stderr_buf = Vec::new();

let mut stdout_done = false;
let mut stderr_done = false;

loop {
tokio::select! {
n = stdout_reader.read_until(b'\n', &mut stdout_buf), if !stdout_done => {
if n? == 0 {
stdout_done = true;
} else {
let line = String::from_utf8_lossy(&stdout_buf);

notifier.try_send(JsonRpcMessage::Notification(JsonRpcNotification {
jsonrpc: JsonRpcVersion2_0,
notification: Notification {
method: "notifications/message".to_string(),
params: object!({
"level": "info",
"data": {
"type": "shell",
"stream": "stdout",
"output": line.to_string(),
}
}),
extensions: Default::default(),
}
})).ok();

combined_output.push_str(&line);
stdout_buf.clear();
}
}

n = stderr_reader.read_until(b'\n', &mut stderr_buf), if !stderr_done => {
if n? == 0 {
stderr_done = true;
} else {
let line = String::from_utf8_lossy(&stderr_buf);

notifier.try_send(JsonRpcMessage::Notification(JsonRpcNotification {
jsonrpc: JsonRpcVersion2_0,
notification: Notification {
method: "notifications/message".to_string(),
params: object!({
"level": "info",
"data": {
"type": "shell",
"stream": "stderr",
"output": line.to_string(),
}
}),
extensions: Default::default(),
}
}),
extensions: Default::default(),
},
}))
.ok();
})).ok();

combined_output.push_str(&line);
stderr_buf.clear();
}
}

else => break,
}

if stdout_done && stderr_done {
break;
}
}
Ok::<_, std::io::Error>(combined_output)
});

// Wait for the command to complete and get output
child
.wait()
.await
.map_err(|e| ToolError::ExecutionError(e.to_string()))?;
let exit_status_result = child.wait().await;

match exit_status_result {
Ok(exit_status) => {
if exit_status.success() {
// Always use file-based tracking for consistency
let file_tracker = FilePidTracker::new();
file_tracker.unregister_process(&execution_id);
}
}
Err(e) => {
return Err(ToolError::ExecutionError(e.to_string()));
}
}

let output_str = match output_task.await {
Ok(result) => result.map_err(|e| ToolError::ExecutionError(e.to_string()))?,
Expand Down Expand Up @@ -1714,6 +1782,7 @@ mod tests {
json!({
"command": "Get-ChildItem"
}),
dummy_sender(),
)
.await;
assert!(result.is_ok());
Expand Down
Loading