Skip to content

Commit f582d90

Browse files
ayushag-nv and nv-tusharma
authored and committed
feat: reasoning parser transformation (#3295)
Signed-off-by: ayushag <[email protected]>
1 parent 606b7f3 commit f582d90

File tree

7 files changed

+722
-74
lines changed

7 files changed

+722
-74
lines changed

lib/llm/src/engines.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,12 @@ impl
159159
for c in prompt.chars() {
160160
// we are returning characters not tokens, so there will be some postprocessing overhead
161161
tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
162-
let response = deltas.create_choice(0, Some(c.to_string()), None, None, None);
162+
let response = deltas.create_choice(0, Some(c.to_string()), None, None);
163163
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None };
164164
id += 1;
165165
}
166166

167-
let response = deltas.create_choice(0, None, None, Some(dynamo_async_openai::types::FinishReason::Stop), None);
167+
let response = deltas.create_choice(0, None, Some(dynamo_async_openai::types::FinishReason::Stop), None);
168168
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
169169
};
170170

lib/llm/src/preprocessor.rs

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::preprocessor::prompt::OAIChatLikeRequest;
2828
use crate::protocols::common::preprocessor::PreprocessedRequestBuilder;
2929
use crate::tokenizers::Encoding;
3030

31+
use dynamo_parsers::{ReasoningParser, ReasoningParserType};
3132
use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, ResponseStream};
3233
use dynamo_runtime::pipeline::{
3334
AsyncEngineContext, Error, ManyOut, Operator, SingleIn, async_trait,
@@ -93,6 +94,12 @@ impl LLMMetricAnnotation {
9394
}
9495
}
9596

97+
// Reasoning State for reasoning parsing transformation step
98+
struct ReasoningState {
99+
stream: Pin<Box<dyn Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send>>,
100+
reasoning_parser: Option<Box<dyn ReasoningParser>>,
101+
}
102+
96103
pub struct OpenAIPreprocessor {
97104
mdcsum: String,
98105
formatter: Arc<dyn OAIPromptFormatter>,
@@ -668,6 +675,56 @@ impl OpenAIPreprocessor {
668675
.build();
669676
jail.apply(stream)
670677
}
678+
679+
// Motivation: Each transformation on the stream should be a separate step to allow for more flexibility
680+
// Earlier reasoning parser logic was nested under delta generation logic in choice_from_postprocessor
681+
// Since we have tool calling parsing as separate step, it makes sense to have reasoning parser as separate step as well
682+
pub fn parse_reasoning_content_from_stream<S>(
683+
stream: S,
684+
parser_name: String,
685+
) -> impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send
686+
where
687+
S: Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send + 'static,
688+
{
689+
// Initialize reasoning parser from parser_name
690+
let reasoning_parser = Box::new(ReasoningParserType::get_reasoning_parser_from_name(
691+
parser_name.as_ref(),
692+
)) as Box<dyn ReasoningParser>;
693+
694+
let state = ReasoningState {
695+
stream: Box::pin(stream),
696+
reasoning_parser: Some(reasoning_parser),
697+
};
698+
699+
stream::unfold(state, |mut state| async move {
700+
if let Some(response) = state.stream.next().await {
701+
// Process the response through reasoning parser if available
702+
let processed_response = if let Some(ref mut parser) = state.reasoning_parser {
703+
response.map_data(|mut data| {
704+
// Process all choices, not just the first one
705+
for choice in data.choices.iter_mut() {
706+
if let Some(text) = choice.delta.content.as_ref() {
707+
let parser_result =
708+
parser.parse_reasoning_streaming_incremental(text, &[]);
709+
710+
// Update this specific choice with parsed content
711+
choice.delta.content = parser_result.get_some_normal_text();
712+
choice.delta.reasoning_content = parser_result.get_some_reasoning();
713+
}
714+
}
715+
Ok(data)
716+
})
717+
} else {
718+
// No reasoning parser configured, pass through unchanged
719+
response
720+
};
721+
722+
Some((processed_response, state))
723+
} else {
724+
None
725+
}
726+
})
727+
}
671728
}
672729

673730
// for pals, we do not want to add the generation prompt to the formatted prompt
@@ -715,9 +772,6 @@ impl
715772

716773
let mut response_generator = Box::new(response_generator);
717774

718-
// set the runtime configuration
719-
response_generator.set_reasoning_parser(self.runtime_config.clone());
720-
721775
// update isl
722776
response_generator.update_isl(common_request.token_ids.len() as u32);
723777

@@ -744,6 +798,25 @@ impl
744798
context.clone(),
745799
);
746800

801+
// Try to parse reasoning content only if parser is configured
802+
let should_parse_reasoning = self.runtime_config.reasoning_parser.is_some();
803+
804+
// Reasoning Content Parsing Transformation Step
805+
// Current Solution:
806+
// This step operates on Deltas created by the transform_postprocessor_stream function
807+
// Only access to text and not token_ids - so can not support parsing based on token_ids for now
808+
// Future Solution:
809+
// To address the limitation if needed in future: move this step before transform_postprocessor_stream and add new field of reasoning_content to the backend output
810+
// Use backend_output.reasoning_content field to fill out the deltas.
811+
let stream: Pin<Box<dyn Stream<Item = _> + Send>> = if should_parse_reasoning {
812+
Box::pin(Self::parse_reasoning_content_from_stream(
813+
stream,
814+
self.runtime_config.reasoning_parser.clone().unwrap(), // Safety: We already checked that parser is some, so gtg
815+
))
816+
} else {
817+
Box::pin(stream)
818+
};
819+
747820
// Check if tools are present and if we should apply jail
748821
let has_tools =
749822
request.inner.tools.is_some() && !request.inner.tools.as_ref().unwrap().is_empty();

lib/llm/src/protocols/openai/chat_completions/delta.rs

Lines changed: 2 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use crate::{
77
protocols::common::{self},
88
types::TokenIdType,
99
};
10-
use dynamo_parsers::{ParserResult, ReasoningParser, ReasoningParserType, ReasoningParserWrapper};
1110

1211
/// Provides a method for generating a [`DeltaGenerator`] from a chat completion request.
1312
impl NvCreateChatCompletionRequest {
@@ -66,11 +65,6 @@ pub struct DeltaGenerator {
6665
msg_counter: u64,
6766
/// Configuration options for response generation.
6867
options: DeltaGeneratorOptions,
69-
70-
/// Reasoning Parser object
71-
/// This is used to parse reasoning content in the response.
72-
/// None means no reasoning parsing will be performed.
73-
reasoning_parser: Option<ReasoningParserWrapper>,
7468
}
7569

7670
impl DeltaGenerator {
@@ -101,14 +95,6 @@ impl DeltaGenerator {
10195
completion_tokens_details: None,
10296
};
10397

104-
// Reasoning parser type
105-
// If no parser is specified (None), no reasoning parsing will be performed
106-
let reasoning_parser = options
107-
.runtime_config
108-
.reasoning_parser
109-
.as_deref()
110-
.map(ReasoningParserType::get_reasoning_parser_from_name);
111-
11298
let chatcmpl_id = format!("chatcmpl-{request_id}");
11399

114100
Self {
@@ -121,21 +107,6 @@ impl DeltaGenerator {
121107
usage,
122108
msg_counter: 0,
123109
options,
124-
reasoning_parser,
125-
}
126-
}
127-
128-
/// Update runtime configuration and reconfigure the reasoning parser accordingly.
129-
pub fn set_reasoning_parser(&mut self, runtime_config: ModelRuntimeConfig) {
130-
self.options.runtime_config = runtime_config.clone();
131-
match self.options.runtime_config.reasoning_parser.as_deref() {
132-
Some(name) => {
133-
self.reasoning_parser =
134-
Some(ReasoningParserType::get_reasoning_parser_from_name(name));
135-
}
136-
None => {
137-
self.reasoning_parser = None;
138-
}
139110
}
140111
}
141112

@@ -212,24 +183,6 @@ impl DeltaGenerator {
212183
})
213184
}
214185

215-
fn create_reasoning_content(
216-
&mut self,
217-
text: &Option<String>,
218-
token_ids: &[u32],
219-
) -> Option<ParserResult> {
220-
// If no reasoning parser is configured, return None
221-
let reasoning_parser = self.reasoning_parser.as_mut()?;
222-
223-
let text_ref = text.as_deref().unwrap_or("");
224-
if text_ref.is_empty() && token_ids.is_empty() {
225-
return None;
226-
}
227-
let parser_result =
228-
reasoning_parser.parse_reasoning_streaming_incremental(text_ref, token_ids);
229-
230-
Some(parser_result)
231-
}
232-
233186
/// Creates a choice within a chat completion response.
234187
///
235188
/// # Arguments
@@ -245,7 +198,6 @@ impl DeltaGenerator {
245198
&mut self,
246199
index: u32,
247200
text: Option<String>,
248-
reasoning_content: Option<String>,
249201
finish_reason: Option<dynamo_async_openai::types::FinishReason>,
250202
logprobs: Option<dynamo_async_openai::types::ChatChoiceLogprobs>,
251203
) -> NvCreateChatCompletionStreamResponse {
@@ -259,7 +211,7 @@ impl DeltaGenerator {
259211
None
260212
},
261213
refusal: None,
262-
reasoning_content,
214+
reasoning_content: None,
263215
};
264216

265217
let choice = dynamo_async_openai::types::ChatChoiceStream {
@@ -371,25 +323,9 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamRes
371323
None => None,
372324
};
373325

374-
// Handle reasoning parsing if enabled, otherwise treat all text as normal
375-
let (normal_text, reasoning_content) =
376-
match self.create_reasoning_content(&delta.text, &delta.token_ids) {
377-
Some(reasoning_parser_result) => (
378-
reasoning_parser_result.get_some_normal_text(),
379-
reasoning_parser_result.get_some_reasoning(),
380-
),
381-
None => (delta.text, None),
382-
};
383-
384326
// Create the streaming response.
385327
let index = 0;
386-
let stream_response = self.create_choice(
387-
index,
388-
normal_text,
389-
reasoning_content,
390-
finish_reason,
391-
logprobs,
392-
);
328+
let stream_response = self.create_choice(index, delta.text, finish_reason, logprobs);
393329

394330
Ok(stream_response)
395331
}

lib/llm/tests/http-service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ impl
8989
let stream = stream! {
9090
tokio::time::sleep(std::time::Duration::from_millis(max_tokens)).await;
9191
for i in 0..10 {
92-
let output = generator.create_choice(i,Some(format!("choice {i}")), None, None, None);
92+
let output = generator.create_choice(i,Some(format!("choice {i}")), None, None);
9393

9494
yield Annotated::from_data(output);
9595
}

lib/llm/tests/http_metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ impl
5252

5353
// Generate 5 response chunks
5454
for i in 0..5 {
55-
let output = generator.create_choice(i, Some(format!("Mock response {i}")), None, None, None);
55+
let output = generator.create_choice(i, Some(format!("Mock response {i}")), None, None);
5656
yield Annotated::from_data(output);
5757
}
5858
};

0 commit comments

Comments (0)