Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions sgl-router/src/routers/grpc/harmony/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ fn get_harmony_encoding() -> &'static HarmonyEncoding {
/// Wraps a `StreamableParser` together with the per-request bookkeeping
/// that has to survive between successive token batches.
pub struct HarmonyParserAdapter {
/// Underlying parser; output token ids are fed to it one at a time via
/// `process`, after which its current channel can be queried.
parser: StreamableParser,
/// Starts as `None` and is cleared back to `None` when the adapter is reset.
/// NOTE(review): presumably the recipient seen on the previously processed
/// message, used to detect a change of recipient — confirm against its uses.
prev_recipient: Option<String>,
/// Running count of tokens processed while the parser's current channel was
/// `"analysis"` or `"commentary"`. Starts at 0, is bumped once per such
/// token, is copied into the `reasoning_token_count` field of the produced
/// output, and is zeroed again on reset.
reasoning_token_count: u32,
}

impl HarmonyParserAdapter {
Expand All @@ -35,6 +36,7 @@ impl HarmonyParserAdapter {
Ok(Self {
parser,
prev_recipient: None,
reasoning_token_count: 0,
})
}

Expand Down Expand Up @@ -241,11 +243,20 @@ impl HarmonyParserAdapter {
finish_reason: String,
matched_stop: Option<serde_json::Value>,
) -> Result<HarmonyChannelOutput, String> {
let mut reasoning_token_count = 0u32;

// Feed all tokens to the parser
for &token_id in output_ids {
self.parser
.process(token_id)
.map_err(|e| format!("Failed to process token {}: {}", token_id, e))?;

// Count reasoning tokens (analysis + commentary channels)
if let Some(channel) = self.parser.current_channel() {
if channel == "analysis" || channel == "commentary" {
reasoning_token_count += 1;
}
}
}

// Extract all completed messages from the parser
Expand All @@ -270,6 +281,7 @@ impl HarmonyParserAdapter {
final_text,
finish_reason: final_finish_reason,
matched_stop,
reasoning_token_count,
})
}

Expand Down Expand Up @@ -359,6 +371,13 @@ impl HarmonyParserAdapter {
.process(token_id)
.map_err(|e| format!("Failed to process token {}: {}", token_id, e))?;

// Count reasoning tokens (analysis + commentary channels)
if let Some(channel) = self.parser.current_channel() {
if channel == "analysis" || channel == "commentary" {
self.reasoning_token_count += 1;
}
}

// Check for content delta
if let Ok(Some(delta_text)) = self.parser.last_content_delta() {
has_delta = true;
Expand Down Expand Up @@ -491,6 +510,7 @@ impl HarmonyParserAdapter {
final_text,
finish_reason: final_finish_reason,
matched_stop,
reasoning_token_count: self.reasoning_token_count,
})
}

Expand All @@ -503,6 +523,7 @@ impl HarmonyParserAdapter {
self.parser = StreamableParser::new(encoding.clone(), Some(Role::Assistant))
.map_err(|e| format!("Failed to reset parser: {}", e))?;
self.prev_recipient = None;
self.reasoning_token_count = 0;
Ok(())
}
}
Expand Down
35 changes: 29 additions & 6 deletions sgl-router/src/routers/grpc/harmony/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use crate::{
grpc_client::sglang_proto::generate_complete::MatchedStop::{MatchedStopStr, MatchedTokenId},
protocols::{
chat::{ChatChoice, ChatCompletionMessage, ChatCompletionRequest, ChatCompletionResponse},
common::{ToolCall, Usage},
common::{CompletionTokensDetails, ToolCall, Usage},
responses::{
ResponseContentPart, ResponseOutputItem, ResponseReasoningContent, ResponseStatus,
ResponseUsage, ResponsesRequest, ResponsesResponse, ResponsesUsage,
OutputTokensDetails, ResponseContentPart, ResponseOutputItem, ResponseReasoningContent,
ResponseStatus, ResponseUsage, ResponsesRequest, ResponsesResponse, ResponsesUsage,
},
},
routers::grpc::{
Expand Down Expand Up @@ -50,6 +50,8 @@ impl HarmonyResponseProcessor {

// Build choices by parsing output with HarmonyParserAdapter
let mut choices: Vec<ChatChoice> = Vec::new();
let mut total_reasoning_tokens = 0u32;

for (index, complete) in all_responses.iter().enumerate() {
// Convert matched_stop from proto to JSON
let matched_stop = complete.matched_stop().map(|m| match m {
Expand Down Expand Up @@ -97,6 +99,9 @@ impl HarmonyResponseProcessor {

let finish_reason = parsed.finish_reason;

// Accumulate reasoning tokens across all responses
total_reasoning_tokens += parsed.reasoning_token_count;

choices.push(ChatChoice {
index: index as u32,
message,
Expand All @@ -108,7 +113,14 @@ impl HarmonyResponseProcessor {
}

// Build usage from proto fields
let usage = response_formatting::build_usage(&all_responses);
let mut usage = response_formatting::build_usage(&all_responses);

// Add reasoning token count from parsed analysis/commentary channels
if total_reasoning_tokens > 0 {
usage.completion_tokens_details = Some(CompletionTokensDetails {
reasoning_tokens: Some(total_reasoning_tokens),
});
}

// Final ChatCompletionResponse
Ok(
Expand Down Expand Up @@ -233,7 +245,14 @@ impl HarmonyResponseProcessor {
}

// Build usage (needed for both ToolCallsFound and Completed)
let usage = response_formatting::build_usage(std::slice::from_ref(complete));
let mut usage = response_formatting::build_usage(std::slice::from_ref(complete));

// Add reasoning token count from parsed analysis/commentary channels
if parsed.reasoning_token_count > 0 {
usage.completion_tokens_details = Some(CompletionTokensDetails {
reasoning_tokens: Some(parsed.reasoning_token_count),
});
}

// Check for tool calls in commentary channel
if let Some(tool_calls) = parsed.commentary {
Expand Down Expand Up @@ -288,7 +307,11 @@ impl HarmonyResponseProcessor {
output_tokens: usage.completion_tokens,
total_tokens: usage.total_tokens,
input_tokens_details: None,
output_tokens_details: None,
output_tokens_details: usage.completion_tokens_details.as_ref().and_then(|d| {
d.reasoning_tokens.map(|tokens| OutputTokensDetails {
reasoning_tokens: tokens,
})
}),
}))
.build();

Expand Down
14 changes: 9 additions & 5 deletions sgl-router/src/routers/grpc/harmony/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ use crate::{
protocols::{
common::{Function, ToolCall, ToolChoice, ToolChoiceValue, Usage},
responses::{
McpToolInfo, ResponseContentPart, ResponseInput, ResponseInputOutputItem,
ResponseOutputItem, ResponseReasoningContent, ResponseStatus, ResponseTool,
ResponseToolType, ResponseUsage, ResponsesRequest, ResponsesResponse, ResponsesUsage,
StringOrContentParts,
McpToolInfo, OutputTokensDetails, ResponseContentPart, ResponseInput,
ResponseInputOutputItem, ResponseOutputItem, ResponseReasoningContent, ResponseStatus,
ResponseTool, ResponseToolType, ResponseUsage, ResponsesRequest, ResponsesResponse,
ResponsesUsage, StringOrContentParts,
},
},
routers::grpc::{
Expand Down Expand Up @@ -1134,7 +1134,11 @@ fn build_tool_response(
output_tokens: usage.completion_tokens,
total_tokens: usage.total_tokens,
input_tokens_details: None,
output_tokens_details: None,
output_tokens_details: usage.completion_tokens_details.as_ref().and_then(|d| {
d.reasoning_tokens.map(|tokens| OutputTokensDetails {
reasoning_tokens: tokens,
})
}),
}))
.build()
}
Expand Down
34 changes: 28 additions & 6 deletions sgl-router/src/routers/grpc/harmony/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ use crate::{
chat::{
ChatCompletionRequest, ChatCompletionStreamResponse, ChatMessageDelta, ChatStreamChoice,
},
common::{FunctionCallDelta, ToolCall, ToolCallDelta, Usage},
responses::{ResponseStatus, ResponseUsage, ResponsesResponse, ResponsesUsage},
common::{CompletionTokensDetails, FunctionCallDelta, ToolCall, ToolCallDelta, Usage},
responses::{
OutputTokensDetails, ResponseStatus, ResponseUsage, ResponsesResponse, ResponsesUsage,
},
},
routers::grpc::{
common::responses::streaming::{OutputItemType, ResponseStreamEventEmitter},
Expand Down Expand Up @@ -669,6 +671,7 @@ impl HarmonyStreamingProcessor {
let mut matched_stop: Option<serde_json::Value> = None;
let mut prompt_tokens: u32 = 0;
let mut completion_tokens: u32 = 0;
let mut reasoning_token_count: u32 = 0;

// Process stream
let mut chunk_count = 0;
Expand Down Expand Up @@ -870,8 +873,9 @@ impl HarmonyStreamingProcessor {
.finalize(finish_reason.clone(), matched_stop.clone())
.map_err(|e| format!("Finalize error: {}", e))?;

// Store finalized tool calls
// Store finalized tool calls and reasoning token count
accumulated_tool_calls = final_output.commentary.clone();
reasoning_token_count = final_output.reasoning_token_count;

// Complete all tool calls if we have commentary
if let Some(ref tool_calls) = accumulated_tool_calls {
Expand Down Expand Up @@ -1072,7 +1076,13 @@ impl HarmonyStreamingProcessor {
prompt_tokens,
completion_tokens,
total_tokens: prompt_tokens + completion_tokens,
completion_tokens_details: None,
completion_tokens_details: if reasoning_token_count > 0 {
Some(CompletionTokensDetails {
reasoning_tokens: Some(reasoning_token_count),
})
} else {
None
},
},
request_id: emitter.response_id.clone(),
});
Expand All @@ -1091,15 +1101,27 @@ impl HarmonyStreamingProcessor {
output_tokens: completion_tokens,
total_tokens: prompt_tokens + completion_tokens,
input_tokens_details: None,
output_tokens_details: None,
output_tokens_details: if reasoning_token_count > 0 {
Some(OutputTokensDetails {
reasoning_tokens: reasoning_token_count,
})
} else {
None
},
}))
.build(),
),
usage: Usage {
prompt_tokens,
completion_tokens,
total_tokens: prompt_tokens + completion_tokens,
completion_tokens_details: None,
completion_tokens_details: if reasoning_token_count > 0 {
Some(CompletionTokensDetails {
reasoning_tokens: Some(reasoning_token_count),
})
} else {
None
},
},
})
}
Expand Down
3 changes: 3 additions & 0 deletions sgl-router/src/routers/grpc/harmony/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ pub struct HarmonyChannelOutput {

/// Matched stop token (if any)
pub matched_stop: Option<Value>,

/// Number of reasoning tokens (from analysis and commentary channels)
pub reasoning_token_count: u32,
}

/// Streaming delta for SSE responses
Expand Down
Loading