Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions crates/goose-server/src/routes/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ enum ExtensionConfigRequest {
display_name: Option<String>,
timeout: Option<u64>,
},
/// Streamable HTTP extension using MCP Streamable HTTP specification.
#[serde(rename = "streamable_http")]
StreamableHttp {
/// The name to identify this extension
name: String,
/// The URI endpoint for the streamable HTTP extension.
uri: String,
#[serde(default)]
/// Map of environment variable key to values.
envs: Envs,
/// List of environment variable keys. The server will fetch their values from the keyring.
#[serde(default)]
env_keys: Vec<String>,
/// Custom headers to include in requests.
#[serde(default)]
headers: std::collections::HashMap<String, String>,
timeout: Option<u64>,
},
/// Frontend extension that provides tools to be executed by the frontend.
#[serde(rename = "frontend")]
Frontend {
Expand Down Expand Up @@ -176,6 +194,23 @@ async fn add_extension(
timeout,
bundled: None,
},
ExtensionConfigRequest::StreamableHttp {
name,
uri,
envs,
env_keys,
headers,
timeout,
} => ExtensionConfig::StreamableHttp {
name,
uri,
envs,
env_keys,
headers,
description: None,
timeout,
bundled: None,
},
ExtensionConfigRequest::Stdio {
name,
cmd,
Expand Down
46 changes: 44 additions & 2 deletions crates/goose/src/agents/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::config::permission::PermissionLevel;
#[derive(Error, Debug)]
pub enum ExtensionError {
#[error("Failed to start the MCP server from configuration `{0}` `{1}`")]
Initialization(ExtensionConfig, ClientError),
Initialization(Box<ExtensionConfig>, ClientError),
#[error("Failed a client call to an MCP server: {0}")]
Client(#[from] ClientError),
#[error("User Message exceeded context-limit. History could not be truncated to accommodate.")]
Expand Down Expand Up @@ -54,7 +54,7 @@ impl Envs {
"LD_AUDIT", // Loads a monitoring library that can intercept execution
"LD_DEBUG", // Enables verbose linker logging (information disclosure risk)
"LD_BIND_NOW", // Forces immediate symbol resolution, affecting ASLR
"LD_ASSUME_KERNEL", // Tricks linker into thinking its running on an older kernel
"LD_ASSUME_KERNEL", // Tricks linker into thinking it's running on an older kernel
// 🍎 macOS dynamic linker variables
"DYLD_LIBRARY_PATH", // Same as LD_LIBRARY_PATH but for macOS
"DYLD_INSERT_LIBRARIES", // macOS equivalent of LD_PRELOAD
Expand Down Expand Up @@ -168,6 +168,26 @@ pub enum ExtensionConfig {
#[serde(default)]
bundled: Option<bool>,
},
/// Streamable HTTP client with a URI endpoint using MCP Streamable HTTP specification
#[serde(rename = "streamable_http")]
StreamableHttp {
/// The name used to identify this extension
name: String,
uri: String,
#[serde(default)]
envs: Envs,
#[serde(default)]
env_keys: Vec<String>,
#[serde(default)]
headers: HashMap<String, String>,
description: Option<String>,
// NOTE: set timeout to be optional for compatibility.
// However, new configurations should include this field.
timeout: Option<u64>,
/// Whether this extension is bundled with Goose
#[serde(default)]
bundled: Option<bool>,
},
/// Frontend-provided tools that will be called through the frontend
#[serde(rename = "frontend")]
Frontend {
Expand Down Expand Up @@ -207,6 +227,24 @@ impl ExtensionConfig {
}
}

pub fn streamable_http<S: Into<String>, T: Into<u64>>(
name: S,
uri: S,
description: S,
timeout: T,
) -> Self {
Self::StreamableHttp {
name: name.into(),
uri: uri.into(),
envs: Envs::default(),
env_keys: Vec::new(),
headers: HashMap::new(),
description: Some(description.into()),
timeout: Some(timeout.into()),
bundled: None,
}
}

pub fn stdio<S: Into<String>, T: Into<u64>>(
name: S,
cmd: S,
Expand Down Expand Up @@ -263,6 +301,7 @@ impl ExtensionConfig {
pub fn name(&self) -> String {
match self {
Self::Sse { name, .. } => name,
Self::StreamableHttp { name, .. } => name,
Self::Stdio { name, .. } => name,
Self::Builtin { name, .. } => name,
Self::Frontend { name, .. } => name,
Expand All @@ -275,6 +314,9 @@ impl std::fmt::Display for ExtensionConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ExtensionConfig::Sse { name, uri, .. } => write!(f, "SSE({}: {})", name, uri),
ExtensionConfig::StreamableHttp { name, uri, .. } => {
write!(f, "StreamableHttp({}: {})", name, uri)
}
ExtensionConfig::Stdio {
name, cmd, args, ..
} => {
Expand Down
31 changes: 28 additions & 3 deletions crates/goose/src/agents/extension_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::agents::extension::Envs;
use crate::config::{Config, ExtensionConfigManager};
use crate::prompt_template;
use mcp_client::client::{ClientCapabilities, ClientInfo, McpClient, McpClientTrait};
use mcp_client::transport::{SseTransport, StdioTransport, Transport};
use mcp_client::transport::{SseTransport, StdioTransport, StreamableHttpTransport, Transport};
use mcp_core::{prompt::Prompt, Content, Tool, ToolCall, ToolError};
use serde_json::Value;

Expand Down Expand Up @@ -195,6 +195,28 @@ impl ExtensionManager {
.await?,
)
}
ExtensionConfig::StreamableHttp {
uri,
envs,
env_keys,
headers,
timeout,
..
} => {
let all_envs = merge_environments(envs, env_keys, &sanitized_name).await?;
let transport =
StreamableHttpTransport::with_headers(uri, all_envs, headers.clone());
let handle = transport.start().await?;
Box::new(
McpClient::connect(
handle,
Duration::from_secs(
timeout.unwrap_or(crate::config::DEFAULT_EXTENSION_TIMEOUT),
),
)
.await?,
)
}
ExtensionConfig::Stdio {
cmd,
args,
Expand Down Expand Up @@ -256,7 +278,7 @@ impl ExtensionManager {
let init_result = client
.initialize(info, capabilities)
.await
.map_err(|e| ExtensionError::Initialization(config.clone(), e))?;
.map_err(|e| ExtensionError::Initialization(Box::new(config.clone()), e))?;

if let Some(instructions) = init_result.instructions {
self.instructions
Expand Down Expand Up @@ -752,10 +774,13 @@ impl ExtensionManager {
ExtensionConfig::Sse {
description, name, ..
}
| ExtensionConfig::StreamableHttp {
description, name, ..
}
| ExtensionConfig::Stdio {
description, name, ..
} => {
// For SSE/Stdio, use description if available
// For SSE/StreamableHttp/Stdio, use description if available
description
.as_ref()
.map(|s| s.to_string())
Expand Down
1 change: 1 addition & 0 deletions crates/mcp-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ workspace = true
[dependencies]
mcp-core = { path = "../mcp-core" }
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["io"] }
reqwest = { version = "0.11", default-features = false, features = ["json", "stream", "rustls-tls-native-roots"] }
eventsource-client = "0.12.0"
futures = "0.3"
Expand Down
19 changes: 18 additions & 1 deletion crates/mcp-client/examples/integration_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use futures::lock::Mutex;
use mcp_client::client::{ClientCapabilities, ClientInfo, McpClient, McpClientTrait};
use mcp_client::transport::{SseTransport, Transport};
use mcp_client::transport::{SseTransport, StreamableHttpTransport, Transport};
use mcp_client::StdioTransport;
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -20,6 +20,7 @@ async fn main() -> Result<()> {
.init();

test_transport(sse_transport().await?).await?;
test_transport(streamable_http_transport().await?).await?;
test_transport(stdio_transport().await?).await?;

// Test broken transport
Expand Down Expand Up @@ -52,6 +53,22 @@ async fn sse_transport() -> Result<SseTransport> {
))
}

async fn streamable_http_transport() -> Result<StreamableHttpTransport> {
let port = "60054";

tokio::process::Command::new("npx")
.env("PORT", port)
.arg("@modelcontextprotocol/server-everything")
.arg("streamable-http")
.spawn()?;
tokio::time::sleep(Duration::from_secs(1)).await;

Ok(StreamableHttpTransport::new(
format!("http://localhost:{}/mcp", port),
HashMap::new(),
))
}

async fn stdio_transport() -> Result<StdioTransport> {
Ok(StdioTransport::new(
"npx",
Expand Down
93 changes: 93 additions & 0 deletions crates/mcp-client/examples/streamable_http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use anyhow::Result;
use mcp_client::client::{ClientCapabilities, ClientInfo, McpClient, McpClientTrait};
use mcp_client::transport::{StreamableHttpTransport, Transport};
use std::collections::HashMap;
use std::time::Duration;
use tracing_subscriber::EnvFilter;

#[tokio::main]
async fn main() -> Result<()> {
// Initialize logging
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::from_default_env()
.add_directive("mcp_client=debug".parse().unwrap())
.add_directive("eventsource_client=info".parse().unwrap()),
)
.init();

// Create example headers
let mut headers = HashMap::new();
headers.insert("X-Custom-Header".to_string(), "example-value".to_string());
headers.insert(
"User-Agent".to_string(),
"MCP-StreamableHttp-Client/1.0".to_string(),
);

// Create the Streamable HTTP transport with headers
let transport =
StreamableHttpTransport::with_headers("http://localhost:8000/mcp", HashMap::new(), headers);

// Start transport
let handle = transport.start().await?;

// Create client
let mut client = McpClient::connect(handle, Duration::from_secs(10)).await?;
println!("Client created with Streamable HTTP transport\n");

// Initialize
let server_info = client
.initialize(
ClientInfo {
name: "streamable-http-client".into(),
version: "1.0.0".into(),
},
ClientCapabilities::default(),
)
.await?;
println!("Connected to server: {server_info:?}\n");

// Give the server a moment to fully initialize
tokio::time::sleep(Duration::from_millis(500)).await;

// List tools
let tools = client.list_tools(None).await?;
println!("Available tools: {tools:?}\n");

// Call tool if available
if !tools.tools.is_empty() {
let tool_result = client
.call_tool(
&tools.tools[0].name,
serde_json::json!({ "message": "Hello from Streamable HTTP transport!" }),
)
.await?;
println!("Tool result: {tool_result:?}\n");
}

// List resources
let resources = client.list_resources(None).await?;
println!("Resources: {resources:?}\n");

// Read resource if available
if !resources.resources.is_empty() {
let resource = client.read_resource(&resources.resources[0].uri).await?;
println!("Resource content: {resource:?}\n");
}

// List prompts
let prompts = client.list_prompts(None).await?;
println!("Available prompts: {prompts:?}\n");

// Get prompt if available
if !prompts.prompts.is_empty() {
let prompt_result = client
.get_prompt(&prompts.prompts[0].name, serde_json::json!({}))
.await?;
println!("Prompt result: {prompt_result:?}\n");
}

println!("Streamable HTTP transport example completed successfully!");

Ok(())
}
4 changes: 3 additions & 1 deletion crates/mcp-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ pub mod transport;

pub use client::{ClientCapabilities, ClientInfo, Error, McpClient, McpClientTrait};
pub use service::McpService;
pub use transport::{SseTransport, StdioTransport, Transport, TransportHandle};
pub use transport::{
SseTransport, StdioTransport, StreamableHttpTransport, Transport, TransportHandle,
};
9 changes: 9 additions & 0 deletions crates/mcp-client/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ pub enum Error {

#[error("HTTP error: {status} - {message}")]
HttpError { status: u16, message: String },

#[error("Streamable HTTP error: {0}")]
StreamableHttpError(String),

#[error("Session error: {0}")]
SessionError(String),
}

/// A message that can be sent through the transport
Expand Down Expand Up @@ -78,3 +84,6 @@ pub use stdio::StdioTransport;

pub mod sse;
pub use sse::SseTransport;

pub mod streamable_http;
pub use streamable_http::StreamableHttpTransport;
Loading
Loading