Skip to content

Commit bd88e22

Browse files
committed
feat(task): add task support (SEP-1686)
1 parent 03040f8 commit bd88e22

File tree

9 files changed

+721
-2
lines changed

9 files changed

+721
-2
lines changed

crates/rmcp/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ tracing = { version = "0.1" }
2323
tokio-util = { version = "0.7" }
2424
pin-project-lite = "0.2"
2525
paste = { version = "1", optional = true }
26-
26+
async-trait = "0.1"
2727
# oauth2 support
2828
oauth2 = { version = "5.0", optional = true }
2929

crates/rmcp/src/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ pub enum RmcpError {
4141
error: Box<dyn std::error::Error + Send + Sync>,
4242
},
4343
// and cancellation shouldn't be an error?
44+
45+
// TODO: add more error variants as needed
46+
#[error("Task error: {0}")]
47+
TaskError(String),
4448
}
4549

4650
impl RmcpError {

crates/rmcp/src/handler/server.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,21 @@ mod resource;
1010
pub mod router;
1111
pub mod tool;
1212
pub mod wrapper;
13+
1314
impl<H: ServerHandler> Service<RoleServer> for H {
1415
async fn handle_request(
1516
&self,
1617
request: <RoleServer as ServiceRole>::PeerReq,
1718
context: RequestContext<RoleServer>,
1819
) -> Result<<RoleServer as ServiceRole>::Resp, McpError> {
20+
// Pre-dispatch: check task meta and optionally enqueue as task
21+
if context.meta.get_task().is_some() {
22+
// Allow handler to decide whether and how to enqueue task
23+
if let Some(result) = self.enqueue_task(&request, context.clone()).await? {
24+
return Ok(result);
25+
}
26+
}
27+
1928
match request {
2029
ClientRequest::InitializeRequest(request) => self
2130
.initialize(request.params, context)
@@ -68,6 +77,14 @@ impl<H: ServerHandler> Service<RoleServer> for H {
6877
.list_tools(request.params, context)
6978
.await
7079
.map(ServerResult::ListToolsResult),
80+
ClientRequest::ListTasksRequest(request) => self
81+
.list_tasks(request.params, context)
82+
.await
83+
.map(ServerResult::ListTasksResult),
84+
ClientRequest::GetTaskInfoRequest(request) => self
85+
.get_task_info(request.params, context)
86+
.await
87+
.map(ServerResult::GetTaskInfoResult),
7188
}
7289
}
7390

@@ -100,6 +117,19 @@ impl<H: ServerHandler> Service<RoleServer> for H {
100117

101118
#[allow(unused_variables)]
102119
pub trait ServerHandler: Sized + Send + Sync + 'static {
120+
/// Optional pre-dispatch hook to enqueue incoming request as a background task.
121+
/// Default: do nothing and return None.
122+
/// Implementors that also act as an OperationHandler may override this to:
123+
/// - Inspect `context.meta` (e.g., key "modelcontextprotocol.io/task")
124+
/// - Build an operation and submit to a task manager
125+
/// - Return an immediate ServerResult (e.g., EmptyResult or a Task ack)
126+
fn enqueue_task(
127+
&self,
128+
_request: &ClientRequest,
129+
_context: RequestContext<RoleServer>,
130+
) -> impl Future<Output = Result<Option<ServerResult>, McpError>> + Send + '_ {
131+
std::future::ready(Ok(None))
132+
}
103133
fn ping(
104134
&self,
105135
context: RequestContext<RoleServer>,
@@ -228,4 +258,20 @@ pub trait ServerHandler: Sized + Send + Sync + 'static {
228258
fn get_info(&self) -> ServerInfo {
229259
ServerInfo::default()
230260
}
261+
262+
fn list_tasks(
263+
&self,
264+
request: Option<PaginatedRequestParam>,
265+
context: RequestContext<RoleServer>,
266+
) -> impl Future<Output = Result<ListTasksResult, McpError>> + Send + '_ {
267+
std::future::ready(Err(McpError::method_not_found::<ListTasksMethod>()))
268+
}
269+
270+
fn get_task_info(
271+
&self,
272+
request: GetTaskInfoParam,
273+
context: RequestContext<RoleServer>,
274+
) -> impl Future<Output = Result<GetTaskInfoResult, McpError>> + Send + '_ {
275+
std::future::ready(Err(McpError::method_not_found::<GetTaskInfoMethod>()))
276+
}
231277
}

crates/rmcp/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ pub use service::{RoleClient, serve_client};
162162
pub use service::{RoleServer, serve_server};
163163

164164
pub mod handler;
165+
pub mod task_manager;
165166
pub mod transport;
166167

167168
// re-export

crates/rmcp/src/model.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod meta;
88
mod prompt;
99
mod resource;
1010
mod serde_impl;
11+
mod task;
1112
mod tool;
1213
pub use annotated::*;
1314
pub use capabilities::*;
@@ -19,6 +20,7 @@ pub use prompt::*;
1920
pub use resource::*;
2021
use serde::{Deserialize, Serialize, de::DeserializeOwned};
2122
use serde_json::Value;
23+
pub use task::*;
2224
pub use tool::*;
2325

2426
/// A JSON object type alias for convenient handling of JSON data.
@@ -1654,6 +1656,23 @@ pub struct GetPromptResult {
16541656
pub messages: Vec<PromptMessage>,
16551657
}
16561658

1659+
// =============================================================================
1660+
// TASK MANAGEMENT
1661+
// =============================================================================
1662+
1663+
const_string!(GetTaskInfoMethod = "tasks/get");
1664+
pub type GetTaskInfoRequest = Request<GetTaskInfoMethod, GetTaskInfoParam>;
1665+
1666+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
1667+
#[serde(rename_all = "camelCase")]
1668+
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
1669+
pub struct GetTaskInfoParam {
1670+
pub task_id: String,
1671+
}
1672+
1673+
const_string!(ListTasksMethod = "tasks/list");
1674+
pub type ListTasksRequest = RequestOptionalParam<ListTasksMethod, PaginatedRequestParam>;
1675+
16571676
// =============================================================================
16581677
// MESSAGE TYPE UNIONS
16591678
// =============================================================================
@@ -1720,7 +1739,9 @@ ts_union!(
17201739
| SubscribeRequest
17211740
| UnsubscribeRequest
17221741
| CallToolRequest
1723-
| ListToolsRequest;
1742+
| ListToolsRequest
1743+
| GetTaskInfoRequest
1744+
| ListTasksRequest;
17241745
);
17251746

17261747
impl ClientRequest {
@@ -1739,6 +1760,8 @@ impl ClientRequest {
17391760
ClientRequest::UnsubscribeRequest(r) => r.method.as_str(),
17401761
ClientRequest::CallToolRequest(r) => r.method.as_str(),
17411762
ClientRequest::ListToolsRequest(r) => r.method.as_str(),
1763+
ClientRequest::GetTaskInfoRequest(r) => r.method.as_str(),
1764+
ClientRequest::ListTasksRequest(r) => r.method.as_str(),
17421765
}
17431766
}
17441767
}
@@ -1794,6 +1817,8 @@ ts_union!(
17941817
| CallToolResult
17951818
| ListToolsResult
17961819
| CreateElicitationResult
1820+
| GetTaskInfoResult
1821+
| ListTasksResult
17971822
| EmptyResult
17981823
;
17991824
);
@@ -1804,6 +1829,28 @@ impl ServerResult {
18041829
}
18051830
}
18061831

1832+
// =============================================================================
1833+
// TASK RESULT TYPES (Server responses for task queries)
1834+
// =============================================================================
1835+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
1836+
#[serde(rename_all = "camelCase")]
1837+
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
1838+
pub struct GetTaskInfoResult {
1839+
#[serde(skip_serializing_if = "Option::is_none")]
1840+
pub task: Option<crate::model::Task>,
1841+
}
1842+
1843+
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
1844+
#[serde(rename_all = "camelCase")]
1845+
#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))]
1846+
pub struct ListTasksResult {
1847+
pub tasks: Vec<crate::model::Task>,
1848+
#[serde(skip_serializing_if = "Option::is_none")]
1849+
pub next_cursor: Option<String>,
1850+
#[serde(skip_serializing_if = "Option::is_none")]
1851+
pub total: Option<u64>,
1852+
}
1853+
18071854
pub type ServerJsonRpcMessage = JsonRpcMessage<ServerRequest, ServerResult, ServerNotification>;
18081855

18091856
impl TryInto<CancelledNotification> for ServerNotification {

crates/rmcp/src/model/meta.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ variant_extension! {
6666
UnsubscribeRequest
6767
CallToolRequest
6868
ListToolsRequest
69+
GetTaskInfoRequest
70+
ListTasksRequest
6971
}
7072
}
7173

@@ -103,6 +105,7 @@ variant_extension! {
103105
#[serde(transparent)]
104106
pub struct Meta(pub JsonObject);
105107
const PROGRESS_TOKEN_FIELD: &str = "progressToken";
108+
const TASK_FIELD: &str = "modelcontextprotocol.io/task";
106109
impl Meta {
107110
pub fn new() -> Self {
108111
Self(JsonObject::new())
@@ -133,6 +136,12 @@ impl Meta {
133136
})
134137
}
135138

139+
pub fn get_task(&self) -> Option<String> {
140+
self.0
141+
.get(TASK_FIELD)
142+
.and_then(|v| v.as_str().map(|s| s.to_string()))
143+
}
144+
136145
pub fn set_progress_token(&mut self, token: ProgressToken) {
137146
match token.0 {
138147
NumberOrString::String(ref s) => self.0.insert(

0 commit comments

Comments
 (0)