[BugFix] Fix malformed output with streaming responses API #29044
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Changes from 2 commits
```diff
@@ -214,6 +214,8 @@ def _update_num_reasoning_tokens(self):
     def append_output(self, output: RequestOutput) -> None:
         output_token_ids = output.outputs[0].token_ids
+        # Reset parser for each append_output call to handle multi-turn scenarios
+        # where the parser needs to start fresh for each assistant response
         self.parser = get_streamable_parser_for_assistant()
         for token_id in output_token_ids:
             self.parser.process(token_id)
```
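To illustrate why the per-call reset above matters, here is a minimal toy sketch (the class and helper names are hypothetical, not vLLM's actual ones): a stateful streaming parser that is reused across assistant turns can carry leftover state from an earlier turn into the next one, merging what should be separate messages.

```python
# Hypothetical sketch, not vLLM's real parser: a 0 token ends a message.
class ToyStreamableParser:
    """Accumulates token ids into messages; token 0 closes the current one."""

    def __init__(self) -> None:
        self.current: list[int] = []
        self.messages: list[list[int]] = []

    def process(self, token_id: int) -> None:
        if token_id == 0:  # toy end-of-message marker
            self.messages.append(self.current)
            self.current = []
        else:
            self.current.append(token_id)


def append_output(holder: dict, token_ids: list[int], reset: bool) -> None:
    # Mirrors the diff: optionally start each call with a fresh parser so
    # state left over from the previous turn cannot leak into this one.
    if reset:
        holder["parser"] = ToyStreamableParser()
    for tok in token_ids:
        holder["parser"].process(tok)


# Without reset: an unterminated first turn bleeds into the second turn.
holder = {"parser": ToyStreamableParser()}
append_output(holder, [1, 2], reset=False)     # turn 1, no end marker seen
append_output(holder, [3, 4, 0], reset=False)  # turn 2
print(holder["parser"].messages)               # [[1, 2, 3, 4]] - turns merged

# With reset: each call parses only its own turn's tokens.
holder = {"parser": ToyStreamableParser()}
append_output(holder, [1, 2], reset=True)
append_output(holder, [3, 4, 0], reset=True)
print(holder["parser"].messages)               # [[3, 4]]
```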
```diff
@@ -519,7 +521,8 @@ def append_output(self, output: RequestOutput) -> None:
         # (finished=True), then the next token processed will mark the
         # beginning of a new message
         self.first_tok_of_message = output.finished
-        for tok in output.outputs[0].token_ids:
+        token_ids = output.outputs[0].token_ids
+        for tok in token_ids:
             self.parser.process(tok)
         self._update_decode_token_usage(output)
```
```diff
@@ -529,7 +532,9 @@ def append_output(self, output: RequestOutput) -> None:
                 self.current_turn_metrics.reset()
             # Check if the current token is part of reasoning content
             self._update_num_reasoning_tokens()
-        self.last_tok = tok
+        # Only update last_tok if we actually processed tokens
+        if token_ids:
+            self.last_tok = tok
         if len(self._messages) - self.num_init_messages < len(self.parser.messages):
             self._messages.extend(
                 self.parser.messages[len(self._messages) - self.num_init_messages :]
```
```diff
@@ -547,7 +552,8 @@ def append_tool_output(self, output: list[Message]) -> None:
         for tok in toks:
            self.parser.process(tok)
         self.last_tok = toks[-1]
-        # TODO: add tool_output messages to self._messages
+        # Add tool output messages to self._messages
+        self._messages.extend(output)
```
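A toy sketch of what the new `self._messages.extend(output)` line fixes (class and method names here are illustrative, not vLLM's): if tool outputs are fed to the parser but never recorded in the context's own message list, the transcript the context keeps falls out of sync with what the parser saw.

```python
# Hypothetical minimal context: a transcript plus a record switch that
# mirrors the presence/absence of the `extend` call in the diff.
class ToyContext:
    def __init__(self) -> None:
        self._messages: list[str] = []

    def append_assistant(self, msg: str) -> None:
        self._messages.append(msg)

    def append_tool_output(self, output: list[str], record: bool) -> None:
        # The tool output's tokens would be fed to the parser here.
        if record:
            # Mirrors the fix: keep the context's transcript complete.
            self._messages.extend(output)


ctx = ToyContext()
ctx.append_assistant("assistant: call get_weather")
ctx.append_tool_output(["tool: result"], record=True)
print(ctx._messages)  # both the assistant turn and the tool output present
```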
```python
    def is_expecting_start(self) -> bool:
        return self.parser.state == StreamState.EXPECT_START
```
@heheda12345 it looks like you added this line in #22512... wonder if you could check this and claude's logic when you have a chance?
Actually in response to the new test failure below, claude suggested to reinstate this line.
What's not clear to me is why the non-streaming case resets the parser between turns, while the streaming case keeps the same parser and feeds it the tokens it missed to catch it back up to the expected state. On the surface, it feels like both should take the same approach: either reset the parser between turns, or do the extra catch-up logic that the streaming side does down in streaming's render_for_completion.
But, for the sake of getting this moving without digging deeper into things here, I think putting this back the way it was, with the added comment, at least makes it clear why it's doing this.
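The two strategies this comment contrasts can be sketched with a toy parser (all names here are illustrative, not vLLM's actual code): the reset style builds a fresh parser per turn, while the catch-up style keeps one parser alive and replays the tokens it did not see (e.g. tool output) before the new turn.

```python
# Toy parser that just records every token it processes.
class ToyParser:
    def __init__(self) -> None:
        self.tokens: list[int] = []

    def process(self, tok: int) -> None:
        self.tokens.append(tok)


def next_turn_by_reset(_old_parser, turn_tokens: list[int]) -> ToyParser:
    # Non-streaming style per the diff: start a fresh parser each turn,
    # so it only ever sees the current turn's tokens.
    parser = ToyParser()
    for tok in turn_tokens:
        parser.process(tok)
    return parser


def next_turn_by_catch_up(
    parser: ToyParser, missed_tokens: list[int], turn_tokens: list[int]
) -> ToyParser:
    # Streaming style per the comment: keep the same parser and first feed
    # it the tokens it missed, catching its state up before the new turn.
    for tok in missed_tokens + turn_tokens:
        parser.process(tok)
    return parser


fresh = next_turn_by_reset(None, [5, 6])
print(fresh.tokens)  # [5, 6] - only the new turn

live = ToyParser()
live.process(1)  # earlier streamed token
caught_up = next_turn_by_catch_up(live, [2], [5, 6])
print(caught_up.tokens)  # [1, 2, 5, 6] - full history replayed
```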