From 9190457bbfd1d99c680ee1820ff844edd90a54e5 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Tue, 12 Aug 2025 21:03:01 +0800 Subject: [PATCH 1/4] feat: support downcast WorkerQuitReason::Fatal --- .../common/reqwest/streamable_http_client.rs | 2 +- .../src/transport/streamable_http_client.rs | 20 ++++++++---- .../streamable_http_server/session/local.rs | 32 ++++++++++++++----- crates/rmcp/src/transport/worker.rs | 13 +++++--- 4 files changed, 47 insertions(+), 20 deletions(-) diff --git a/crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs b/crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs index fd3aa1d5..c82cd177 100644 --- a/crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs +++ b/crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs @@ -36,7 +36,7 @@ impl StreamableHttpClient for reqwest::Client { } let response = request_builder.send().await?; if response.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED { - return Err(StreamableHttpError::SeverDoesNotSupportSse); + return Err(StreamableHttpError::ServerDoesNotSupportSse); } let response = response.error_for_status()?; match response.headers().get(reqwest::header::CONTENT_TYPE) { diff --git a/crates/rmcp/src/transport/streamable_http_client.rs b/crates/rmcp/src/transport/streamable_http_client.rs index 20159a01..17aa7549 100644 --- a/crates/rmcp/src/transport/streamable_http_client.rs +++ b/crates/rmcp/src/transport/streamable_http_client.rs @@ -33,15 +33,17 @@ pub enum StreamableHttpError { #[error("Unexpected content type: {0:?}")] UnexpectedContentType(Option), #[error("Server does not support SSE")] - SeverDoesNotSupportSse, + ServerDoesNotSupportSse, #[error("Server does not support delete session")] - SeverDoesNotSupportDeleteSession, + ServerDoesNotSupportDeleteSession, #[error("Tokio join error: {0}")] TokioJoinError(#[from] tokio::task::JoinError), #[error("Deserialize error: {0}")] Deserialize(#[from] serde_json::Error), #[error("Transport channel closed")] TransportChannelClosed, + #[error("Missing session id in response")] + MissingSessionIdInResponse, #[cfg(feature = "auth")] #[cfg_attr(docsrs, doc(cfg(feature = "auth")))] #[error("Auth error: {0}")] @@ -54,6 +56,12 @@ impl From for StreamableHttpError { } } +#[derive(Debug, Clone, Error)] +pub enum StreamableHttpProtocolError { + #[error("Missing session id in response")] + MissingSessionIdInResponse, +} + pub enum StreamableHttpPostResponse { Accepted, Json(ServerJsonRpcMessage, Option), @@ -288,7 +296,7 @@ impl Worker for StreamableHttpClientWorker { } else { if !self.config.allow_stateless { return Err(WorkerQuitReason::fatal( - "missing session id in initialize response", + StreamableHttpError::::MissingSessionIdInResponse, "process initialize response", )); } @@ -308,7 +316,7 @@ impl Worker for StreamableHttpClientWorker { Ok(_) => { tracing::info!(session_id = session_id.as_ref(), "delete session success") } - Err(StreamableHttpError::SeverDoesNotSupportDeleteSession) => { + Err(StreamableHttpError::ServerDoesNotSupportDeleteSession) => { tracing::info!( session_id = session_id.as_ref(), "server doesn't support delete session" @@ -373,14 +381,14 @@ impl Worker for StreamableHttpClientWorker { )); tracing::debug!("got common stream"); } - Err(StreamableHttpError::SeverDoesNotSupportSse) => { + Err(StreamableHttpError::ServerDoesNotSupportSse) => { tracing::debug!("server doesn't support sse, skip common stream"); } Err(e) => { // fail to get common stream tracing::error!("fail to get common stream: {e}"); return Err(WorkerQuitReason::fatal( - "fail to get general purpose event stream", + e, "get general purpose event stream", )); } diff --git a/crates/rmcp/src/transport/streamable_http_server/session/local.rs b/crates/rmcp/src/transport/streamable_http_server/session/local.rs index c1c4f893..8cd232fc 100644 --- a/crates/rmcp/src/transport/streamable_http_server/session/local.rs +++ b/crates/rmcp/src/transport/streamable_http_server/session/local.rs @@ -317,7 +317,7 @@ enum OutboundChannel { RequestWise { id: HttpRequestId, close: bool }, Common, } - +#[derive(Debug)] pub struct StreamableHttpMessageReceiver { pub http_request_id: Option, pub inner: Receiver, @@ -534,8 +534,8 @@ impl LocalSessionWorker { } } } - -enum SessionEvent { +#[derive(Debug)] +pub enum SessionEvent { ClientMessage { message: ClientJsonRpcMessage, http_request_id: Option, @@ -695,6 +695,17 @@ impl LocalSessionHandle { pub type SessionTransport = WorkerTransport; +#[derive(Debug, Error)] +pub enum LocalSessionError { + #[error("transport terminated")] + TransportTerminated, + #[error("unexpected message: {0:?}")] + UnexpectedEvent(SessionEvent), + #[error("fail to send initialize request {0}")] + FailToSendInitializeRequest(SessionError), + #[error("keep alive timeout")] + KeepAliveTimeout, +} impl Worker for LocalSessionWorker { type Error = SessionError; type Role = RoleServer; @@ -718,11 +729,14 @@ impl Worker for LocalSessionWorker { } // waiting for initialize request let evt = self.event_rx.recv().await.ok_or_else(|| { - WorkerQuitReason::fatal("transport terminated", "get initialize request") + WorkerQuitReason::fatal( + LocalSessionError::TransportTerminated, + "get initialize request", + ) })?; let SessionEvent::InitializeRequest { request, responder } = evt else { return Err(WorkerQuitReason::fatal( - "unexpected message", + LocalSessionError::UnexpectedEvent(evt), "get initialize request", )); }; @@ -732,7 +746,9 @@ impl Worker for LocalSessionWorker { .send(Ok(send_initialize_response.message)) .map_err(|_| { WorkerQuitReason::fatal( - "failed to send initialize response to http service", + LocalSessionError::FailToSendInitializeRequest( + SessionError::SessionServiceTerminated, + ), "send initialize response", ) })?; @@ -749,7 +765,7 @@ impl Worker for LocalSessionWorker { if let Some(event) = event { InnerEvent::FromHttpService(event) } else { - return Err(WorkerQuitReason::fatal("session dropped", "waiting next session event")) + return Err(WorkerQuitReason::fatal(LocalSessionError::TransportTerminated, "waiting next session event")) } }, from_handler = context.recv_from_handler() => { @@ -759,7 +775,7 @@ impl Worker for LocalSessionWorker { return Err(WorkerQuitReason::Cancelled) } _ = keep_alive_timeout => { - return Err(WorkerQuitReason::fatal("keep live timeout", "poll next session event")) + return Err(WorkerQuitReason::fatal(LocalSessionError::KeepAliveTimeout, "poll next session event")) } }; match event { diff --git a/crates/rmcp/src/transport/worker.rs b/crates/rmcp/src/transport/worker.rs index 5ae9098e..0c793283 100644 --- a/crates/rmcp/src/transport/worker.rs +++ b/crates/rmcp/src/transport/worker.rs @@ -12,7 +12,7 @@ pub enum WorkerQuitReason { Join(#[from] tokio::task::JoinError), #[error("Transport fatal {error}, when {context}")] Fatal { - error: Cow<'static, str>, + error: Box, context: Cow<'static, str>, }, #[error("Transport canncelled")] @@ -24,17 +24,20 @@ pub enum WorkerQuitReason { } impl WorkerQuitReason { - pub fn fatal(msg: impl Into>, context: impl Into>) -> Self { + pub fn fatal( + error: impl std::error::Error + Send + 'static, + context: impl Into>, + ) -> Self { Self::Fatal { - error: msg.into(), + error: Box::new(error), context: context.into(), } } - pub fn fatal_context( + pub fn fatal_context( context: impl Into>, ) -> impl FnOnce(E) -> Self { |e| Self::Fatal { - error: Cow::Owned(format!("{e}")), + error: Box::new(e), context: context.into(), } } From fdded3beba4224ee2c390e0dc26a09d895f7ce56 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Wed, 13 Aug 2025 12:32:01 +0800 Subject: [PATCH 2/4] feat(transport): expose internal worker error in fatal --- .../src/transport/streamable_http_client.rs | 6 +-- .../streamable_http_server/session/local.rs | 42 +++++++++++-------- crates/rmcp/src/transport/worker.rs | 29 ++++++------- 3 files changed, 41 insertions(+), 36 deletions(-) diff --git a/crates/rmcp/src/transport/streamable_http_client.rs b/crates/rmcp/src/transport/streamable_http_client.rs index 17aa7549..9c307603 100644 --- a/crates/rmcp/src/transport/streamable_http_client.rs +++ b/crates/rmcp/src/transport/streamable_http_client.rs @@ -269,7 +269,7 @@ impl Worker for StreamableHttpClientWorker { async fn run( self, mut context: super::worker::WorkerContext, - ) -> Result<(), WorkerQuitReason> { + ) -> Result<(), WorkerQuitReason> { let channel_buffer_capacity = self.config.channel_buffer_capacity; let (sse_worker_tx, mut sse_worker_rx) = tokio::sync::mpsc::channel::(channel_buffer_capacity); @@ -286,7 +286,7 @@ impl Worker for StreamableHttpClientWorker { .post_message(config.uri.clone(), initialize_request, None, None) .await .map_err(WorkerQuitReason::fatal_context("send initialize request"))? - .expect_initialized::() + .expect_initialized::() .await .map_err(WorkerQuitReason::fatal_context( "process initialize response", @@ -346,7 +346,7 @@ impl Worker for StreamableHttpClientWorker { .map_err(WorkerQuitReason::fatal_context( "send initialized notification", ))? - .expect_accepted::() + .expect_accepted::() .map_err(WorkerQuitReason::fatal_context( "process initialized notification response", ))?; diff --git a/crates/rmcp/src/transport/streamable_http_server/session/local.rs b/crates/rmcp/src/transport/streamable_http_server/session/local.rs index 8cd232fc..5458c404 100644 --- a/crates/rmcp/src/transport/streamable_http_server/session/local.rs +++ b/crates/rmcp/src/transport/streamable_http_server/session/local.rs @@ -296,12 +296,8 @@ pub enum SessionError { SessionServiceTerminated, #[error("Invalid event id")] InvalidEventId, - #[error("Transport closed")] - TransportClosed, #[error("IO error: {0}")] Io(#[from] std::io::Error), - #[error("Tokio join error {0}")] - TokioJoinError(#[from] tokio::task::JoinError), } impl From for std::io::Error { @@ -696,24 +692,30 @@ impl LocalSessionHandle { pub type SessionTransport = WorkerTransport; #[derive(Debug, Error)] -pub enum LocalSessionError { +pub enum LocalSessionWorkerError { #[error("transport terminated")] TransportTerminated, #[error("unexpected message: {0:?}")] UnexpectedEvent(SessionEvent), #[error("fail to send initialize request {0}")] FailToSendInitializeRequest(SessionError), - #[error("keep alive timeout")] - KeepAliveTimeout, + #[error("fail to handle message: {0}")] + FailToHandleMessage(SessionError), + #[error("keep alive timeout after {}ms", _0.as_millis())] + KeepAliveTimeout(Duration), + #[error("Transport closed")] + TransportClosed, + #[error("Tokio join error {0}")] + TokioJoinError(#[from] tokio::task::JoinError), } impl Worker for LocalSessionWorker { - type Error = SessionError; + type Error = LocalSessionWorkerError; type Role = RoleServer; fn err_closed() -> Self::Error { - SessionError::TransportClosed + LocalSessionWorkerError::TransportClosed } fn err_join(e: tokio::task::JoinError) -> Self::Error { - SessionError::TokioJoinError(e) + LocalSessionWorkerError::TokioJoinError(e) } fn config(&self) -> crate::transport::worker::WorkerConfig { crate::transport::worker::WorkerConfig { @@ -722,7 +724,10 @@ impl Worker for LocalSessionWorker { } } #[instrument(name = "streamable_http_session", skip_all, fields(id = self.id.as_ref()))] - async fn run(mut self, mut context: WorkerContext) -> Result<(), WorkerQuitReason> { + async fn run( + mut self, + mut context: WorkerContext, + ) -> Result<(), WorkerQuitReason> { enum InnerEvent { FromHttpService(SessionEvent), FromHandler(WorkerSendRequest), @@ -730,13 +735,13 @@ impl Worker for LocalSessionWorker { // waiting for initialize request let evt = self.event_rx.recv().await.ok_or_else(|| { WorkerQuitReason::fatal( - LocalSessionError::TransportTerminated, + LocalSessionWorkerError::TransportTerminated, "get initialize request", ) })?; let SessionEvent::InitializeRequest { request, responder } = evt else { return Err(WorkerQuitReason::fatal( - LocalSessionError::UnexpectedEvent(evt), + LocalSessionWorkerError::UnexpectedEvent(evt), "get initialize request", )); }; @@ -746,7 +751,7 @@ impl Worker for LocalSessionWorker { .send(Ok(send_initialize_response.message)) .map_err(|_| { WorkerQuitReason::fatal( - LocalSessionError::FailToSendInitializeRequest( + LocalSessionWorkerError::FailToSendInitializeRequest( SessionError::SessionServiceTerminated, ), "send initialize response", @@ -765,7 +770,7 @@ impl Worker for LocalSessionWorker { if let Some(event) = event { InnerEvent::FromHttpService(event) } else { - return Err(WorkerQuitReason::fatal(LocalSessionError::TransportTerminated, "waiting next session event")) + return Err(WorkerQuitReason::fatal(LocalSessionWorkerError::TransportTerminated, "waiting next session event")) } }, from_handler = context.recv_from_handler() => { @@ -775,7 +780,7 @@ impl Worker for LocalSessionWorker { return Err(WorkerQuitReason::Cancelled) } _ = keep_alive_timeout => { - return Err(WorkerQuitReason::fatal(LocalSessionError::KeepAliveTimeout, "poll next session event")) + return Err(WorkerQuitReason::fatal(LocalSessionWorkerError::KeepAliveTimeout(keep_alive), "poll next session event")) } }; match event { @@ -795,7 +800,10 @@ impl Worker for LocalSessionWorker { // no need to unregister resource } }; - let handle_result = self.handle_server_message(message).await; + let handle_result = self + .handle_server_message(message) + .await + .map_err(LocalSessionWorkerError::FailToHandleMessage); let _ = responder.send(handle_result).inspect_err(|error| { tracing::warn!(?error, "failed to send message to http service handler"); }); diff --git a/crates/rmcp/src/transport/worker.rs b/crates/rmcp/src/transport/worker.rs index 0c793283..eaabc506 100644 --- a/crates/rmcp/src/transport/worker.rs +++ b/crates/rmcp/src/transport/worker.rs @@ -7,12 +7,12 @@ use super::{IntoTransport, Transport}; use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage}; #[derive(Debug, thiserror::Error)] -pub enum WorkerQuitReason { +pub enum WorkerQuitReason { #[error("Join error {0}")] Join(#[from] tokio::task::JoinError), #[error("Transport fatal {error}, when {context}")] Fatal { - error: Box, + error: E, context: Cow<'static, str>, }, #[error("Transport canncelled")] @@ -23,21 +23,16 @@ pub enum WorkerQuitReason { HandlerTerminated, } -impl WorkerQuitReason { - pub fn fatal( - error: impl std::error::Error + Send + 'static, - context: impl Into>, - ) -> Self { +impl WorkerQuitReason { + pub fn fatal(error: E, context: impl Into>) -> Self { Self::Fatal { - error: Box::new(error), + error, context: context.into(), } } - pub fn fatal_context( - context: impl Into>, - ) -> impl FnOnce(E) -> Self { + pub fn fatal_context(context: impl Into>) -> impl FnOnce(E) -> Self { |e| Self::Fatal { - error: Box::new(e), + error: e, context: context.into(), } } @@ -51,7 +46,7 @@ pub trait Worker: Sized + Send + 'static { fn run( self, context: WorkerContext, - ) -> impl Future> + Send; + ) -> impl Future>> + Send; fn config(&self) -> WorkerConfig { WorkerConfig::default() } @@ -65,7 +60,7 @@ pub struct WorkerSendRequest { pub struct WorkerTransport { rx: tokio::sync::mpsc::Receiver>, send_service: tokio::sync::mpsc::Sender>, - join_handle: Option>>, + join_handle: Option>>>, _drop_guard: tokio_util::sync::DropGuard, ct: CancellationToken, } @@ -162,14 +157,16 @@ impl WorkerContext { pub async fn send_to_handler( &mut self, item: RxJsonRpcMessage, - ) -> Result<(), WorkerQuitReason> { + ) -> Result<(), WorkerQuitReason> { self.to_handler_tx .send(item) .await .map_err(|_| WorkerQuitReason::HandlerTerminated) } - pub async fn recv_from_handler(&mut self) -> Result, WorkerQuitReason> { + pub async fn recv_from_handler( + &mut self, + ) -> Result, WorkerQuitReason> { self.from_handler_rx .recv() .await From e9dbe202f52785a737598f8c0f1ced40b851d26b Mon Sep 17 00:00:00 2001 From: 4t145 Date: Fri, 15 Aug 2025 14:52:32 +0800 Subject: [PATCH 3/4] Update crates/rmcp/src/transport/streamable_http_client.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- crates/rmcp/src/transport/streamable_http_client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/rmcp/src/transport/streamable_http_client.rs b/crates/rmcp/src/transport/streamable_http_client.rs index 9c307603..9a7ef646 100644 --- a/crates/rmcp/src/transport/streamable_http_client.rs +++ b/crates/rmcp/src/transport/streamable_http_client.rs @@ -61,7 +61,6 @@ pub enum StreamableHttpProtocolError { #[error("Missing session id in response")] MissingSessionIdInResponse, } - pub enum StreamableHttpPostResponse { Accepted, Json(ServerJsonRpcMessage, Option), From 2411451eb51c4b4c91987b109d7cf57b17cfcd59 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Fri, 15 Aug 2025 14:52:51 +0800 Subject: [PATCH 4/4] Update crates/rmcp/src/transport/streamable_http_client.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- crates/rmcp/src/transport/streamable_http_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rmcp/src/transport/streamable_http_client.rs b/crates/rmcp/src/transport/streamable_http_client.rs index 9a7ef646..22a6b0d6 100644 --- a/crates/rmcp/src/transport/streamable_http_client.rs +++ b/crates/rmcp/src/transport/streamable_http_client.rs @@ -42,7 +42,7 @@ pub enum StreamableHttpError { Deserialize(#[from] serde_json::Error), #[error("Transport channel closed")] TransportChannelClosed, - #[error("Missing session id in response")] + #[error("Missing session id in HTTP response")] MissingSessionIdInResponse, #[cfg(feature = "auth")] #[cfg_attr(docsrs, doc(cfg(feature = "auth")))]