Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 43 additions & 56 deletions crates/goose/src/agents/extension_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,42 @@ impl Default for ExtensionManager {
}
}

async fn child_process_client(
mut command: Command,
timeout: &Option<u64>,
) -> ExtensionResult<McpClient> {
command.process_group(0);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought you said we'd give them their own process group, isn't this the same process group? also, does this work on Windows?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 sets it to its own group (uses the pid)

Re: Windows: eh, I think no. I don't actually even know what the current behavior here is on windows. I don't have windows machine to test. Any ideas there?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah there is no process_group on windows it seems.

let (transport, mut stderr) = TokioChildProcess::builder(command)
.stderr(Stdio::piped())
.spawn()?;
let mut stderr = stderr.take().ok_or_else(|| {
ExtensionError::SetupError("failed to attach child process stderr".to_owned())
})?;

let stderr_task = tokio::spawn(async move {
let mut all_stderr = Vec::new();
stderr.read_to_end(&mut all_stderr).await?;
Ok::<String, std::io::Error>(String::from_utf8_lossy(&all_stderr).into())
});

let client_result = McpClient::connect(
transport,
Duration::from_secs(timeout.unwrap_or(crate::config::DEFAULT_EXTENSION_TIMEOUT)),
)
.await;

match client_result {
Ok(client) => Ok(client),
Err(error) => {
let error_task_out = stderr_task.await?;
Err::<McpClient, ExtensionError>(match error_task_out {
Ok(stderr_content) => ProcessExit::new(stderr_content, error).into(),
Err(e) => e.into(),
})
}
}
}

impl ExtensionManager {
/// Create a new ExtensionManager instance
pub fn new() -> Self {
Expand Down Expand Up @@ -280,38 +316,7 @@ impl ExtensionManager {
let command = Command::new(cmd).configure(|command| {
command.args(args).envs(all_envs);
});
let (transport, mut stderr) = TokioChildProcess::builder(command)
.stderr(Stdio::piped())
.spawn()?;
let mut stderr = stderr
.take()
.expect("should have a stderr handle because it was requested");

let stderr_task = tokio::spawn(async move {
let mut all_stderr = Vec::new();
stderr.read_to_end(&mut all_stderr).await?;
Ok::<String, std::io::Error>(String::from_utf8_lossy(&all_stderr).into())
});

let client_result = McpClient::connect(
transport,
Duration::from_secs(
timeout.unwrap_or(crate::config::DEFAULT_EXTENSION_TIMEOUT),
),
)
.await;

let client = match client_result {
Ok(client) => Ok(client),
Err(error) => {
let error_task_out = stderr_task.await?;
Err::<McpClient, ExtensionError>(match error_task_out {
Ok(stderr_content) => ProcessExit::new(stderr_content, error).into(),
Err(e) => e.into(),
})
}
}?;

let client = child_process_client(command, timeout).await?;
Box::new(client)
}
ExtensionConfig::Builtin {
Expand All @@ -326,19 +331,11 @@ impl ExtensionManager {
.to_str()
.expect("should resolve executable to string path")
.to_string();

let transport = TokioChildProcess::new(Command::new(cmd).configure(|command| {
let command = Command::new(cmd).configure(|command| {
command.arg("mcp").arg(name);
}))?;
Box::new(
McpClient::connect(
transport,
Duration::from_secs(
timeout.unwrap_or(crate::config::DEFAULT_EXTENSION_TIMEOUT),
),
)
.await?,
)
});
let client = child_process_client(command, timeout).await?;
Box::new(client)
}
ExtensionConfig::InlinePython {
name,
Expand All @@ -360,21 +357,11 @@ impl ExtensionManager {

command.arg("python").arg(file_path.to_str().unwrap());
});
let transport = TokioChildProcess::new(command)?;

let client = Box::new(
McpClient::connect(
transport,
Duration::from_secs(
timeout.unwrap_or(crate::config::DEFAULT_EXTENSION_TIMEOUT),
),
)
.await?,
);

let client = child_process_client(command, timeout).await?;
self.temp_dirs.insert(sanitized_name.clone(), temp_dir);

client
Box::new(client)
}
_ => unreachable!(),
};
Expand Down
Loading