-
Notifications
You must be signed in to change notification settings - Fork 1.1k
[CI failure] Revert "[BugFix] Fix the issue of thinker requests being preempted, causing shape mismatch." #3648
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -990,10 +990,19 @@ def _thinker_decode_to_talker_decode( | |
| """ | ||
| embed = payload.get("embed", {}) | ||
| meta = payload.get("meta", {}) | ||
| ids = payload.get("ids", {}) | ||
|
|
||
| cached_thinker_decode_embeds = embed.get("cached_decode", None) | ||
| thinker_decode_embed = embed.get("decode", None) | ||
| start_index = meta.get("num_processed_tokens", 0) | ||
| thinker_output_token_ids = ids.get("output", []) | ||
| if start_index >= len(thinker_output_token_ids) - 1: | ||
| # When the tokens output by the thinker are exhausted, an EOS token needs to be appended. | ||
|
Comment on lines
+999
to
+1000
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The new exhaustion check uses Useful? React with 👍 / 👎. |
||
| # Use the finished_flag to mark that all tokens output by thinker have been consumed. | ||
| if meta.get("eos_emitted", False): | ||
| return self.tts_pad_embed.to(device) | ||
| update_dict.setdefault("meta", {})["eos_emitted"] = True | ||
| return self.tts_eos_embed.to(device) | ||
|
|
||
| if cached_thinker_decode_embeds is not None and start_index < cached_thinker_decode_embeds.shape[0]: | ||
| cached_thinker_decode_embeds = cached_thinker_decode_embeds.to(device) | ||
|
|
@@ -1002,20 +1011,10 @@ def _thinker_decode_to_talker_decode( | |
| thinker_decode_embed = thinker_decode_embed.to(device) | ||
| cached_thinker_decode_embeds = torch.cat([cached_thinker_decode_embeds, thinker_decode_embed], dim=0) | ||
| update_dict.setdefault("embed", {})["cached_decode"] = cached_thinker_decode_embeds | ||
|
|
||
| elif thinker_decode_embed is not None: | ||
| else: | ||
| thinker_embed = thinker_decode_embed | ||
| if thinker_embed.device != device: | ||
| thinker_embed = thinker_embed.to(device) | ||
|
|
||
| else: | ||
| # When the tokens output by the thinker are exhausted, an EOS token needs to be appended. | ||
| # Use the finished_flag to mark that all tokens output by thinker have been consumed. | ||
| if meta.get("eos_emitted", False): | ||
| return self.tts_pad_embed.to(device) | ||
| update_dict.setdefault("meta", {})["eos_emitted"] = True | ||
| return self.tts_eos_embed.to(device) | ||
|
|
||
| update_dict.setdefault("embed", {})["decode"] = None | ||
| return self.talker.text_projection(thinker_embed).to(device) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
save_asyncnow unconditionally enqueues every chunk task, so when async-chunk requests are preempted andrequest.num_computed_tokensrolls back, stale chunks can be sent again whileput_req_chunkcontinues forward. Those duplicate/stale payloads are then merged on the receiver side via tensor/list concatenation, which can desynchronize thinker/talker sequence shapes and reintroduce the preemption shape-mismatch failure under normal preempt-and-resume scheduling.Useful? React with 👍 / 👎.