From f27a10d47be2fb3fb09d44e4910792851f01b1d9 Mon Sep 17 00:00:00 2001 From: simondanielsson Date: Thu, 9 Apr 2026 13:05:42 +0000 Subject: [PATCH] feat: support streaming in service discovery path Signed-off-by: simondanielsson --- src/routers/http/vllm_pd_router.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/routers/http/vllm_pd_router.rs b/src/routers/http/vllm_pd_router.rs index 29cd9959..63a752d1 100644 --- a/src/routers/http/vllm_pd_router.rs +++ b/src/routers/http/vllm_pd_router.rs @@ -636,7 +636,9 @@ impl VllmPDRouter { Ok(response) } else { - // No logprobs merging needed - return decode response as-is + // No logprobs merging needed - stream decode response as-is (handles both streaming + // and non-streaming). Using bytes_stream() avoids buffering the entire SSE stream + // into memory, which would cause streaming requests to return an empty reply. debug!( "No logprobs merging needed (streaming={}, needs_logprobs={})", is_streaming, needs_logprobs @@ -644,18 +646,19 @@ impl VllmPDRouter { let status = decode_response.status(); let headers = decode_response.headers().clone(); - let body = decode_response - .bytes() - .await - .map_err(|e| format!("Failed to read decode response: {}", e))?; let mut response_builder = axum::http::Response::builder().status(status); for (name, value) in headers.iter() { - response_builder = response_builder.header(name, value); + // Skip hop-by-hop headers that must not be forwarded as-is; axum will + // set transfer-encoding and content-length correctly for the streamed body. + if name != "transfer-encoding" && name != "content-length" { + response_builder = response_builder.header(name, value); + } } + let body = axum::body::Body::from_stream(decode_response.bytes_stream()); let response = response_builder - .body(axum::body::Body::from(body)) + .body(body) .map_err(|e| format!("Failed to build response: {}", e))?; Ok(response)