From 52ed9d31e47cb092c71271fca2d0636f9c8c52ad Mon Sep 17 00:00:00 2001 From: Guoqing Bao Date: Sat, 28 Mar 2026 16:49:39 +0000 Subject: [PATCH 1/4] Use placeholder approach --- src/core/engine.rs | 141 ++++++++++++++++++++++--- src/core/scheduler.rs | 161 ++++++++++++++++++++++++++++- src/mcp/manager.rs | 2 +- src/server/mod.rs | 2 +- src/server/parser.rs | 48 +++++++++ src/server/server.rs | 37 ++++++- src/utils/chat_template.rs | 207 ++++++++++++++++++++++++++++++++++++- 7 files changed, 572 insertions(+), 26 deletions(-) diff --git a/src/core/engine.rs b/src/core/engine.rs index 5671b7ff..d3f37554 100644 --- a/src/core/engine.rs +++ b/src/core/engine.rs @@ -18,7 +18,7 @@ use crate::server::{EmbeddingStrategy, UsageResponse}; use crate::tools::Tool; use crate::transfer::PdRole; use crate::transfer::Transfer; -use crate::utils::chat_template::Message; +use crate::utils::chat_template::{Message, PromptReplay}; use crate::utils::config::{EngineConfig, EosTokenId, ModelType, SamplingParams}; use crate::utils::guidance::{build_llg_factory, extract_guidance_tokens, GuidanceTokens}; use crate::utils::heartbeat::heartbeat_worker; @@ -58,6 +58,9 @@ pub static GLOBAL_RT: Lazy = Lazy::new(|| { .expect("Failed to build global Tokio runtime") }); +const REASONING_START_MARKERS: &[&str] = &["", "<|think|>", "[THINK]", ""]; +const REASONING_END_MARKERS: &[&str] = &["", "<|/think|>", "[/THINK]", ""]; + #[derive(Debug, Clone)] pub enum StreamItem { Token(String, u32), //streaming: (text, token_id) @@ -87,6 +90,8 @@ pub struct LLMEngine { decode_start_times: HashMap, decode_length: HashMap, seq_prefilled_reasoning_end: HashMap, + seq_prompt_replays: HashMap>, + reasoning_space_token_id: Option, last_check_throughput_time: usize, active_requests: HashSet, cancelled_sequences: Vec, @@ -426,6 +431,31 @@ impl LLMEngine { // Set tokenizer for JSON tool call detection (for models like Qwen3 that output raw JSON) scheduler.set_tokenizer(Arc::new(tokenizer.clone())); + let 
reasoning_space_token_id = { + let resolve_single_token = |s: &str| -> Option { + tokenizer.encode(s, false).ok().and_then(|enc| { + let ids = enc.get_ids(); + if ids.len() == 1 { + Some(ids[0]) + } else { + None + } + }) + }; + + let start_ids: Vec = REASONING_START_MARKERS + .iter() + .filter_map(|m| resolve_single_token(m)) + .collect(); + let end_ids: Vec = REASONING_END_MARKERS + .iter() + .filter_map(|m| resolve_single_token(m)) + .collect(); + let space_id = resolve_single_token(" "); + scheduler.set_reasoning_marker_tokens(start_ids, end_ids, space_id); + space_id + }; + log_warn!( "Maximum batched tokens {} ({} blocks x Block_Size {} for KV cache). Additional CPU KV Cache blocks {}.", econfig.max_num_batched_tokens, @@ -474,6 +504,8 @@ impl LLMEngine { decode_start_times: HashMap::new(), decode_length: HashMap::new(), seq_prefilled_reasoning_end: HashMap::new(), + seq_prompt_replays: HashMap::new(), + reasoning_space_token_id, last_check_throughput_time: 0, active_requests: HashSet::new(), cancelled_sequences: Vec::new(), @@ -494,6 +526,7 @@ impl LLMEngine { &mut self, params: &SamplingParams, prompt: &str, + prompt_replay: Option<&PromptReplay>, request_type: &RequestType, images: &Option, image_idx: i32, @@ -504,6 +537,19 @@ impl LLMEngine { .expect("encode failed!"); let token_ids: Vec = tokens.get_ids().iter().map(|&x| x).collect(); let length = token_ids.len(); + let raw_replay_token_ids = self.encode_prompt_replay_token_ids(prompt_replay); + let placeholder_replay = self.should_enable_reasoning_cache_revision_for_params(params); + let replay_token_ids = raw_replay_token_ids.as_ref().map(|replay_ids| { + if placeholder_replay { + if let Some(space_id) = self.reasoning_space_token_id { + vec![space_id; replay_ids.len()] + } else { + replay_ids.clone() + } + } else { + replay_ids.clone() + } + }); if let Some(max_model_len) = self.econfig.max_model_len { if length > max_model_len - 1 { candle_core::bail!( @@ -603,6 +649,15 @@ impl LLMEngine { if let 
Some(end_marker) = detect_prefilled_reasoning_end_marker(prompt) { self.seq_prefilled_reasoning_end.insert(seq_id, end_marker); } + if let Some(replay_ids) = replay_token_ids { + self.seq_prompt_replays.insert(seq_id, replay_ids); + } + if placeholder_replay { + if let Some(raw_replay_ids) = raw_replay_token_ids { + self.scheduler + .set_reasoning_prompt_replay_suffix(seq_id, raw_replay_ids); + } + } if *request_type == RequestType::Stream { let tokenizer = self.tokenizer.clone(); @@ -623,12 +678,19 @@ impl LLMEngine { &mut self, params: &SamplingParams, prompt: &str, + prompt_replay: Option<&PromptReplay>, request_type: RequestType, images: &Option, image_idx: i32, ) -> Result<(usize, usize, Receiver)> { - let (seq_id, prompt_length) = - self.add_request_(params, prompt, &request_type, images, image_idx)?; + let (seq_id, prompt_length) = self.add_request_( + params, + prompt, + prompt_replay, + &request_type, + images, + image_idx, + )?; let (tx, rx) = channel(1024); self.stream_senders.insert(seq_id, tx); self.request_types.insert(seq_id, request_type.clone()); @@ -648,10 +710,37 @@ impl LLMEngine { self.scheduler.get_num_cached_tokens() } + fn should_enable_reasoning_cache_revision(&self, tools: &[Tool]) -> bool { + !tools.is_empty() && self.scheduler.block_manager.prefix_cache_enabled() + } + + fn should_enable_reasoning_cache_revision_for_params(&self, params: &SamplingParams) -> bool { + params.mcp_mode.is_some() && self.scheduler.block_manager.prefix_cache_enabled() + } + + fn encode_prompt_replay_token_ids( + &self, + prompt_replay: Option<&PromptReplay>, + ) -> Option> { + let replay = prompt_replay?; + if replay.is_empty() { + return None; + } + self.tokenizer + .encode(replay.suffix_text.as_str(), false) + .ok() + .map(|encoding| encoding.get_ids().to_vec()) + .filter(|ids| !ids.is_empty()) + } + pub fn get_available_kv_tokens(&self) -> usize { self.scheduler.get_available_kv_tokens() } + fn take_prompt_replay_token_ids(&mut self, seq_id: usize) -> 
Option> { + self.seq_prompt_replays.remove(&seq_id) + } + pub fn notify_runner_finished(&mut self, id: usize) -> Result<()> { match &mut *self.runners.write() { RunnerType::Thread(model_runner) => Ok(model_runner.finished(id)), @@ -842,6 +931,7 @@ impl LLMEngine { self.decode_start_times.remove(&seq_id); self.decode_length.remove(&seq_id); self.seq_prefilled_reasoning_end.remove(&seq_id); + self.seq_prompt_replays.remove(&seq_id); let _ = self.notify_runner_finished(seq_id); if self.econfig.server_mode.unwrap_or(true) { self.scheduler.print_free_blocks(); @@ -876,13 +966,17 @@ impl LLMEngine { *length = s.output_len(); } - let token_ids = + let mut token_ids = if self.is_pd_mode() && s.pd_first_token.is_some() && s.output_len() == 2 { // Special case, the real first token is generated on PD server vec![s.pd_first_token.unwrap_or(s.last_token), s.last_token] } else { vec![s.last_token] }; + if let Some(mut replay_ids) = self.take_prompt_replay_token_ids(seq_id) { + replay_ids.extend(token_ids); + token_ids = replay_ids; + } if let Some(sender) = self.stream_senders.get_mut(&seq_id) { if let Some(request_type) = self.request_types.get(&seq_id) { @@ -980,6 +1074,7 @@ impl LLMEngine { self.stream_decoders.remove(&seq_id); self.decode_start_times.remove(&seq_id); self.seq_prefilled_reasoning_end.remove(&seq_id); + self.seq_prompt_replays.remove(&seq_id); if let Some(r) = &reason { if let Some(sender) = self.stream_senders.get_mut(&seq_id) { if let Some(request_type) = self.request_types.get(&seq_id) { @@ -1059,7 +1154,7 @@ impl LLMEngine { messages: &Vec, tools: &Vec, log: bool, - ) -> (String, i32) { + ) -> (String, i32, Option) { // let mut collected_images = Vec::new(); let mut prompt_template = self.template.clone(); prompt_template.set_enable_thinking(params.thinking.unwrap_or(false)); @@ -1088,6 +1183,7 @@ impl LLMEngine { } prompt }; + let replay = prompt_template.generation_prompt_replay(tools, &prompt); if log { log_info!( @@ -1095,7 +1191,7 @@ impl LLMEngine { 
prompt.replace("\n", "") ); } - (prompt, image_idx) + (prompt, image_idx, replay) } pub fn is_idle(&self) -> bool { @@ -1127,13 +1223,22 @@ impl LLMEngine { } let mut receivers = Vec::new(); for (param, messages) in params.iter().zip(message_list.iter()) { - let (prompt, image_idx) = self.apply_chat_template(param, messages, tools, false); + let (prompt, image_idx, prompt_replay) = + self.apply_chat_template(param, messages, tools, false); if let Some(ref l) = logger { l.log_prompt(&prompt); } - if let Ok((seq_id, prompt_length, rx)) = - self.add_request(param, &prompt, RequestType::Completion, &images, image_idx) - { + if let Ok((seq_id, prompt_length, rx)) = self.add_request( + param, + &prompt, + prompt_replay.as_ref(), + RequestType::Completion, + &images, + image_idx, + ) { + if self.should_enable_reasoning_cache_revision(tools) { + self.scheduler.mark_reasoning_marker_revision(seq_id); + } receivers.push((seq_id, prompt_length, rx)); } } @@ -1271,6 +1376,7 @@ impl LLMEngine { self.scheduler.clear_finished(); self.scheduler.release_waitings(); self.seq_prefilled_reasoning_end.clear(); + self.seq_prompt_replays.clear(); } /// Returns the reasoning end marker when the prompt for this sequence @@ -1287,12 +1393,23 @@ impl LLMEngine { tools: &Vec, logger: &Option>, ) -> Result<(usize, usize, Option, mpsc::Receiver)> { - let (prompt, image_idx) = self.apply_chat_template(params, messages, tools, false); + let (prompt, image_idx, prompt_replay) = + self.apply_chat_template(params, messages, tools, false); if let Some(ref l) = logger { l.log_prompt(&prompt); } - match self.add_request(params, &prompt, RequestType::Stream, &images, image_idx) { + match self.add_request( + params, + &prompt, + prompt_replay.as_ref(), + RequestType::Stream, + &images, + image_idx, + ) { Ok((seq_id, prompt_length, rx)) => { + if self.should_enable_reasoning_cache_revision(tools) { + self.scheduler.mark_reasoning_marker_revision(seq_id); + } let prefilled_reasoning_end = 
self.get_prefilled_reasoning_end_marker(seq_id); Ok((seq_id, prompt_length, prefilled_reasoning_end, rx)) } diff --git a/src/core/scheduler.rs b/src/core/scheduler.rs index cf9b6133..fa785b29 100644 --- a/src/core/scheduler.rs +++ b/src/core/scheduler.rs @@ -10,7 +10,7 @@ use crate::utils::config::{Config, EngineConfig, EosTokenId}; use candle_core::Result; use parking_lot::RwLock; use regex::Regex; -use std::collections::VecDeque; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tokenizers::Tokenizer; @@ -35,6 +35,11 @@ pub struct Scheduler { cfg: EngineConfig, pd_config: Option, is_last_prefill: bool, + reasoning_marker_revision_seqs: HashSet, + reasoning_prompt_replay_suffixes: HashMap>, + reasoning_start_token_ids: Vec, + reasoning_end_token_ids: Vec, + space_token_id: Option, } const MIN_NUM_SCHEDULED_REQS: usize = 5; @@ -149,6 +154,95 @@ impl Scheduler { cfg: econfig.clone(), pd_config: econfig.pd_config.clone(), is_last_prefill: false, + reasoning_marker_revision_seqs: HashSet::new(), + reasoning_prompt_replay_suffixes: HashMap::new(), + reasoning_start_token_ids: Vec::new(), + reasoning_end_token_ids: Vec::new(), + space_token_id: None, + } + } + + pub fn mark_reasoning_marker_revision(&mut self, seq_id: usize) { + self.reasoning_marker_revision_seqs.insert(seq_id); + } + + pub fn set_reasoning_prompt_replay_suffix(&mut self, seq_id: usize, suffix_ids: Vec) { + if suffix_ids.is_empty() { + self.reasoning_prompt_replay_suffixes.remove(&seq_id); + } else { + self.reasoning_prompt_replay_suffixes + .insert(seq_id, suffix_ids); + } + } + + pub fn set_reasoning_marker_tokens( + &mut self, + start_ids: Vec, + end_ids: Vec, + space_id: Option, + ) { + self.reasoning_start_token_ids = start_ids; + self.reasoning_end_token_ids = end_ids; + self.space_token_id = space_id; + } + + fn token_is_whitespace_only(tokenizer: Option<&Arc>, token_id: u32) -> bool { + tokenizer + .and_then(|tokenizer| 
tokenizer.decode(&[token_id], false).ok()) + .is_some_and(|text| !text.is_empty() && text.chars().all(|ch| ch.is_whitespace())) + } + + fn try_revise_reasoning_markers( + revision_set: &mut HashSet, + replay_suffixes: &mut HashMap>, + start_ids: &[u32], + end_ids: &[u32], + space_id: Option, + tokenizer: Option<&Arc>, + seq: &mut Sequence, + ) { + if !revision_set.remove(&seq.id) { + return; + } + let space = match space_id { + Some(id) => id, + None => return, + }; + let prompt_len = seq.token_ids.len() - seq.output_ids.len(); + if let Some(replay_suffix) = replay_suffixes.remove(&seq.id) { + if !replay_suffix.is_empty() + && prompt_len >= replay_suffix.len() + && seq.token_ids[prompt_len - replay_suffix.len()..prompt_len] == replay_suffix + { + for token_id in &mut seq.token_ids[prompt_len - replay_suffix.len()..prompt_len] { + *token_id = space; + } + } + } + + if !start_ids.is_empty() || !end_ids.is_empty() { + let mut pending_whitespace_placeholder = false; + for idx in 0..seq.output_ids.len() { + let token_id = seq.output_ids[idx]; + if start_ids.contains(&token_id) || end_ids.contains(&token_id) { + seq.output_ids[idx] = space; + seq.token_ids[prompt_len + idx] = space; + pending_whitespace_placeholder = true; + continue; + } + if pending_whitespace_placeholder + && Self::token_is_whitespace_only(tokenizer, token_id) + { + seq.output_ids[idx] = space; + seq.token_ids[prompt_len + idx] = space; + continue; + } + pending_whitespace_placeholder = false; + } + } + + if let Some(&last) = seq.token_ids.last() { + seq.last_token = last; } } @@ -468,12 +562,18 @@ impl Scheduler { ); let seq = &mut self.running[idx]; if success { - // Insert into prefix cache so future requests can benefit - // LRU eviction will handle memory pressure automatically + Self::try_revise_reasoning_markers( + &mut self.reasoning_marker_revision_seqs, + &mut self.reasoning_prompt_replay_suffixes, + &self.reasoning_start_token_ids, + &self.reasoning_end_token_ids, + self.space_token_id, + 
self.tokenizer.as_ref(), + seq, + ); self.block_manager .capture_mamba_prefix_state(seq, seq.len()); self.block_manager.cache_sequence(seq); - // Maintain resources until client asks to release or cache eviction seq.status = SequenceStatus::Cached; let cur_time = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -527,6 +627,15 @@ impl Scheduler { seq.is_tool_call_end = true; // External tool mode: finish stream so client can handle tool calls seq.status = SequenceStatus::Finished; + Self::try_revise_reasoning_markers( + &mut self.reasoning_marker_revision_seqs, + &mut self.reasoning_prompt_replay_suffixes, + &self.reasoning_start_token_ids, + &self.reasoning_end_token_ids, + self.space_token_id, + self.tokenizer.as_ref(), + seq, + ); self.block_manager .capture_mamba_prefix_state(seq, seq.len()); self.block_manager.cache_sequence(seq); @@ -561,6 +670,15 @@ impl Scheduler { }); } seq.status = SequenceStatus::Finished; + Self::try_revise_reasoning_markers( + &mut self.reasoning_marker_revision_seqs, + &mut self.reasoning_prompt_replay_suffixes, + &self.reasoning_start_token_ids, + &self.reasoning_end_token_ids, + self.space_token_id, + self.tokenizer.as_ref(), + seq, + ); self.block_manager .capture_mamba_prefix_state(seq, seq.len()); self.block_manager.cache_sequence(seq); @@ -621,6 +739,8 @@ impl Scheduler { } pub fn cancel(&mut self, seq_id: usize) { + self.reasoning_marker_revision_seqs.remove(&seq_id); + self.reasoning_prompt_replay_suffixes.remove(&seq_id); for i in 0..self.running.len() { let seq = &mut self.running[i]; if seq.id == seq_id { @@ -1206,3 +1326,36 @@ impl Scheduler { None } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::utils::config::SamplingParams; + + #[test] + fn revises_prompt_replay_suffix_as_whole_span() { + let mut revision_set = HashSet::from([7usize]); + let mut replay_suffixes = HashMap::from([(7usize, vec![151, 152, 153])]); + let mut seq = Sequence::new( + vec![11, 12, 151, 152, 153], + 16, + 
SamplingParams::new_with_max_tokens(8), + &None, + 0, + ); + seq.id = 7; + + Scheduler::try_revise_reasoning_markers( + &mut revision_set, + &mut replay_suffixes, + &[], + &[], + Some(32), + None, + &mut seq, + ); + + assert_eq!(seq.token_ids, vec![11, 12, 32, 32, 32]); + assert_eq!(seq.last_token, 32); + } +} diff --git a/src/mcp/manager.rs b/src/mcp/manager.rs index 701b4a3d..55ad66b8 100644 --- a/src/mcp/manager.rs +++ b/src/mcp/manager.rs @@ -445,7 +445,7 @@ mod tests { #[test] fn memory_transport_client_roundtrip() { - let (mut client_transport, mut server_transport) = MemoryTransport::pair(); + let (client_transport, mut server_transport) = MemoryTransport::pair(); let server = thread::spawn(move || { let mut server = crate::mcp::server::McpServer::new("test", "0.1"); server.register_tool( diff --git a/src/server/mod.rs b/src/server/mod.rs index 4fb3485a..f08da7c2 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1111,7 +1111,7 @@ pub fn convert_chat_message( if let Some(tool_calls) = &msg.tool_calls { let mut content = String::new(); if let Some(existing) = &msg.content { - content = extract_text_content(existing).trim().to_owned(); + content = extract_text_content(existing); } let template_calls = tool_calls .iter() diff --git a/src/server/parser.rs b/src/server/parser.rs index 53e64227..0143a287 100644 --- a/src/server/parser.rs +++ b/src/server/parser.rs @@ -346,6 +346,48 @@ const REASONING_MARKERS: &[(&str, &str)] = &[ ("", ""), ]; +/// All reasoning marker strings, used for tool-response placeholder rewriting. +pub const ALL_REASONING_MARKER_STRINGS: &[&str] = &[ + "", + "", + "<|think|>", + "<|/think|>", + "[THINK]", + "[/THINK]", + "", + "", +]; + +/// Replace reasoning markers with a single-space placeholder for tool-call +/// responses sent back to the client. 
+pub fn strip_reasoning_markers_for_tool_response(text: &str) -> String { + if text.is_empty() { + return String::new(); + } + + let mut out = text.to_string(); + for marker in ALL_REASONING_MARKER_STRINGS { + if out.contains(marker) { + out = out.replace(marker, " "); + } + } + out +} + +pub fn text_contains_reasoning_marker(text: &str) -> bool { + ALL_REASONING_MARKER_STRINGS + .iter() + .any(|marker| text.contains(marker)) +} + +pub fn token_text_is_reasoning_boundary(text: &str) -> bool { + let trimmed = text.trim_matches(|ch: char| ch.is_whitespace()); + !trimmed.is_empty() + && ALL_REASONING_MARKER_STRINGS + .iter() + .any(|marker| trimmed == *marker) +} + /// Detect whether a rendered prompt already ends inside a reasoning block. /// /// This happens for templates that prefill `` in `add_generation_prompt`. @@ -1947,6 +1989,12 @@ abc assert!(!parser.contains_tool_markup(&safe)); } + #[test] + fn test_strip_reasoning_markers_for_tool_response_uses_space_placeholders() { + let text = "\nabc\n"; + assert_eq!(strip_reasoning_markers_for_tool_response(text), " \nabc\n "); + } + #[test] fn test_parser_defaults_to_qwen_coder_for_qwen35() { assert_eq!( diff --git a/src/server/server.rs b/src/server/server.rs index 8d498ec6..368df3dd 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -464,6 +464,7 @@ pub async fn chat_completion( let mut total_decoded_tokens = 0usize; let mut pending_tool_calls: Vec = Vec::new(); let mut suppressed_tool_markup: String = String::new(); + let mut pending_reasoning_whitespace_placeholder = false; let mut buffering_since: Option = None; let mut buffering_cancel_requested = false; let mut buffering_warned = false; @@ -549,11 +550,25 @@ pub async fn chat_completion( ); continue; } + let display_text = + if crate::server::parser::token_text_is_reasoning_boundary( + &token, + ) { + pending_reasoning_whitespace_placeholder = true; + " ".to_string() + } else if pending_reasoning_whitespace_placeholder + && token.chars().all(|ch| 
ch.is_whitespace()) + { + " ".to_string() + } else { + pending_reasoning_whitespace_placeholder = false; + crate::server::parser::strip_reasoning_markers_for_tool_response(&text) + }; // Send content to client if let Some(ref l) = stream_logger { - l.log_stream_token(&text); + l.log_stream_token(&display_text); } - if !stream_ctx.send_token(&text) { + if !stream_ctx.send_token(&display_text) { crate::log_error!( "[Seq {}] Stream send error (disconnected)", current_seq_id @@ -564,6 +579,7 @@ pub async fn chat_completion( } } StreamResult::Buffering => { + pending_reasoning_whitespace_placeholder = false; // Parser is buffering, don't send anything to client yet. if buffering_since.is_none() { buffering_since = Some(Instant::now()); @@ -605,6 +621,7 @@ pub async fn chat_completion( } } StreamResult::FlushBuffer(text) => { + pending_reasoning_whitespace_placeholder = false; buffering_since = None; buffering_cancel_requested = false; buffering_warned = false; @@ -633,6 +650,8 @@ pub async fn chat_completion( } let safe_text = tool_parser.sanitize_tool_markup_for_display(&text); + let safe_text = + crate::server::parser::strip_reasoning_markers_for_tool_response(&safe_text); if safe_text != text { crate::log_warn!( "[Seq {}] Sanitized leaked tool markup in flushed text", @@ -655,6 +674,7 @@ pub async fn chat_completion( } } StreamResult::ToolCalls(tools) => { + pending_reasoning_whitespace_placeholder = false; buffering_since = None; buffering_cancel_requested = false; buffering_warned = false; @@ -716,6 +736,8 @@ pub async fn chat_completion( } else { let safe_buffer = tool_parser .sanitize_tool_markup_for_display(&buffer); + let safe_buffer = + crate::server::parser::strip_reasoning_markers_for_tool_response(&safe_buffer); if safe_buffer != buffer { crate::log_warn!( "[Seq {}] Sanitized leaked tool markup in partial buffer", @@ -749,6 +771,8 @@ pub async fn chat_completion( if pending_tool_calls.is_empty() && !suppressed_tool_markup.is_empty() { let safe_suppressed = 
tool_parser .sanitize_tool_markup_for_display(&suppressed_tool_markup); + let safe_suppressed = + crate::server::parser::strip_reasoning_markers_for_tool_response(&safe_suppressed); crate::log_warn!( "[Seq {}] Releasing {} suppressed tool-markup chars as sanitized text (no tool calls recovered)", current_seq_id, @@ -1103,7 +1127,14 @@ pub async fn chat_completion( output.decode_output.clone() } }; - (Some(fallback_text), None) + ( + Some( + crate::server::parser::strip_reasoning_markers_for_tool_response( + &fallback_text, + ), + ), + None, + ) } else { log_tool_calls("Valid", &valid_calls); let public_calls = valid_calls diff --git a/src/utils/chat_template.rs b/src/utils/chat_template.rs index 4de666b8..f1bbf8ef 100644 --- a/src/utils/chat_template.rs +++ b/src/utils/chat_template.rs @@ -127,6 +127,56 @@ pub struct ChatTemplate { enable_thinking: bool, } +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct PromptReplay { + pub suffix_text: String, +} + +impl PromptReplay { + pub fn is_empty(&self) -> bool { + self.suffix_text.is_empty() + } +} + +const ASSISTANT_REASONING_SENTINELS: [(&str, &str); 8] = [ + ("", "__VLLM_RS_ASSIST_REASONING_START__"), + ("", "__VLLM_RS_ASSIST_REASONING_END__"), + ("<|think|>", "__VLLM_RS_ASSIST_REASONING_START_QWEN__"), + ("<|/think|>", "__VLLM_RS_ASSIST_REASONING_END_QWEN__"), + ("[THINK]", "__VLLM_RS_ASSIST_REASONING_START_BRACKET__"), + ("[/THINK]", "__VLLM_RS_ASSIST_REASONING_END_BRACKET__"), + ("", "__VLLM_RS_ASSIST_REASONING_START_THOUGHT__"), + ("", "__VLLM_RS_ASSIST_REASONING_END_THOUGHT__"), +]; + +fn protect_assistant_reasoning_markers(content: &str) -> String { + let mut protected = content.to_string(); + for (raw, sentinel) in ASSISTANT_REASONING_SENTINELS { + protected = protected.replace(raw, sentinel); + } + protected +} + +fn restore_assistant_reasoning_markers(content: &str) -> String { + let mut restored = content.to_string(); + for (raw, sentinel) in ASSISTANT_REASONING_SENTINELS { + restored = 
restored.replace(sentinel, raw); + } + restored +} + +fn strip_generation_assistant_header(suffix_text: &str) -> &str { + let Some((first_line, remainder)) = suffix_text.split_once('\n') else { + return suffix_text; + }; + + if first_line.ends_with("assistant") { + return remainder; + } + + suffix_text +} + impl ChatTemplate { pub fn collect_escape_tokens(tokenizer: &Tokenizer, tool_markers: &[&str]) -> Vec { let mut tokens = tokenizer @@ -241,18 +291,28 @@ impl ChatTemplate { let mut escaped = message.clone(); // System/developer prompts can include engine-defined structural // tool-call instructions that must remain exact (e.g. ). - // Escape only user/assistant/tool payloads. - if !matches!(escaped.role.as_str(), "system" | "developer") { - escaped.content = self.escape_text(&escaped.content); + // Assistant reasoning markers are protected first so prior + // tool-call turns can round-trip placeholder restoration + // without being escaped away by special-token handling. + match escaped.role.as_str() { + "system" | "developer" => {} + "assistant" => { + let protected = protect_assistant_reasoning_markers(&escaped.content); + escaped.content = self.escape_text(&protected); + } + _ => { + escaped.content = self.escape_text(&escaped.content); + } } escaped }) .collect() } - pub fn apply_chat_template( + fn render_chat_template( &self, tools: &Vec, + add_generation_prompt: bool, log: bool, ) -> Result { if self.chat_template.is_none() { @@ -283,12 +343,149 @@ impl ChatTemplate { template .render(context! 
{ messages => render_messages, - add_generation_prompt => self.add_generation_prompt, + add_generation_prompt => add_generation_prompt, bos_token => self.bos_token, eos_token => self.eos_token, enable_thinking => self.enable_thinking, tools => tools, }) + .map(|rendered| restore_assistant_reasoning_markers(&rendered)) .map_err(ApplyChatTemplateError::RenderTemplateError) } + + pub fn apply_chat_template( + &self, + tools: &Vec, + log: bool, + ) -> Result { + self.render_chat_template(tools, self.add_generation_prompt, log) + } + + pub fn generation_prompt_replay( + &self, + tools: &Vec, + rendered_prompt: &str, + ) -> Option { + if !self.add_generation_prompt { + return None; + } + + let prompt_without_generation = self.render_chat_template(tools, false, false).ok()?; + let suffix_text = rendered_prompt + .strip_prefix(&prompt_without_generation)? + .to_string(); + let suffix_text = strip_generation_assistant_header(&suffix_text).to_string(); + if suffix_text.is_empty() { + return None; + } + Some(PromptReplay { suffix_text }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const THINKING_TEMPLATE: &str = r#" +{%- for message in messages %} + {{- '<|im_start|>' + message.role + '\n' + message.content + '<|im_end|>\n' }} +{%- endfor %} +{%- if add_generation_prompt %} + {{- '<|im_start|>assistant\n' }} + {%- if enable_thinking is defined and enable_thinking is false %} + {{- '\n\n\n\n' }} + {%- else %} + {{- '\n' }} + {%- endif %} +{%- endif %} +"#; + + const HEADER_ONLY_TEMPLATE: &str = r#" +{%- for message in messages %} + {{- '<|im_start|>' + message.role + '\n' + message.content + '<|im_end|>\n' }} +{%- endfor %} +{%- if add_generation_prompt %} + {{- '<|im_start|>assistant\n' }} +{%- endif %} +"#; + + const ALT_ASSISTANT_HEADER_TEMPLATE: &str = r#" +{%- for message in messages %} + {{- '' + message.role + '\n' + message.content + '\n' }} +{%- endfor %} +{%- if add_generation_prompt %} + {{- 'assistant\n\n' }} +{%- endif %} +"#; + + fn 
build_template(source: &str, enable_thinking: bool) -> ChatTemplate { + ChatTemplate::new( + None, + Some(source.to_string()), + None, + None, + None, + true, + enable_thinking, + ) + } + + #[test] + fn generation_prompt_replay_extracts_thinking_suffix() { + let template = build_template(THINKING_TEMPLATE, true); + let rendered = template.apply_chat_template(&Vec::new(), false).unwrap(); + let replay = template + .generation_prompt_replay(&Vec::new(), &rendered) + .unwrap(); + assert_eq!(replay.suffix_text, "\n"); + } + + #[test] + fn generation_prompt_replay_extracts_disabled_thinking_suffix() { + let template = build_template(THINKING_TEMPLATE, false); + let rendered = template.apply_chat_template(&Vec::new(), false).unwrap(); + let replay = template + .generation_prompt_replay(&Vec::new(), &rendered) + .unwrap(); + assert_eq!(replay.suffix_text, "\n\n\n\n"); + } + + #[test] + fn generation_prompt_replay_extracts_header_only_suffix() { + let template = build_template(HEADER_ONLY_TEMPLATE, true); + let rendered = template.apply_chat_template(&Vec::new(), false).unwrap(); + assert!(template + .generation_prompt_replay(&Vec::new(), &rendered) + .is_none()); + } + + #[test] + fn generation_prompt_replay_strips_non_qwen_assistant_header() { + let template = build_template(ALT_ASSISTANT_HEADER_TEMPLATE, true); + let rendered = template.apply_chat_template(&Vec::new(), false).unwrap(); + let replay = template + .generation_prompt_replay(&Vec::new(), &rendered) + .unwrap(); + assert_eq!(replay.suffix_text, "\n"); + } + + #[test] + fn strip_generation_assistant_header_only_strips_leading_header_line() { + let suffix = "<|im_start|>assistant\n\nassistant\n"; + assert_eq!( + strip_generation_assistant_header(suffix), + "\nassistant\n" + ); + } + + #[test] + fn protect_and_restore_reasoning_markers_round_trip() { + let original = + "\na\n\n<|think|>b<|/think|>\n[THINK]c[/THINK]\nd"; + let protected = protect_assistant_reasoning_markers(original); + 
assert!(!protected.contains("")); assert!(!protected.contains("")); + let restored = restore_assistant_reasoning_markers(&protected); + assert_eq!(restored, original); + } } From 693f95c0b686ac3fa681337dd7f4d78abba9600c Mon Sep 17 00:00:00 2001 From: Guoqing Bao Date: Sun, 29 Mar 2026 06:05:35 +0000 Subject: [PATCH 2/4] Revise capture stride --- docs/prefix-cache.md | 4 ++-- src/utils/env.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/prefix-cache.md b/docs/prefix-cache.md index ee81d42a..0d908f31 100644 --- a/docs/prefix-cache.md +++ b/docs/prefix-cache.md @@ -28,8 +28,8 @@ For hybrid Mamba models (for example Qwen3.5), prefix reuse also needs a compatible Mamba snapshot at the matched boundary. Use environment variable `VLLM_RS_MAMBA_SNAPSHOT_STRIDE_BLOCKS` to control -sparse snapshot capture during decode: -- Default: `8` blocks +sparse snapshot capture during decode (a larger stride is useful when GPU memory is limited): +- Default: `1` block - Minimum valid value: `1` (capture every block) - Effective snapshot boundary in tokens: `block_size * stride` diff --git a/src/utils/env.rs b/src/utils/env.rs index 7c51508b..ac2c3ef4 100644 --- a/src/utils/env.rs +++ b/src/utils/env.rs @@ -1,7 +1,7 @@ use std::env; pub const MAMBA_SNAPSHOT_BLOCK_STRIDE_ENV: &str = "VLLM_RS_MAMBA_SNAPSHOT_STRIDE_BLOCKS"; -pub const DEFAULT_MAMBA_SNAPSHOT_BLOCK_STRIDE: usize = 8; +pub const DEFAULT_MAMBA_SNAPSHOT_BLOCK_STRIDE: usize = 1; pub fn mamba_snapshot_block_stride_blocks() -> usize { let default = DEFAULT_MAMBA_SNAPSHOT_BLOCK_STRIDE; From dc6611e458af90f57a97841d876434bee495e8d4 Mon Sep 17 00:00:00 2001 From: Guoqing Bao Date: Sun, 29 Mar 2026 06:17:46 +0000 Subject: [PATCH 3/4] Bump version to v0.9.16 --- Cargo.toml | 2 +- src/core/engine.rs | 7 +++---- src/utils/chat_template.rs | 5 +++++ 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index eea55541..5ef378e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6
+1,6 @@ [package] name = "vllm-rs" -version = "0.9.15" +version = "0.9.16" edition = "2021" default-run = "vllm-rs" description = "A minimal, high-performance large language model (LLM) inference engine implementing vLLM in Rust." diff --git a/src/core/engine.rs b/src/core/engine.rs index d3f37554..1617926b 100644 --- a/src/core/engine.rs +++ b/src/core/engine.rs @@ -18,7 +18,9 @@ use crate::server::{EmbeddingStrategy, UsageResponse}; use crate::tools::Tool; use crate::transfer::PdRole; use crate::transfer::Transfer; -use crate::utils::chat_template::{Message, PromptReplay}; +use crate::utils::chat_template::{ + Message, PromptReplay, REASONING_END_MARKERS, REASONING_START_MARKERS, +}; use crate::utils::config::{EngineConfig, EosTokenId, ModelType, SamplingParams}; use crate::utils::guidance::{build_llg_factory, extract_guidance_tokens, GuidanceTokens}; use crate::utils::heartbeat::heartbeat_worker; @@ -58,9 +60,6 @@ pub static GLOBAL_RT: Lazy = Lazy::new(|| { .expect("Failed to build global Tokio runtime") }); -const REASONING_START_MARKERS: &[&str] = &["", "<|think|>", "[THINK]", ""]; -const REASONING_END_MARKERS: &[&str] = &["", "<|/think|>", "[/THINK]", ""]; - #[derive(Debug, Clone)] pub enum StreamItem { Token(String, u32), //streaming: (text, token_id) diff --git a/src/utils/chat_template.rs b/src/utils/chat_template.rs index f1bbf8ef..30351480 100644 --- a/src/utils/chat_template.rs +++ b/src/utils/chat_template.rs @@ -4,6 +4,11 @@ use minijinja::{context, Environment}; use pyo3::pyclass; use tokenizers::Tokenizer; +pub(crate) const REASONING_START_MARKERS: &[&str] = + &["", "<|think|>", "[THINK]", ""]; +pub(crate) const REASONING_END_MARKERS: &[&str] = + &["", "<|/think|>", "[/THINK]", ""]; + #[cfg(feature = "python")] #[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] #[pyclass] From c5511e6598f1fc311a906041726a722aea7d7173 Mon Sep 17 00:00:00 2001 From: Guoqing Bao Date: Sun, 29 Mar 2026 08:14:32 +0000 Subject: [PATCH 4/4] Fix claude server 
path & update ReadMe --- ReadMe-CN.md | 2 +- ReadMe.md | 2 +- docs/opencode.md | 23 +- src/server/claude_server.rs | 1090 ++++++++++++++++++++++++++++++++--- src/server/parser.rs | 38 ++ 5 files changed, 1049 insertions(+), 106 deletions(-) diff --git a/ReadMe-CN.md b/ReadMe-CN.md index fc9a934e..798bdd5c 100644 --- a/ReadMe-CN.md +++ b/ReadMe-CN.md @@ -85,8 +85,8 @@ - [工具调用解析](docs/tool_parsing.md) - [MCP集成与工具调用](docs/mcp_tool_calling.md) - [结构化输出文档](docs/llguidance-integration.md) -- [Claude Code使用vLLM.rs后端](docs/claude_code.md) - [OpenCode使用vLLM.rs后端](docs/open_code.md) +- [Claude Code使用vLLM.rs后端](docs/claude_code.md) - [Goose AI Agent使用vLLM.rs后端](docs/goose.md) - [Embedding](docs/embeddings.md) - [多模态 (Qwen3-VL, Gemma3, Mistral3-VL)](docs/multimodal.md) diff --git a/ReadMe.md b/ReadMe.md index 0441dcce..cb1b3c8b 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -86,8 +86,8 @@ All models support hardware FP8 KV-cache acceleration (requires SM90+ and disabl - [Tool Parsing](docs/tool_parsing.md) - [MCP Integration and Tool Calling](docs/mcp_tool_calling.md) - [Structured Outputs](docs/llguidance-integration.md) -- [Work with Claude Code](docs/claude_code.md) - [Work with OpenCode](docs/opencode.md) +- [Work with Claude Code](docs/claude_code.md) - [Embedding](docs/embeddings.md) - [Multimodal (Qwen3-VL, Gemma3, Mistral3-VL)](docs/multimodal.md) - [Prefix cache](docs/prefix-cache.md) diff --git a/docs/opencode.md b/docs/opencode.md index 37bcce15..1ef525dc 100644 --- a/docs/opencode.md +++ b/docs/opencode.md @@ -12,7 +12,7 @@ OpenCode -> vLLM.rs (OpenAI-compatible) ```bash # Rust # Replace `flashinfer` with `flashattn` to use Flash attention backend -./run.sh --features cuda,nccl,graph,flashinfer,cutlass --release --m Qwen/Qwen3-Coder-Next-FP8 --server --d 0,1 --prefix-cache +./run.sh --features cuda,nccl,graph,flashinfer,cutlass --release --m Qwen/Qwen3.5-35B-A3B-FP8 --server --d 0 --prefix-cache # Different model ./run.sh --features cuda,nccl,graph,flashinfer,cutlass 
--release --m Qwen/Qwen3.5-27B-FP8 --d 0 --server --prefix-cache @@ -23,15 +23,11 @@ python3 -m vllm_rs.server --m Qwen/Qwen3-Coder-Next-FP8 --d 0,1 --prefix-cache ## 2) Configure OpenCode -Install opencode +Install opencode (CLI) ```shell curl -fsSL https://opencode.ai/install | bash -``` - -Or install with npm - -```shell +# Or install with npm npm i -g opencode-ai ``` @@ -59,14 +55,25 @@ Export config into `~/.config/opencode/config.json` } ``` +Install Desktop OpenCode (optional) + +```shell +visit https://opencode.ai/download +``` + +Connect to provider -> custom -> base URL (http://localhost:8000/v1) -> Empty key + + ## 3) Run OpenCode -run opencode +run opencode (CLI) ```shell opencode ``` +Or, run OpenCode desktop (choose configured custom provider) + ### Trouble shooting 1. Use the chat logger to monitor detailed interactions between OpenCode and vLLM.rs. diff --git a/src/server/claude_server.rs b/src/server/claude_server.rs index bbcff04a..77222819 100644 --- a/src/server/claude_server.rs +++ b/src/server/claude_server.rs @@ -19,6 +19,7 @@ use axum::{ IntoResponse, Sse, }, }; +use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _}; use flume::{Receiver, TrySendError}; use futures::Stream; use once_cell::sync::Lazy; @@ -53,6 +54,14 @@ static ANTHROPIC_BILLING_CCH_RE: Lazy = Lazy::new(|| { .expect("valid anthropic billing cch regex") }); +const CLAUDE_REASONING_MARKERS: &[(&str, &str)] = &[ + ("", ""), + ("<|think|>", "<|/think|>"), + ("[THINK]", "[/THINK]"), + ("", ""), +]; +const SYNTHETIC_THINKING_SIGNATURE_PREFIX: &str = "vllm-rs-thinking-v1:"; + #[derive(Debug, Clone, Deserialize, Serialize)] #[serde(untagged)] pub enum ClaudeContent { @@ -67,6 +76,14 @@ pub enum ClaudeContentBlock { Text { text: String }, #[serde(rename = "image")] Image { source: ClaudeImageSource }, + #[serde(rename = "thinking")] + Thinking { + thinking: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + signature: Option, + }, + #[serde(rename = 
"redacted_thinking")] + RedactedThinking { data: String }, #[serde(rename = "tool_use")] ToolUse { id: String, @@ -213,6 +230,14 @@ pub struct ClaudeMessageResponse { pub enum ClaudeContentBlockOut { #[serde(rename = "text")] Text { text: String }, + #[serde(rename = "thinking")] + Thinking { + thinking: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + signature: Option, + }, + #[serde(rename = "redacted_thinking")] + RedactedThinking { data: String }, #[serde(rename = "tool_use")] ToolUse { id: String, @@ -255,6 +280,10 @@ pub struct ClaudeContentBlockDeltaEvent { pub enum ClaudeContentDelta { #[serde(rename = "text_delta")] TextDelta { text: String }, + #[serde(rename = "thinking_delta")] + ThinkingDelta { thinking: String }, + #[serde(rename = "signature_delta")] + SignatureDelta { signature: String }, #[serde(rename = "input_json_delta")] InputJsonDelta { #[serde(rename = "partial_json")] @@ -262,6 +291,24 @@ pub enum ClaudeContentDelta { }, } +#[derive(Debug, Clone, Deserialize, Serialize)] +struct SyntheticThinkingSignature { + version: u8, + suffix_placeholder: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct ClaudeThinkingBlock { + thinking: String, + signature: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct ParsedClaudeAssistantOutput { + thinking_blocks: Vec, + text: String, +} + #[derive(Debug, Serialize)] pub struct ClaudeContentBlockStopEvent { #[serde(rename = "type")] @@ -439,6 +486,149 @@ fn tool_choice_to_openai(choice: &Option) -> Option String { + let payload = SyntheticThinkingSignature { + version: 1, + suffix_placeholder: suffix_newlines_to_spaces(suffix_placeholder), + }; + let json = serde_json::to_vec(&payload).unwrap_or_default(); + format!( + "{}{}", + SYNTHETIC_THINKING_SIGNATURE_PREFIX, + URL_SAFE_NO_PAD.encode(json) + ) +} + +fn decode_synthetic_thinking_signature(signature: &str) -> Option { + let encoded = signature.strip_prefix(SYNTHETIC_THINKING_SIGNATURE_PREFIX)?; + let bytes = 
URL_SAFE_NO_PAD.decode(encoded).ok()?; + serde_json::from_slice(&bytes).ok() +} + +fn replay_text_for_thinking_block(thinking: &str, signature: Option<&str>) -> String { + let thinking = normalize_suffix_thinking_text(thinking); + let suffix_placeholder = signature + .and_then(decode_synthetic_thinking_signature) + .map(|payload| payload.suffix_placeholder) + .unwrap_or_else(|| " ".to_string()); + format!(" {}{}", thinking, suffix_placeholder) +} + +fn suffix_newlines_to_spaces(text: &str) -> String { + text.chars() + .map(|ch| if ch == '\n' || ch == '\r' { ' ' } else { ch }) + .collect() +} + +fn normalize_suffix_thinking_text(thinking: &str) -> String { + if thinking.trim().is_empty() { + suffix_newlines_to_spaces(thinking) + } else { + thinking.to_string() + } +} + +fn find_reasoning_start(text: &str) -> Option<(usize, &'static str, &'static str)> { + CLAUDE_REASONING_MARKERS + .iter() + .filter_map(|(start, end)| text.find(start).map(|idx| (idx, *start, *end))) + .min_by_key(|(idx, _, _)| *idx) +} + +fn find_reasoning_end(text: &str) -> Option<(usize, &'static str)> { + CLAUDE_REASONING_MARKERS + .iter() + .filter_map(|(_, end)| text.find(end).map(|idx| (idx, *end))) + .min_by_key(|(idx, _)| *idx) +} + +fn append_reasoning_segment( + rendered: &mut String, + thinking_blocks: &mut Vec, + thinking: &str, + trailing_ws: &str, +) { + let suffix_placeholder = format!(" {}", trailing_ws); + let normalized_thinking = normalize_suffix_thinking_text(thinking); + let signature = encode_synthetic_thinking_signature(&suffix_placeholder); + if normalized_thinking.trim().is_empty() { + rendered.push_str(&replay_text_for_thinking_block( + &normalized_thinking, + Some(signature.as_str()), + )); + } else { + thinking_blocks.push(ClaudeThinkingBlock { + thinking: normalized_thinking, + signature, + }); + } +} + +fn parse_claude_assistant_output(text: &str) -> ParsedClaudeAssistantOutput { + let mut remaining = text; + let mut rendered = String::new(); + let mut thinking_blocks = 
Vec::new(); + + while !remaining.is_empty() { + let next_start = find_reasoning_start(remaining); + let next_end = find_reasoning_end(remaining); + if let Some((end_idx, end_marker)) = next_end { + let start_before_end = next_start + .as_ref() + .is_some_and(|(start_idx, _, _)| *start_idx < end_idx); + if !start_before_end { + let after_end = &remaining[end_idx + end_marker.len()..]; + let trailing_ws_len = after_end + .char_indices() + .take_while(|(_, ch)| ch.is_whitespace()) + .map(|(idx, ch)| idx + ch.len_utf8()) + .last() + .unwrap_or(0); + append_reasoning_segment( + &mut rendered, + &mut thinking_blocks, + &remaining[..end_idx], + &after_end[..trailing_ws_len], + ); + remaining = &after_end[trailing_ws_len..]; + continue; + } + } + + let Some((start_idx, start_marker, end_marker)) = next_start else { + rendered.push_str(remaining); + break; + }; + + rendered.push_str(&remaining[..start_idx]); + let after_start = &remaining[start_idx + start_marker.len()..]; + let Some(end_idx) = after_start.find(end_marker) else { + rendered.push_str(&remaining[start_idx..]); + break; + }; + + let after_end = &after_start[end_idx + end_marker.len()..]; + let trailing_ws_len = after_end + .char_indices() + .take_while(|(_, ch)| ch.is_whitespace()) + .map(|(idx, ch)| idx + ch.len_utf8()) + .last() + .unwrap_or(0); + append_reasoning_segment( + &mut rendered, + &mut thinking_blocks, + &after_start[..end_idx], + &after_end[..trailing_ws_len], + ); + remaining = &after_end[trailing_ws_len..]; + } + + ParsedClaudeAssistantOutput { + thinking_blocks, + text: rendered, + } +} + fn normalize_claude_volatile_text(text: &mut String) -> usize { let mut occurrences = CLAUDE_SETTINGS_UUID_RE.find_iter(text.as_str()).count(); if occurrences > 0 { @@ -480,6 +670,10 @@ fn normalize_content_blocks(blocks: &mut [ClaudeContentBlock]) -> usize { for block in blocks { normalized += match block { ClaudeContentBlock::Text { text } => normalize_claude_volatile_text(text), + 
ClaudeContentBlock::Thinking { thinking, .. } => { + normalize_claude_volatile_text(thinking) + } + ClaudeContentBlock::RedactedThinking { data } => normalize_claude_volatile_text(data), ClaudeContentBlock::ToolUse { input, .. } => normalize_json_strings(input), ClaudeContentBlock::ToolResult { content, .. } => { normalize_tool_result_content(content) @@ -571,6 +765,12 @@ fn blocks_to_message_content( items.push(MessageContent::Text { text: text.clone() }); } } + ClaudeContentBlock::Thinking { .. } => { + return Err("thinking blocks are not valid in plain content".to_string()) + } + ClaudeContentBlock::RedactedThinking { .. } => { + return Err("redacted_thinking blocks are not valid in plain content".to_string()) + } ClaudeContentBlock::Image { source } => { if !allow_images { return Err("image blocks are not supported here".to_string()); @@ -611,6 +811,16 @@ fn build_message_content_type(items: Vec) -> Option, text: String) { + if text.is_empty() { + return; + } + match items.last_mut() { + Some(MessageContent::Text { text: existing }) => existing.push_str(&text), + _ => items.push(MessageContent::Text { text }), + } +} + fn tool_result_content_to_text(content: &ClaudeToolResultContent) -> Result { match content { ClaudeToolResultContent::Text(text) => Ok(text.clone()), @@ -624,6 +834,12 @@ fn tool_result_content_to_text(content: &ClaudeToolResultContent) -> Result { + if !combined.is_empty() { + combined.push(' '); + } + combined.push_str(thinking); + } _ => { return Err( "only text blocks are supported inside tool_result content".to_string() @@ -821,7 +1037,7 @@ fn convert_claude_message(message: &ClaudeMessage) -> Result, S match &message.content { ClaudeContent::Text(text) => { - if text.trim().is_empty() { + if text.is_empty() { return Ok(Vec::new()); } return Ok(vec![ChatMessage::text(role, text.clone())]); @@ -837,8 +1053,32 @@ fn convert_claude_message(message: &ClaudeMessage) -> Result, S if !tool_calls.is_empty() { flush_tool_call_message(&mut out, &mut 
tool_calls); } - if !text.trim().is_empty() { - content_items.push(MessageContent::Text { text: text.clone() }); + if !text.is_empty() { + push_text_content(&mut content_items, text.clone()); + } + } + ClaudeContentBlock::Thinking { + thinking, + signature, + } => { + if role != "assistant" { + return Err("thinking blocks must be in assistant messages".to_string()); + } + if !tool_calls.is_empty() { + flush_tool_call_message(&mut out, &mut tool_calls); + } + push_text_content( + &mut content_items, + replay_text_for_thinking_block(thinking, signature.as_deref()), + ); + } + ClaudeContentBlock::RedactedThinking { .. } => { + if role != "assistant" { + return Err("redacted_thinking blocks must be in assistant messages" + .to_string()); + } + if !tool_calls.is_empty() { + flush_tool_call_message(&mut out, &mut tool_calls); } } ClaudeContentBlock::Image { source } => { @@ -1038,6 +1278,412 @@ fn send_tool_use_block( Ok(()) } +fn longest_partial_marker_suffix(text: &str, markers: &[&str]) -> usize { + let mut longest = 0usize; + for marker in markers { + let max_len = marker.len().saturating_sub(1).min(text.len()); + for len in 1..=max_len { + if text.ends_with(&marker[..len]) { + longest = longest.max(len); + } + } + } + longest +} + +#[derive(Debug)] +enum ClaudeThinkingStreamMode { + Text, + ThinkingCandidate { + end_marker: &'static str, + buffered: String, + }, + Thinking { + index: usize, + end_marker: &'static str, + }, + PendingThinkingClose { + index: usize, + trailing_ws: String, + }, + PendingEmptyThinking { + placeholder: String, + }, +} + +#[derive(Debug)] +struct ClaudeThinkingStreamEmitter { + next_block_index: usize, + open_text_block: Option, + parse_buffer: String, + mode: ClaudeThinkingStreamMode, +} + +impl ClaudeThinkingStreamEmitter { + fn new() -> Self { + Self { + next_block_index: 0, + open_text_block: None, + parse_buffer: String::new(), + mode: ClaudeThinkingStreamMode::Text, + } + } + + fn open_text_block_index(&self) -> Option { + 
self.open_text_block + } + + fn ensure_text_block( + &mut self, + stream_ctx: &ClaudeStreamingContext, + ) -> Result { + if let Some(index) = self.open_text_block { + return Ok(index); + } + let index = self.next_block_index; + let start_block = ClaudeContentBlockStartEvent { + event_type: "content_block_start", + index, + content_block: ClaudeContentBlockOut::Text { + text: String::new(), + }, + }; + stream_ctx.send_json_event("content_block_start", &start_block)?; + self.open_text_block = Some(index); + self.next_block_index += 1; + Ok(index) + } + + fn emit_text( + &mut self, + stream_ctx: &ClaudeStreamingContext, + logger: Option<&ChatCompletionLogger>, + text: &str, + ) -> Result<(), StreamSendError> { + if text.is_empty() { + return Ok(()); + } + if let Some(logger) = logger { + logger.log_stream_token(text); + } + let index = self.ensure_text_block(stream_ctx)?; + send_text_delta(stream_ctx, index, text) + } + + fn close_text_block( + &mut self, + stream_ctx: &ClaudeStreamingContext, + ) -> Result<(), StreamSendError> { + let Some(index) = self.open_text_block.take() else { + return Ok(()); + }; + let stop_event = ClaudeContentBlockStopEvent { + event_type: "content_block_stop", + index, + }; + stream_ctx.send_json_event("content_block_stop", &stop_event) + } + + fn start_thinking_candidate(&mut self, end_marker: &'static str) { + self.mode = ClaudeThinkingStreamMode::ThinkingCandidate { + end_marker, + buffered: String::new(), + }; + } + + fn start_thinking_block( + &mut self, + stream_ctx: &ClaudeStreamingContext, + ) -> Result { + self.close_text_block(stream_ctx)?; + let index = self.next_block_index; + let start_block = ClaudeContentBlockStartEvent { + event_type: "content_block_start", + index, + content_block: ClaudeContentBlockOut::Thinking { + thinking: String::new(), + signature: None, + }, + }; + stream_ctx.send_json_event("content_block_start", &start_block)?; + self.next_block_index += 1; + Ok(index) + } + + fn emit_thinking_delta( + stream_ctx: 
&ClaudeStreamingContext, + index: usize, + text: &str, + ) -> Result<(), StreamSendError> { + if text.is_empty() { + return Ok(()); + } + let delta = ClaudeContentBlockDeltaEvent { + event_type: "content_block_delta", + index, + delta: ClaudeContentDelta::ThinkingDelta { + thinking: text.to_string(), + }, + }; + stream_ctx.send_json_event("content_block_delta", &delta) + } + + fn close_thinking_block( + &mut self, + stream_ctx: &ClaudeStreamingContext, + index: usize, + suffix_placeholder: &str, + ) -> Result<(), StreamSendError> { + let signature = encode_synthetic_thinking_signature(suffix_placeholder); + let delta = ClaudeContentBlockDeltaEvent { + event_type: "content_block_delta", + index, + delta: ClaudeContentDelta::SignatureDelta { signature }, + }; + stream_ctx.send_json_event("content_block_delta", &delta)?; + let stop = ClaudeContentBlockStopEvent { + event_type: "content_block_stop", + index, + }; + stream_ctx.send_json_event("content_block_stop", &stop)?; + self.mode = ClaudeThinkingStreamMode::Text; + Ok(()) + } + + fn push_chunk( + &mut self, + stream_ctx: &ClaudeStreamingContext, + logger: Option<&ChatCompletionLogger>, + text: &str, + ) -> Result<(), StreamSendError> { + if text.is_empty() { + return Ok(()); + } + self.parse_buffer.push_str(text); + self.drain(stream_ctx, logger, false) + } + + fn finish( + &mut self, + stream_ctx: &ClaudeStreamingContext, + logger: Option<&ChatCompletionLogger>, + ) -> Result { + self.drain(stream_ctx, logger, true)?; + self.close_text_block(stream_ctx)?; + Ok(self.next_block_index) + } + + fn drain( + &mut self, + stream_ctx: &ClaudeStreamingContext, + logger: Option<&ChatCompletionLogger>, + finalize: bool, + ) -> Result<(), StreamSendError> { + loop { + let mode = std::mem::replace(&mut self.mode, ClaudeThinkingStreamMode::Text); + match mode { + ClaudeThinkingStreamMode::Text => { + let next_start = find_reasoning_start(&self.parse_buffer); + let next_end = find_reasoning_end(&self.parse_buffer); + let 
Some((start_idx, start_marker, end_marker)) = next_start else { + if let Some((end_idx, end_marker)) = next_end { + let buffered = self.parse_buffer[..end_idx].to_string(); + self.parse_buffer.drain(..end_idx); + self.mode = ClaudeThinkingStreamMode::ThinkingCandidate { + end_marker, + buffered, + }; + continue; + } + let reserve = if finalize { + 0 + } else { + let markers: Vec<&str> = CLAUDE_REASONING_MARKERS + .iter() + .map(|(start, _)| *start) + .collect(); + longest_partial_marker_suffix(&self.parse_buffer, &markers) + }; + let emit_len = self.parse_buffer.len().saturating_sub(reserve); + if emit_len == 0 { + self.mode = ClaudeThinkingStreamMode::Text; + return Ok(()); + } + let emit_text = self.parse_buffer[..emit_len].to_string(); + self.parse_buffer.drain(..emit_len); + self.emit_text(stream_ctx, logger, &emit_text)?; + self.mode = ClaudeThinkingStreamMode::Text; + continue; + }; + + if let Some((end_idx, end_marker_only)) = next_end { + if end_idx < start_idx { + let buffered = self.parse_buffer[..end_idx].to_string(); + self.parse_buffer.drain(..end_idx); + self.mode = ClaudeThinkingStreamMode::ThinkingCandidate { + end_marker: end_marker_only, + buffered, + }; + continue; + } + } + + if start_idx > 0 { + let emit_text = self.parse_buffer[..start_idx].to_string(); + self.parse_buffer.drain(..start_idx); + self.emit_text(stream_ctx, logger, &emit_text)?; + self.mode = ClaudeThinkingStreamMode::Text; + continue; + } + + if self.parse_buffer.len() < start_marker.len() { + self.mode = ClaudeThinkingStreamMode::Text; + return Ok(()); + } + + self.parse_buffer.drain(..start_marker.len()); + self.start_thinking_candidate(end_marker); + } + ClaudeThinkingStreamMode::ThinkingCandidate { + end_marker, + mut buffered, + } => { + if let Some(end_idx) = self.parse_buffer.find(end_marker) { + let delta_text = self.parse_buffer[..end_idx].to_string(); + self.parse_buffer.drain(..end_idx + end_marker.len()); + buffered.push_str(&delta_text); + self.mode = 
ClaudeThinkingStreamMode::PendingEmptyThinking { + placeholder: format!(" {}", normalize_suffix_thinking_text(&buffered)), + }; + continue; + } + + let reserve = if finalize { + 0 + } else { + longest_partial_marker_suffix(&self.parse_buffer, &[end_marker]) + }; + let emit_len = self.parse_buffer.len().saturating_sub(reserve); + if emit_len == 0 { + if finalize { + let remaining = std::mem::take(&mut self.parse_buffer); + buffered.push_str(&remaining); + if buffered.trim().is_empty() { + let placeholder = + format!(" {}", normalize_suffix_thinking_text(&buffered)); + self.emit_text(stream_ctx, logger, &placeholder)?; + self.mode = ClaudeThinkingStreamMode::Text; + } else { + let index = self.start_thinking_block(stream_ctx)?; + Self::emit_thinking_delta(stream_ctx, index, &buffered)?; + self.close_thinking_block(stream_ctx, index, " ")?; + } + continue; + } + self.mode = ClaudeThinkingStreamMode::ThinkingCandidate { + end_marker, + buffered, + }; + return Ok(()); + } + let delta_text = self.parse_buffer[..emit_len].to_string(); + self.parse_buffer.drain(..emit_len); + buffered.push_str(&delta_text); + if !buffered.trim().is_empty() { + let index = self.start_thinking_block(stream_ctx)?; + Self::emit_thinking_delta(stream_ctx, index, &buffered)?; + self.mode = ClaudeThinkingStreamMode::Thinking { index, end_marker }; + } else { + self.mode = ClaudeThinkingStreamMode::ThinkingCandidate { + end_marker, + buffered, + }; + } + } + ClaudeThinkingStreamMode::Thinking { index, end_marker } => { + if let Some(end_idx) = self.parse_buffer.find(end_marker) { + let delta_text = self.parse_buffer[..end_idx].to_string(); + self.parse_buffer.drain(..end_idx + end_marker.len()); + Self::emit_thinking_delta(stream_ctx, index, &delta_text)?; + self.mode = ClaudeThinkingStreamMode::PendingThinkingClose { + index, + trailing_ws: String::new(), + }; + continue; + } + + let reserve = if finalize { + 0 + } else { + longest_partial_marker_suffix(&self.parse_buffer, &[end_marker]) + }; + 
let emit_len = self.parse_buffer.len().saturating_sub(reserve); + if emit_len == 0 { + if finalize { + let remaining = std::mem::take(&mut self.parse_buffer); + Self::emit_thinking_delta(stream_ctx, index, &remaining)?; + self.close_thinking_block(stream_ctx, index, " ")?; + continue; + } + self.mode = ClaudeThinkingStreamMode::Thinking { index, end_marker }; + return Ok(()); + } + let delta_text = self.parse_buffer[..emit_len].to_string(); + self.parse_buffer.drain(..emit_len); + Self::emit_thinking_delta(stream_ctx, index, &delta_text)?; + self.mode = ClaudeThinkingStreamMode::Thinking { index, end_marker }; + } + ClaudeThinkingStreamMode::PendingThinkingClose { + index, + mut trailing_ws, + } => { + let ws_len = self + .parse_buffer + .char_indices() + .take_while(|(_, ch)| ch.is_whitespace()) + .map(|(idx, ch)| idx + ch.len_utf8()) + .last() + .unwrap_or(0); + if ws_len > 0 { + trailing_ws.push_str(&self.parse_buffer[..ws_len]); + self.parse_buffer.drain(..ws_len); + } + if !self.parse_buffer.is_empty() || finalize { + let suffix_placeholder = format!(" {}", trailing_ws); + self.close_thinking_block(stream_ctx, index, &suffix_placeholder)?; + continue; + } + self.mode = + ClaudeThinkingStreamMode::PendingThinkingClose { index, trailing_ws }; + return Ok(()); + } + ClaudeThinkingStreamMode::PendingEmptyThinking { mut placeholder } => { + let ws_len = self + .parse_buffer + .char_indices() + .take_while(|(_, ch)| ch.is_whitespace()) + .map(|(idx, ch)| idx + ch.len_utf8()) + .last() + .unwrap_or(0); + if ws_len > 0 { + placeholder + .push_str(&suffix_newlines_to_spaces(&self.parse_buffer[..ws_len])); + self.parse_buffer.drain(..ws_len); + } + if !self.parse_buffer.is_empty() || finalize { + let text = std::mem::take(&mut placeholder); + self.emit_text(stream_ctx, logger, &text)?; + self.mode = ClaudeThinkingStreamMode::Text; + continue; + } + self.mode = ClaudeThinkingStreamMode::PendingEmptyThinking { placeholder }; + return Ok(()); + } + } + } + } +} + async fn 
send_json_event_with_timeout( seq_id: usize, response_tx: &flume::Sender, @@ -1274,7 +1920,7 @@ fn thinking_to_bool(thinking: &Option) -> Option { crate::log_warn!("Anthropic thinking budget_tokens provided but ignored"); } match config.mode.as_str() { - "enabled" => Some(true), + "enabled" | "adaptive" => Some(true), "disabled" => Some(false), other => { crate::log_warn!("Anthropic thinking mode '{}' not recognized", other); @@ -1336,11 +1982,13 @@ pub async fn messages( .unwrap_or(600), ); + let anthropic_thinking = thinking_to_bool(&request.thinking); + let anthropic_thinking_enabled = anthropic_thinking == Some(true); let mut params = SamplingParams::new_with_max_tokens(max_tokens); params.temperature = request.temperature; params.top_k = request.top_k.map(|v| v as isize); params.top_p = request.top_p; - params.thinking = thinking_to_bool(&request.thinking); + params.thinking = anthropic_thinking; if let Some(stop_sequences) = &request.stop_sequences { if !stop_sequences.is_empty() { params.stop_sequences = Some(stop_sequences.clone()); @@ -1416,6 +2064,7 @@ pub async fn messages( StatusCode::UNPROCESSABLE_ENTITY, ); } + let use_claude_thinking_blocks = anthropic_thinking_enabled && resolved_tools.is_empty(); let tool_schemas = Arc::new(build_tool_schema_map(&resolved_tools)); params.mcp_mode = if !resolved_tools.is_empty() { @@ -1599,6 +2248,7 @@ pub async fn messages( let text_block_index = 0usize; let mut pending_tool_calls: Vec = Vec::new(); let mut suppressed_tool_markup: String = String::new(); + let mut pending_reasoning_whitespace_placeholder = false; let mut buffering_since: Option = None; let mut buffering_cancel_requested = false; let mut buffering_warned = false; @@ -1611,6 +2261,9 @@ pub async fn messages( ); tool_parser.set_initial_reasoning_end_marker(prefilled_reasoning_end.clone()); let should_parse_tools = !stream_tools.is_empty(); + let use_claude_thinking_stream = anthropic_thinking_enabled && !should_parse_tools; + let mut thinking_stream = 
+ use_claude_thinking_stream.then(ClaudeThinkingStreamEmitter::new); let mut current_stream = stream; 'stream: loop { @@ -1674,21 +2327,42 @@ pub async fn messages( ); continue; } - if let Some(ref l) = stream_logger { - l.log_stream_token(&text); - } - if let Err(err) = send_text_with_start( - &stream_ctx, - &mut text_block_started, - text_block_index, - &text, - ) { + let send_result = if use_claude_thinking_stream { + thinking_stream + .as_mut() + .expect("thinking stream emitter") + .push_chunk( + &stream_ctx, + stream_logger.as_ref().map(|logger| &**logger), + &text, + ) + } else { + let display_text = + crate::server::parser::apply_reasoning_placeholder_stream_logic( + &token, + &text, + &mut pending_reasoning_whitespace_placeholder, + ); + if let Some(ref l) = stream_logger { + l.log_stream_token(&display_text); + } + send_text_with_start( + &stream_ctx, + &mut text_block_started, + text_block_index, + &display_text, + ) + }; + if let Err(err) = send_result { + let text_status = thinking_stream + .as_ref() + .and_then(|emitter| emitter.open_text_block_index()); handle_stream_send_error( err, seq_id, &response_tx, - text_block_started, - text_block_index, + text_status.is_some() || text_block_started, + text_status.unwrap_or(text_block_index), total_decoded_tokens, true, ) @@ -1700,6 +2374,7 @@ pub async fn messages( } } StreamResult::Buffering => { + pending_reasoning_whitespace_placeholder = false; if buffering_since.is_none() { buffering_since = Some(Instant::now()); buffering_warned = false; @@ -1740,6 +2415,7 @@ pub async fn messages( } } StreamResult::FlushBuffer(text) => { + pending_reasoning_whitespace_placeholder = false; buffering_since = None; buffering_cancel_requested = false; buffering_warned = false; @@ -1768,27 +2444,44 @@ pub async fn messages( } let safe_text = tool_parser.sanitize_tool_markup_for_display(&text); - if safe_text != text { + if !use_claude_thinking_stream && safe_text != text { crate::log_warn!( "[Seq {}] Sanitized leaked tool 
markup in flushed text", seq_id ); } - if let Some(ref l) = stream_logger { - l.log_stream_token(&safe_text); - } - if let Err(err) = send_text_with_start( - &stream_ctx, - &mut text_block_started, - text_block_index, - &safe_text, - ) { + let send_result = if use_claude_thinking_stream { + thinking_stream + .as_mut() + .expect("thinking stream emitter") + .push_chunk( + &stream_ctx, + stream_logger.as_ref().map(|logger| &**logger), + &safe_text, + ) + } else { + let safe_text = + crate::server::parser::strip_reasoning_markers_for_tool_response(&safe_text); + if let Some(ref l) = stream_logger { + l.log_stream_token(&safe_text); + } + send_text_with_start( + &stream_ctx, + &mut text_block_started, + text_block_index, + &safe_text, + ) + }; + if let Err(err) = send_result { + let text_status = thinking_stream + .as_ref() + .and_then(|emitter| emitter.open_text_block_index()); handle_stream_send_error( err, seq_id, &response_tx, - text_block_started, - text_block_index, + text_status.is_some() || text_block_started, + text_status.unwrap_or(text_block_index), total_decoded_tokens, true, ) @@ -1800,6 +2493,7 @@ pub async fn messages( } } StreamResult::ToolCalls(calls) => { + pending_reasoning_whitespace_placeholder = false; buffering_since = None; buffering_cancel_requested = false; buffering_warned = false; @@ -1807,21 +2501,36 @@ pub async fn messages( } } } else if !token.is_empty() { - if let Some(ref l) = stream_logger { - l.log_stream_token(&token); - } - if let Err(err) = send_text_with_start( - &stream_ctx, - &mut text_block_started, - text_block_index, - &token, - ) { + let send_result = if use_claude_thinking_stream { + thinking_stream + .as_mut() + .expect("thinking stream emitter") + .push_chunk( + &stream_ctx, + stream_logger.as_ref().map(|logger| &**logger), + &token, + ) + } else { + if let Some(ref l) = stream_logger { + l.log_stream_token(&token); + } + send_text_with_start( + &stream_ctx, + &mut text_block_started, + text_block_index, + &token, + ) + 
}; + if let Err(err) = send_result { + let text_status = thinking_stream + .as_ref() + .and_then(|emitter| emitter.open_text_block_index()); handle_stream_send_error( err, seq_id, &response_tx, - text_block_started, - text_block_index, + text_status.is_some() || text_block_started, + text_status.unwrap_or(text_block_index), total_decoded_tokens, true, ) @@ -1868,18 +2577,34 @@ pub async fn messages( } else { let safe_buffer = tool_parser .sanitize_tool_markup_for_display(&buffer); - if safe_buffer != buffer { + if !use_claude_thinking_stream + && safe_buffer != buffer + { crate::log_warn!( "[Seq {}] Sanitized leaked tool markup in partial buffer", seq_id ); } - let _ = send_text_with_start( - &stream_ctx, - &mut text_block_started, - text_block_index, - &safe_buffer, - ); + if use_claude_thinking_stream { + let _ = thinking_stream + .as_mut() + .expect("thinking stream emitter") + .push_chunk( + &stream_ctx, + stream_logger + .as_ref() + .map(|logger| &**logger), + &safe_buffer, + ); + } else { + let safe_buffer = crate::server::parser::strip_reasoning_markers_for_tool_response(&safe_buffer); + let _ = send_text_with_start( + &stream_ctx, + &mut text_block_started, + text_block_index, + &safe_buffer, + ); + } } } } @@ -1906,15 +2631,27 @@ pub async fn messages( seq_id, safe_suppressed.len() ); - if let Some(ref l) = stream_logger { - l.log_stream_token(&safe_suppressed); + if use_claude_thinking_stream { + let _ = thinking_stream + .as_mut() + .expect("thinking stream emitter") + .push_chunk( + &stream_ctx, + stream_logger.as_ref().map(|logger| &**logger), + &safe_suppressed, + ); + } else { + let safe_suppressed = crate::server::parser::strip_reasoning_markers_for_tool_response(&safe_suppressed); + if let Some(ref l) = stream_logger { + l.log_stream_token(&safe_suppressed); + } + let _ = send_text_with_start( + &stream_ctx, + &mut text_block_started, + text_block_index, + &safe_suppressed, + ); } - let _ = send_text_with_start( - &stream_ctx, - &mut 
text_block_started, - text_block_index, - &safe_suppressed, - ); } else if !pending_tool_calls.is_empty() && !suppressed_tool_markup.is_empty() { @@ -2014,21 +2751,36 @@ pub async fn messages( if !has_tool_calls { if let Some(feedback) = invalid_feedback.as_deref() { - if let Some(ref l) = stream_logger { - l.log_stream_token(feedback); - } - if let Err(err) = send_text_with_start( - &stream_ctx, - &mut text_block_started, - text_block_index, - feedback, - ) { + let send_result = if use_claude_thinking_stream { + thinking_stream + .as_mut() + .expect("thinking stream emitter") + .push_chunk( + &stream_ctx, + stream_logger.as_ref().map(|logger| &**logger), + feedback, + ) + } else { + if let Some(ref l) = stream_logger { + l.log_stream_token(feedback); + } + send_text_with_start( + &stream_ctx, + &mut text_block_started, + text_block_index, + feedback, + ) + }; + if let Err(err) = send_result { + let text_status = thinking_stream + .as_ref() + .and_then(|emitter| emitter.open_text_block_index()); handle_stream_send_error( err, seq_id, &response_tx, - text_block_started, - text_block_index, + text_status.is_some() || text_block_started, + text_status.unwrap_or(text_block_index), total_decoded_tokens, true, ) @@ -2041,33 +2793,63 @@ pub async fn messages( } } - let mut next_block_index = 0usize; - if text_block_started { - let stop_event = ClaudeContentBlockStopEvent { - event_type: "content_block_stop", - index: text_block_index, - }; - if let Err(err) = - stream_ctx.send_json_event("content_block_stop", &stop_event) + let next_block_index = if use_claude_thinking_stream { + match thinking_stream + .as_mut() + .expect("thinking stream emitter") + .finish(&stream_ctx, stream_logger.as_ref().map(|logger| &**logger)) { - handle_stream_send_error( - err, - seq_id, - &response_tx, - text_block_started, - text_block_index, - total_decoded_tokens, - true, - ) - .await; - let mut e = engine_clone.write(); - e.cancel(seq_id); - stream_finished = true; - break 'stream; + 
Ok(index) => index, + Err(err) => { + let text_status = thinking_stream + .as_ref() + .and_then(|emitter| emitter.open_text_block_index()); + handle_stream_send_error( + err, + seq_id, + &response_tx, + text_status.is_some(), + text_status.unwrap_or(text_block_index), + total_decoded_tokens, + true, + ) + .await; + let mut e = engine_clone.write(); + e.cancel(seq_id); + stream_finished = true; + break 'stream; + } } - text_block_started = false; - next_block_index = text_block_index + 1; - } + } else { + let mut next_block_index = 0usize; + if text_block_started { + let stop_event = ClaudeContentBlockStopEvent { + event_type: "content_block_stop", + index: text_block_index, + }; + if let Err(err) = + stream_ctx.send_json_event("content_block_stop", &stop_event) + { + handle_stream_send_error( + err, + seq_id, + &response_tx, + text_block_started, + text_block_index, + total_decoded_tokens, + true, + ) + .await; + let mut e = engine_clone.write(); + e.cancel(seq_id); + stream_finished = true; + break 'stream; + } + text_block_started = false; + next_block_index = text_block_index + 1; + } + next_block_index + }; if has_tool_calls { let tool_blocks = tool_calls_to_blocks(&tool_calls); @@ -2344,18 +3126,65 @@ pub async fn messages( } } let content = if has_tool_calls { - tool_calls_to_blocks(&valid_calls) + if use_claude_thinking_blocks { + let parsed = parse_claude_assistant_output(&output.decode_output); + let mut blocks = parsed + .thinking_blocks + .into_iter() + .map(|block| ClaudeContentBlockOut::Thinking { + thinking: block.thinking, + signature: Some(block.signature), + }) + .collect::>(); + if !parsed.text.is_empty() { + blocks.push(ClaudeContentBlockOut::Text { text: parsed.text }); + } + blocks.extend(tool_calls_to_blocks(&valid_calls)); + blocks + } else { + let safe_text = tool_parser.sanitize_tool_markup_for_display(&output.decode_output); + let safe_text = + crate::server::parser::strip_reasoning_markers_for_tool_response(&safe_text); + let mut blocks = 
Vec::new(); + if !safe_text.is_empty() { + blocks.push(ClaudeContentBlockOut::Text { text: safe_text }); + } + blocks.extend(tool_calls_to_blocks(&valid_calls)); + blocks + } } else { let safe_text = if let Some(feedback) = invalid_feedback { feedback + } else if tool_parser.contains_tool_markup(&output.decode_output) { + tool_parser.sanitize_tool_markup_for_display(&output.decode_output) } else { - if tool_parser.contains_tool_markup(&output.decode_output) { - tool_parser.sanitize_tool_markup_for_display(&output.decode_output) + output.decode_output.clone() + }; + if use_claude_thinking_blocks { + let parsed = parse_claude_assistant_output(&safe_text); + let mut blocks = parsed + .thinking_blocks + .into_iter() + .map(|block| ClaudeContentBlockOut::Thinking { + thinking: block.thinking, + signature: Some(block.signature), + }) + .collect::>(); + if !parsed.text.is_empty() { + blocks.push(ClaudeContentBlockOut::Text { text: parsed.text }); + } + if blocks.is_empty() { + vec![ClaudeContentBlockOut::Text { + text: String::new(), + }] } else { - output.decode_output.clone() + blocks } - }; - vec![ClaudeContentBlockOut::Text { text: safe_text }] + } else { + let safe_text = + crate::server::parser::strip_reasoning_markers_for_tool_response(&safe_text); + vec![ClaudeContentBlockOut::Text { text: safe_text }] + } }; let response = ClaudeMessageResponse { @@ -2723,6 +3552,75 @@ mod tests { assert_eq!(enabled, Some(true)); } + #[test] + fn accepts_adaptive_thinking_config() { + let thinking = Some(ClaudeThinking::Config(ClaudeThinkingConfig { + mode: "adaptive".to_string(), + budget_tokens: None, + })); + assert_eq!(thinking_to_bool(&thinking), Some(true)); + } + + #[test] + fn converts_assistant_thinking_blocks_back_to_placeholder_text() { + let signature = encode_synthetic_thinking_signature(" \n\n"); + let message = ClaudeMessage { + role: "assistant".to_string(), + content: ClaudeContent::Blocks(vec![ + ClaudeContentBlock::Thinking { + thinking: "\nplan\n".to_string(), 
+ signature: Some(signature), + }, + ClaudeContentBlock::Text { + text: "done".to_string(), + }, + ]), + }; + + let converted = convert_claude_message(&message).unwrap(); + let text = match converted[0].content.as_ref() { + Some(MessageContentType::Single(MessageContent::Text { text })) => text.clone(), + Some(MessageContentType::Multi(items)) => match &items[0] { + MessageContent::Text { text } => text.clone(), + _ => String::new(), + }, + _ => String::new(), + }; + assert_eq!(text, " \nplan\n done"); + } + + #[test] + fn parses_assistant_output_into_thinking_blocks_and_text() { + let parsed = parse_claude_assistant_output("\nabc\n\n\nResult text"); + assert_eq!( + parsed.thinking_blocks, + vec![ClaudeThinkingBlock { + thinking: "\nabc\n".to_string(), + signature: encode_synthetic_thinking_signature(" \n\n"), + }] + ); + assert_eq!(parsed.text, "Result text"); + let replay = replay_text_for_thinking_block( + &parsed.thinking_blocks[0].thinking, + Some(parsed.thinking_blocks[0].signature.as_str()), + ); + assert_eq!(replay, " \nabc\n "); + } + + #[test] + fn normalizes_newlines_in_empty_suffix_thinking_block() { + let parsed = parse_claude_assistant_output("\n\n\n\n"); + assert!(parsed.thinking_blocks.is_empty()); + assert_eq!(parsed.text, " "); + } + + #[test] + fn normalizes_prefilled_reasoning_close_suffix_to_placeholders() { + let parsed = parse_claude_assistant_output(" \n\n"); + assert!(parsed.thinking_blocks.is_empty()); + assert_eq!(parsed.text, " "); + } + #[test] fn converts_tools_to_openai_format() { let tool = ClaudeTool { diff --git a/src/server/parser.rs b/src/server/parser.rs index 0143a287..dc6843bb 100644 --- a/src/server/parser.rs +++ b/src/server/parser.rs @@ -388,6 +388,24 @@ pub fn token_text_is_reasoning_boundary(text: &str) -> bool { .any(|marker| trimmed == *marker) } +pub fn apply_reasoning_placeholder_stream_logic( + token: &str, + text: &str, + pending_reasoning_whitespace_placeholder: &mut bool, +) -> String { + if 
token_text_is_reasoning_boundary(token) { + *pending_reasoning_whitespace_placeholder = true; + " ".to_string() + } else if *pending_reasoning_whitespace_placeholder + && token.chars().all(|ch| ch.is_whitespace()) + { + " ".to_string() + } else { + *pending_reasoning_whitespace_placeholder = false; + strip_reasoning_markers_for_tool_response(text) + } +} + /// Detect whether a rendered prompt already ends inside a reasoning block. /// /// This happens for templates that prefill `` in `add_generation_prompt`. @@ -1995,6 +2013,26 @@ abc assert_eq!(strip_reasoning_markers_for_tool_response(text), " \nabc\n "); } + #[test] + fn test_apply_reasoning_placeholder_stream_logic_replaces_boundary_and_following_whitespace() { + let mut pending = false; + assert_eq!( + apply_reasoning_placeholder_stream_logic("", "", &mut pending), + " " + ); + assert!(pending); + assert_eq!( + apply_reasoning_placeholder_stream_logic("\n\n", "\n\n", &mut pending), + " " + ); + assert!(pending); + assert_eq!( + apply_reasoning_placeholder_stream_logic("hello", "hello", &mut pending), + "hello" + ); + assert!(!pending); + } + #[test] fn test_parser_defaults_to_qwen_coder_for_qwen35() { assert_eq!(