[rollout] feat: enable R3 in fully async mode #5344
guillemgt wants to merge 5 commits into verl-project:main
Conversation
Code Review
The pull request successfully enables R3 (routed experts) support in fully async mode by updating the generate_for_partial interface and implementing expert extraction in the async servers. However, there are critical issues in the AsyncPartialToolAgentLoop implementation where routed expert data is not correctly preserved during task cancellation and is incorrectly sliced during output construction. Additionally, a truth value check on a numpy array in AsyncPartialToolAgentLoop will cause a runtime error. These issues must be addressed to ensure the correctness and stability of R3 training.
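The numpy truth-value error mentioned above is a common pitfall: `if arr:` on a multi-element array is ambiguous and raises at runtime. A minimal sketch of the failure and the usual `is not None` fix (the variable name is only illustrative):

```python
import numpy as np

# Hypothetical (token, layer, top_k) routing array, as in the snippets below.
routed_experts = np.zeros((4, 2, 2))

# Truth-value check on a multi-element array raises ValueError:
try:
    if routed_experts:
        pass
except ValueError as e:
    print("raised:", e)

# The usual fix is an explicit None check:
if routed_experts is not None:
    print("ok, shape:", routed_experts.shape)
```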
```python
response_ids = token_outputs.token_ids
log_probs = token_outputs.log_probs
routed_experts = token_outputs.routed_experts  # already contains routed experts for prefix
```
The routed_experts data should be saved to agent_data immediately after it is received from the server. In the current implementation, if the generation is cancelled (line 185) or terminates early due to length (line 192), the latest routed_experts information is not preserved in agent_data. This will cause the expert routing data to be lost when the task is resumed or completed, which is critical for R3 (Routing Replay) correctness.
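A minimal sketch of the suggested fix (the function shape, `agent_data`, and the early-exit branches are assumptions based on this review comment, not the actual verl code): persist the routing data as soon as it arrives, before any cancellation or length-based early return can skip it.

```python
def handle_token_outputs(agent_data, token_outputs, cancelled, max_len):
    """Hypothetical shape of the loop step described in the review comment."""
    response_ids = token_outputs.token_ids
    log_probs = token_outputs.log_probs
    routed_experts = token_outputs.routed_experts

    # Save routing data immediately so it survives cancellation
    # or early termination due to length.
    if routed_experts is not None:
        agent_data.routed_experts = routed_experts

    if cancelled:
        return "cancelled"
    if len(response_ids) >= max_len:
        return "length"
    return "ok"
```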
Suggested change:

```python
response_ids = token_outputs.token_ids
log_probs = token_outputs.log_probs
routed_experts = token_outputs.routed_experts  # already contains routed experts for prefix
if routed_experts is not None:
    agent_data.routed_experts = routed_experts
```
`routed_experts` contains the routed experts for both input and output. As described in the PR:

> In the case of partial rollouts, the routed experts returned are those for the last part of the generation. This is not ideal, as we should keep track of the returned experts for each part and force the rollout engine to use the previously routed experts. However, rollout engines such as vLLM do not currently support this, so we settle for this implementation.
Thank you for your contribution!
@ArronHZG I have rebased and added more experimental details in the description of the PR.
- Remove redundant `_FakeTokenOutput`, use real `TokenOutput` instead
- Update `_FakeServerManager` to accept `return_routed_experts` parameter
- Update both `generate` and `generate_for_partial` to return `TokenOutput`
- Add `test_agent_loop_with_routed_experts_on_cpu` to verify R3 functionality
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
@guillemgt Please push R3's work forward a bit more; I've added it to the Q2 roadmap.
(#6029)

### What does this PR do?

Fixes a bug in `FullyAsyncLLMServerManager.generate()` where `routed_experts` was incorrectly concatenated via `torch.cat` during partial rollout resume, causing duplicated routing data and broken MoE expert replay in the actor.

sglang returns `routed_experts` for the **full sequence** (prompt + all generated tokens). Evidence from sglang source:

1. [`io_struct.py#L1020`](https://github.com/sgl-project/sglang/blob/v0.5.9/python/sglang/srt/managers/io_struct.py#L1020) — field definition:

```python
# The routed experts for each token, including both input and output tokens
# routed_experts[i] is a tensor of shape (token, layer, top_k) for request i
routed_experts: List[Optional[torch.Tensor]]
```

2. `schedule_batch.py` — the `seqlen` used to collect routing covers the full sequence:

```python
@property
def seqlen(self) -> int:
    return len(self.origin_input_ids) + len(self.output_ids)
```

3. `topk.py#L1049-1051` — capture is unconditional (no prefill/decode check):

```python
get_global_experts_capturer().capture(layer_id=layer_id, topk_ids=topk_ids)
```

4. `scheduler_output_processor_mixin.py#L105-111` — collection uses the full `seqlen`:

```python
req.routed_experts = get_global_experts_capturer().get_routed_experts(
    req_pool_idx=req.req_pool_idx,
    seqlen=req.seqlen,  # origin_input_ids + output_ids
    req_to_token_pool=self.req_to_token_pool,
)
```

When a partial rollout resumes after an abort, the input becomes `prompt + already_generated_tokens`. sglang re-processes the entire input during prefill and returns `routed_experts` covering all positions. The old code concatenated this with the previous `routed_experts`:

```
old routing:    prompt + A B C
new routing:    prompt + A B C + D E
concat result:  prompt + A B C + prompt + A B C + D E   <-- duplicated!
expected:       prompt + A B C + D E
```

This shifted the routing and caused incorrect MoE expert replay, leading to `actor/ppo_kl` spikes.

**Fix:** replace `routed_experts` instead of concatenating, since the resumed call already covers all positions.

Related: #4348 (partial rollout RFC), #4101 (R3 router replay), #5344 (R3 in fully async)

### Checklist Before Starting

- [x] Search for similar PRs: https://github.com/verl-project/verl/pulls?q=routed_experts+partial_rollout
- [x] Format the PR title as `[{modules}] {type}: {description}`

### Test

- Ran async training with `partial_rollout=True` and `enable_rollout_routing_replay=True` (R3 mode)
- Verified `actor/ppo_kl` no longer spikes after partial rollout resume
- Verified the `routed_experts` tensor shape matches `(prompt_len + response_len, num_layers, top_k)` after resume

### Design & Code Changes

A single change in `verl/experimental/fully_async_policy/agent_loop/agent_loop.py`:

```diff
- if output.routed_experts is not None:
-     if final_output.routed_experts is None:
-         final_output.routed_experts = output.routed_experts
-     else:
-         final_output.routed_experts = torch.cat([final_output.routed_experts, output.routed_experts], dim=0)
+ # sglang returns routed_experts for the full sequence (prompt + all tokens),
+ # so on partial rollout resume the new output already covers all positions.
+ if output.routed_experts is not None:
+     final_output.routed_experts = output.routed_experts
```

### Checklist Before Submitting

- [x] Read the [Contribute Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md).
- [x] Apply [pre-commit checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`
- [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: this is an async distributed training bug that requires a multi-node sglang + Megatron setup with an MoE model and partial rollout enabled, which is not feasible to reproduce in CI.
- [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1).

Co-authored-by: vadim <vadim@mail.ru>
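The duplication described in the PR can be demonstrated with plain lists standing in for routing tensors (a sketch only; the real data are `(token, layer, top_k)` tensors joined with `torch.cat`):

```python
prompt = ["p0", "p1"]
part1 = ["A", "B", "C"]   # tokens generated before the abort
part2 = ["D", "E"]        # tokens generated after resume

old_routing = prompt + part1          # routing returned by the first call
new_routing = prompt + part1 + part2  # resumed call re-covers ALL positions

# Buggy behavior: concatenating duplicates the prefix routing.
concat_result = old_routing + new_routing
assert concat_result != prompt + part1 + part2  # duplicated, misaligned

# Fixed behavior: replace, since the resumed call already covers everything.
replace_result = new_routing
assert replace_result == prompt + part1 + part2
```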
What does this PR do?
This PR enables R3 routing replay support in fully async mode.
In the case of partial rollouts, the routed experts returned are those for the last part of the generation. This is not ideal, as we should keep track of the returned experts for each part and force the rollout engine to use the previously routed experts. However, rollout engines such as vLLM do not currently support this, so we settle for this implementation.
The PR also updates the `generate_for_partial` interface to return structured `TokenOutput` objects (more similar to the interface for `generate`).

Checklist Before Starting

- [x] Format the PR title as `[{modules}] {type}: {description}` (this will be checked by the CI). `{modules}` include `fsdp`, `megatron`, `veomni`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data`, `cfg`, `reward`, like `[megatron, fsdp, doc]`. `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`. If breaking any API, prepend `[BREAKING]` to the beginning of the title, e.g. `[BREAKING][fsdp, megatron] feat: dynamic batching`.

Test
Manually verified by training Qwen3 30B-A3B with R2/R3 in both synchronous and fully async modes.
Compared to R2, R3 leads to a much lower KL divergence between rollouts and training in both modes.
API and Usage Example
R3 has no API changes.
The `generate_for_partial` method in the fully async agent loop now returns a `TokenOutput` object instead of a tuple.

Design & Code Changes
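A sketch of consuming the new interface. The field names are taken from the review snippets above, but this `TokenOutput` definition is an illustrative stand-in, not verl's actual class:

```python
from dataclasses import dataclass
from typing import Any, List, Optional

@dataclass
class TokenOutput:
    """Illustrative stand-in for verl's TokenOutput."""
    token_ids: List[int]
    log_probs: Optional[List[float]] = None
    routed_experts: Optional[Any] = None  # (token, layer, top_k) routing data

# Before: generate_for_partial returned a tuple such as (token_ids, log_probs).
# After: a structured object, so fields like routed_experts can be added
# without breaking every call site.
def consume(output: TokenOutput):
    response_ids = output.token_ids
    log_probs = output.log_probs
    routed_experts = output.routed_experts
    return response_ids, log_probs, routed_experts
```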
High-level design:
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
- [x] Apply pre-commit checks: `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`
- [ ] Once your PR is ready for CI, send a message in the `ci-request` channel in the `verl` Slack workspace. (If not accessible, please try the Feishu group (飞书群).)
- [ ] If this PR changes the `recipe` submodule, please also update the reference to the submodule commit via `git submodule update --remote` or `cd recipe && git pull origin main`. (not applicable)