[trainer,vllm] feat: add a recipe to support reorder rollout #4160
echo-rain wants to merge 8 commits into verl-project:main
Conversation
Code Review
This pull request introduces a new recipe to handle long-tail rollouts by reordering them, which is a valuable addition for optimizing training time. The implementation adds new Reorder variants of AgentLoopManager and RayPPOTrainer, with the core logic relying on separate queues for finished and unfinished rollouts. My review has identified several critical issues in the asynchronous and queue handling logic that could lead to deadlocks or incorrect behavior. I've also pointed out a minor issue with a duplicated decorator. Addressing these points will be crucial for the stability and correctness of this new feature.
recipe/rollpacker/agent_loop.py
Outdated
```python
kwargs = {k: v[i] for k, v in batch.non_tensor_batch.items()}
self.tasks.append(asyncio.create_task(self._run_agent_loop(sampling_params, trajectory_info[i], **kwargs)))

results = await asyncio.gather(*self.tasks)
```
The call to asyncio.gather is missing return_exceptions=True. Without it, if any of the awaited tasks are cancelled, asyncio.gather will raise a CancelledError immediately, and the subsequent loop to handle finished and unfinished tasks will not be executed. This would break the core logic of reordering rollouts.
Suggested change:

```diff
- results = await asyncio.gather(*self.tasks)
+ results = await asyncio.gather(*self.tasks, return_exceptions=True)
```
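A standalone sketch (not verl code) of the behavioral difference: with `return_exceptions=True`, a failing task's exception is returned as an object in the results list, so the loop that post-processes finished and unfinished tasks still runs.

```python
import asyncio

async def ok():
    return "done"

async def boom():
    # Stand-in for a rollout task that fails or is aborted.
    raise RuntimeError("simulated rollout failure")

async def main():
    # Without return_exceptions=True this gather would raise immediately
    # and the caller would never see ok()'s result.
    return await asyncio.gather(ok(), boom(), return_exceptions=True)

results = asyncio.run(main())
```

The caller can then branch on `isinstance(result, Exception)` to route finished and unfinished rollouts to their respective queues.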
recipe/rollpacker/agent_loop.py
Outdated
```python
ray.get(worker.set_queue.remote(self.queue))
ray.get(self.queue.append_worker.remote(worker))

ray.get(worker.set_unfinished_queue.remote(self.queue))
```
The unfinished_queue is incorrectly set to self.queue during initialization. This will cause DataProto objects from unfinished tasks to be mixed with _InternalAgentLoopOutput objects from finished tasks in the same queue. The _postprocess function, which expects only _InternalAgentLoopOutput objects, will then fail. This line should be removed as the correct unfinished_queue is set later via set_unfinished_queue.
recipe/rollpacker/ray_trainer.py
Outdated
```python
def _get_all_from_queue(queue, max_item=None):
    items = []
    while not queue.empty():
        try:
            item = queue.get()
            items.append(item)
        except asyncio.QueueEmpty:
            break
        if not max_item and len(items) >= max_item:
            break
    return items
```
The function `_get_all_from_queue` is intended to drain a `ray.util.queue.Queue` but has several critical bugs:

- It uses a blocking `queue.get()` call within a `while not queue.empty()` loop, which is prone to race conditions and can cause the process to hang indefinitely.
- It incorrectly catches `asyncio.QueueEmpty` instead of `queue.Empty` from Python's standard library, which is what `ray.util.queue.Queue` raises.
- The logic for `max_item` is flawed and will raise a `TypeError` if `max_item` is `None`.

To fix this, the function should use `get_nowait()` in a loop and handle `queue.Empty` correctly. You will also need to import `queue` at the top of the file, and `import asyncio` can be removed as it's no longer used.
Suggested change:

```python
def _get_all_from_queue(q, max_item=None):
    # Note: the parameter is renamed to `q` so it does not shadow the
    # `queue` module imported at the top of the file.
    items = []
    while True:
        if max_item is not None and len(items) >= max_item:
            break
        try:
            item = q.get_nowait()
            items.append(item)
        except queue.Empty:
            break
    return items
```
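The recommended drain pattern can be exercised standalone with a standard-library `queue.Queue`, which raises the same `queue.Empty` from `get_nowait()` (`ray.util.queue.Queue` is assumed to behave analogously):

```python
import queue

def drain(q, max_item=None):
    # Drain until the queue reports empty via the exception, never via
    # a racy empty() pre-check; optionally stop after max_item items.
    items = []
    while max_item is None or len(items) < max_item:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            break
    return items

q = queue.Queue()
for i in range(5):
    q.put(i)
```

Relying on the `Empty` exception rather than `empty()` makes the loop correct even when another producer or consumer touches the queue concurrently.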
```python
@ray.remote
@ray.remote
```
/gemini review |
Code Review
This pull request introduces a new recipe to support reordering rollouts, aiming to mitigate the long-tail effect in training. The changes are substantial, adding new components like ReorderAgentLoopManager, AgentLoopReorderWorker, and RayPPOReorderTrainer. While the overall approach is sound, I have identified a few critical issues related to race conditions, incorrect API usage, and a lack of robustness in data processing. These issues could lead to deadlocks or runtime crashes and should be addressed to ensure the stability and correctness of this new feature.
recipe/rollpacker/agent_loop.py
Outdated
```python
if inputs[0].response_logprobs is not None:
    optional_outputs["rollout_log_probs"] = torch.cat([input.response_logprobs for input in inputs], dim=0)
```
The current logic for handling response_logprobs is not robust and can lead to a crash. It checks if inputs[0].response_logprobs is not None and then attempts to concatenate response_logprobs from all inputs. If some inputs have response_logprobs as None while others have tensors, torch.cat will fail with a TypeError.
You should handle the case where response_logprobs can be a mix of tensors and None values in the inputs list. A possible solution is to replace None values with a tensor of zeros of the correct shape before concatenation.
```python
first_valid_logprob = next((inp.response_logprobs for inp in inputs if inp.response_logprobs is not None), None)
if first_valid_logprob is not None:
    dummy_logprobs = torch.zeros_like(first_valid_logprob)
    logprobs_to_cat = [
        inp.response_logprobs if inp.response_logprobs is not None else dummy_logprobs for inp in inputs
    ]
    optional_outputs["rollout_log_probs"] = torch.cat(logprobs_to_cat, dim=0)
```
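The "first valid value, pad the rest" pattern can be shown without torch; here plain lists stand in for tensors, and `pad_missing` is a hypothetical helper, not verl code:

```python
def pad_missing(values):
    # Find the first non-None entry to learn the expected shape.
    first_valid = next((v for v in values if v is not None), None)
    if first_valid is None:
        return None  # nothing to concatenate at all
    # Replace missing entries with a zero value of the same shape.
    dummy = [0.0] * len(first_valid)
    return [v if v is not None else dummy for v in values]
```

With tensors, `dummy` would be built via `torch.zeros_like(first_valid)` exactly as in the suggestion above.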
recipe/rollpacker/ray_trainer.py
Outdated
```python
def _get_all_from_queue(queue, max_item=None):
    items = []
    while not queue.empty():
        try:
            item = queue.get()
            items.append(item)
        except asyncio.QueueEmpty:
            break
        if max_item and len(items) >= max_item:
            break
    return items
```
The implementation of `_get_all_from_queue` has a critical bug that can lead to a deadlock.

- It uses `queue.get()`, which is a blocking call, inside a `while not queue.empty()` loop. This is a race condition: if the queue becomes empty between the check and the call, `get()` will block forever.
- It catches `asyncio.QueueEmpty`, but `ray.util.queue.Queue` raises `queue.Empty` from Python's standard library for non-blocking gets.

A safer implementation would be to use `queue.get_nowait()` within a `try...except queue.Empty` block. Please also import the `queue` module.
Suggested change:

```python
def _get_all_from_queue(q, max_item=None):
    # Note: the parameter is renamed to `q` so the local
    # `import queue` does not shadow it.
    import queue
    items = []
    num_to_get = q.qsize()
    if max_item is not None:
        num_to_get = min(num_to_get, max_item)
    for _ in range(num_to_get):
        try:
            item = q.get_nowait()
            items.append(item)
        except queue.Empty:
            break
    return items
```
recipe/rollpacker/agent_loop.py
Outdated
```python
ray.get(worker.set_queue.remote(self.queue))
ray.get(self.queue.append_worker.remote(worker))

ray.get(worker.set_unfinished_queue.remote(self.queue))
```
This line incorrectly sets the unfinished_queue on the worker to be a QueueMonitor actor handle, while the worker expects a ray.util.queue.Queue object. This would cause a runtime AttributeError when put_async is called on it.
Although this is currently overwritten by a subsequent call in RayPPOReorderTrainer, this line is buggy, confusing, and should be removed to avoid future issues.
/gemini review |
Code Review
This pull request introduces a feature to reorder rollouts to mitigate the long-tail effect, which is a great initiative for optimizing training time. The changes are extensive, touching upon agent loops, the PPO trainer, and adding new queue utilities. While the overall logic for reordering seems sound, I've identified two critical issues in the implementation that could lead to process hangs or crashes. One is related to improper exception handling during task cancellation, and the other concerns incorrect queue handling in a multiprocess context. Addressing these issues is crucial for the stability of the new feature.
```python
try:
    server = self._choose_server(request_id)
    task = server.generate.remote(
        request_id=uuid4().hex,  # use new request_id for each turn
        prompt_ids=prompt_ids,
        sampling_params=sampling_params,
        image_data=image_data,
    )
    self.ray_tasks.append(task)
    output = await task
    return output
except Exception as e:
    logger.error(f"server manager got exception: {e}")
```
The try...except Exception block is too broad. It catches ray.exceptions.TaskCancelledError, logs it as an error, and then implicitly returns None. The caller, _run_agent_loop, does not expect None and will crash with an AttributeError when trying to access attributes on the None object. This will prevent the cancellation logic from working correctly.
Cancellation errors should be handled specifically, and other exceptions should be re-raised to avoid hiding bugs. I suggest handling ray.exceptions.TaskCancelledError by raising an asyncio.CancelledError to correctly propagate cancellation in the asyncio context, and re-raising other exceptions.
Suggested change:

```python
try:
    server = self._choose_server(request_id)
    task = server.generate.remote(
        request_id=uuid4().hex,  # use new request_id for each turn
        prompt_ids=prompt_ids,
        sampling_params=sampling_params,
        image_data=image_data,
    )
    self.ray_tasks.append(task)
    output = await task
    return output
except ray.exceptions.TaskCancelledError as e:
    raise asyncio.CancelledError from e
except Exception as e:
    logger.error(f"server manager got exception: {e}")
    raise
```
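A minimal, self-contained sketch of the suggested translation, with a stand-in exception class in place of `ray.exceptions.TaskCancelledError`: the backend-specific cancellation is re-raised as `asyncio.CancelledError`, and a `gather` with `return_exceptions=True` records it instead of aborting the whole batch.

```python
import asyncio

class BackendCancelled(Exception):
    """Stand-in for a backend cancellation such as ray's TaskCancelledError."""

async def call_backend(cancelled: bool):
    try:
        if cancelled:
            raise BackendCancelled("request aborted")
        return "ok"
    except BackendCancelled as e:
        # Translate into asyncio's own cancellation so it propagates
        # correctly through the asyncio task machinery.
        raise asyncio.CancelledError from e

async def main():
    # The cancelled task shows up as a CancelledError in the result list
    # rather than tearing down the gather.
    return await asyncio.gather(call_backend(False), call_backend(True),
                                return_exceptions=True)

results = asyncio.run(main())
```

This is exactly the pairing the two review comments require: the server manager raises `CancelledError`, and the caller's `gather(..., return_exceptions=True)` keeps the finished results alongside it.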
```python
def _get_all_from_queue(queue, max_item=None) -> list[Any]:
    """
    Get all items from a queue.

    Args:
        queue: queue to get items from.
        max_item: limit the number of items to return.

    Returns:
        list[Any]: list of items.
    """
    items = []
    while not queue.empty():
        try:
            item = queue.get_nowait()
            items.append(item)
        except asyncio.QueueEmpty:
            break
        if max_item and len(items) >= max_item:
            break
    return items
```
The function `_get_all_from_queue` has two critical issues:

- It uses `except asyncio.QueueEmpty`. The queue being used is a `ray.util.queue.Queue`, which is a multiprocess queue. Its `get_nowait()` method raises `queue.Empty` (aliased as `ray.exceptions.QueueEmpty`), not `asyncio.QueueEmpty`. This means the exception will never be caught.
- The `while not queue.empty():` check is a race condition in a multiprocess context. The queue can become empty between the check and the `get_nowait()` call. Combined with the wrong exception type, this can lead to an infinite loop.

The combination of these issues will likely cause the training process to hang. The correct way to drain a queue is to loop indefinitely and break when `get_nowait()` raises the appropriate empty exception.
Suggested change:

```python
def _get_all_from_queue(queue, max_item=None) -> list[Any]:
    """
    Get all items from a queue.

    Args:
        queue: queue to get items from.
        max_item: limit the number of items to return.

    Returns:
        list[Any]: list of items.
    """
    items = []
    while True:
        try:
            if max_item and len(items) >= max_item:
                break
            item = queue.get_nowait()
            items.append(item)
        except ray.exceptions.QueueEmpty:
            break
    return items
```
/gemini review |
Code Review
This pull request introduces a 'reorder rollout' mechanism to mitigate the long-tail effect in training, especially for responses with long sequences. The core idea is to identify long-running rollout requests, cancel them to avoid blocking a training step, and then group these cancelled 'long-tail' requests into separate batches for processing in later steps. This is achieved by introducing a queueing system for rollout tasks, allowing the trainer to proceed with completed tasks while unfinished ones are requeued.
My review identifies two critical issues. First, in verl/experimental/agent_loop/agent_loop.py, the _postprocess function can fail with an IndexError if it receives an empty list of inputs, which can occur if all rollout tasks are cancelled. Second, in verl/trainer/ppo/ray_trainer.py, the main training loop in the fit method is structured in a way that processing requeued long-tail batches introduces extra training steps. This incorrectly increments global_steps, which will disrupt learning rate schedules, validation frequency, and checkpointing. These issues need to be addressed to ensure the correctness and robustness of the new feature.
```python
def _postprocess(inputs: list[_InternalAgentLoopOutput]) -> DataProto:
    """Process the padded outputs from _run_agent_loop and combine them into a batch."""
    # Convert lists back to tensors and stack them to create a batch.
    prompt_ids = torch.cat([input.prompt_ids for input in inputs], dim=0)
```
The _postprocess function does not handle the case where inputs is an empty list. If inputs is empty, torch.cat on an empty list of tensors can raise an error (depending on the PyTorch version), and accessing inputs[0] at line 317 will definitely raise an IndexError. This scenario can occur in the reorder rollout flow if no requests are completed before being cancelled. The function should gracefully handle an empty list, for example by returning an empty DataProto. Note that this might have downstream effects, for example in _performance_metrics, which may also need to be updated to handle empty inputs.
Suggested change:

```python
def _postprocess(inputs: list[_InternalAgentLoopOutput]) -> DataProto:
    """Process the padded outputs from _run_agent_loop and combine them into a batch."""
    if not inputs:
        return DataProto(
            batch=TensorDict({}, batch_size=[0]),
            non_tensor_batch={},
            meta_info={"metrics": [], "reward_extra_keys": []},
        )
    # Convert lists back to tensors and stack them to create a batch.
    prompt_ids = torch.cat([input.prompt_ids for input in inputs], dim=0)
```
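In miniature, the guard looks like this (plain lists stand in for tensors; `combine` is a hypothetical helper, not verl code):

```python
def combine(parts):
    # Guard the empty-batch case up front: concatenating zero parts is
    # either an error or ill-defined, so return an explicit empty result.
    if not parts:
        return []
    out = []
    for p in parts:
        out.extend(p)
    return out
```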
```python
for batch_dict in self.train_dataloader:
    metrics = {}
    timing_raw = {}

    with marked_timer("start_profile", timing_raw):
        self._start_profiling(
            not prev_step_profile and curr_step_profile
            if self.config.global_profiler.profile_continuous_steps
            else curr_step_profile
        )
    train_batch_size = self.config.data.train_batch_size * self.config.actor_rollout_ref.rollout.n
    if self.reorder_rollout and self.unfinished_queue.qsize() >= train_batch_size:
        # When using reordering, some incomplete long-tail requests are generated,
        # so these requests need to be grouped into an additional batch for inference.
        unfinished_batch_list = _get_all_from_queue(self.unfinished_queue, max_item=train_batch_size)
        batch = collate_fn(unfinished_batch_list)
        self._step(batch, logger, epoch, 1)

    batch: DataProto = DataProto.from_single_dict(batch_dict)
    batch.meta_info["temperature"] = self.config.actor_rollout_ref.rollout.temperature
    self._step(batch, logger, epoch, self.config.actor_rollout_ref.rollout.n)
```
The `fit` loop is structured to iterate over `self.train_dataloader`. However, with the `reorder_rollout` feature, an extra training step (`self._step`) is performed whenever the `unfinished_queue` has enough items to form a batch. Since `self._step` increments `self.global_steps`, this leads to more training steps than originally planned, because `self.total_training_steps` is calculated from the dataloader size. This causes several issues:

- Training runs for more steps than `total_training_steps`.
- The `tqdm` progress bar goes beyond its total.
- Learning rate schedules, validation frequency, and checkpointing frequency, which are all based on `self.global_steps`, will be incorrect.

A possible solution would be to restructure the training loop to be driven by `self.global_steps` (e.g., using a `while` loop) instead of iterating over the dataloader directly, and to fetch batches from either the dataloader or the `unfinished_queue` within that loop.
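A hedged sketch of such a step-driven loop, under the assumption that each step's batch may come from either the requeued long-tail items or the dataloader (all names — `fit`, `unfinished`, `total_steps` — are illustrative, not verl's API):

```python
from collections import deque

def fit(dataloader, unfinished, total_steps, batch_size):
    # The step counter, not the dataloader, bounds training, so requeued
    # long-tail batches cannot push training past total_steps.
    data_iter = iter(dataloader)
    processed = []
    step = 0
    while step < total_steps:
        if len(unfinished) >= batch_size:
            # Enough requeued long-tail items to form a full batch.
            batch = [unfinished.popleft() for _ in range(batch_size)]
        else:
            try:
                batch = next(data_iter)
            except StopIteration:
                break  # dataloader exhausted and not enough requeued items
        processed.append(batch)
        step += 1  # the global step advances once per batch, whatever its source
    return processed
```

Because every batch, long-tail or fresh, consumes exactly one step, LR schedules, validation, and checkpointing keyed on the step counter stay consistent.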
Signed-off-by: shanyicheng.syc <shanyicheng.syc@antgroup.com>
… and inference batch size caused step offset. Signed-off-by: shanyicheng.syc <shanyicheng.syc@antgroup.com>
@echo-rain When `train_batch_size` and `gen_batch_size` are small (like 8 and 10), the rollpacker logic works. But when `train_batch_size` is increased to 128, `gen_batch_size` to 130, and `rollout.n` to 16, vLLM rollout becomes very slow; it seems related to this code.
@yang-ybb It seems unlikely to be a queue issue. Although the queue lacks multi-process concurrency optimizations, this design is intended to prevent a large number of requests from completing within the same time window due to variable request completion times. |
@echo-rain ok~
@yang-ybb got it thank you. |
@yang-ybb To be precise, this performance issue isn't caused by concurrency. The root cause is that the
@echo-rain Yes, it may be from putting large data through a Ray actor. The time cost is normal after changing to `self.queue.put.remote(1)` and keeping the results in each `AgentLoopWorker`; then, in `AgentLoopManager`, `worker.get_results.remote()` can gather all results to the main thread.
@yang-ybb Great idea, thanks for the suggestion. However, in practice, any remote call will serialize the transmitted data. Perhaps a better implementation would be to store the larger tensor in the main process, mark it with some kind of pointer, and queue only the pointer for transmission.
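The pointer idea can be sketched with a standard-library queue: the large payload stays in a store owned by the producer, and only a small key crosses the queue (all names here are illustrative, not verl code):

```python
import queue
import uuid

store = {}          # large payloads stay here, never serialized
q = queue.Queue()   # only small keys travel through the queue

def put_result(payload):
    key = uuid.uuid4().hex
    store[key] = payload  # large object stays local to the producer
    q.put(key)            # only the short hex key is enqueued
    return key

def get_result():
    key = q.get_nowait()  # cheap: just a hex string
    return store.pop(key)

put_result(list(range(1000)))  # stand-in for a large rollout tensor
```

With Ray, the analogous trick is usually `ray.put()` returning an `ObjectRef` that is queued in place of the data, so the payload lives in the object store rather than being copied through an actor.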
@echo-rain The normal behavior should be as follows (the only difference between the scripts is whether `gen_train_size` equals `train_batch_size`). Do you have any comments? As far as I know, theoretically RollPacker shouldn't affect the convergence of RL training.
@cboss6 Thank you for your comment. In fact, we have already noticed this issue, which is why this PR has not been merged for so long. The results show that RollPacker does have an impact on training performance. In my opinion, adding long requests to the same batch leads to two potential problems. First, the aggregation of long requests causes a sharp increase in length-related penalties. Second, long requests are more easily truncated, resulting in generally lower reward scores. Of course, this is just my conjecture. If you have better conclusions or solutions, please feel free to discuss them with me.







What does this PR do?
The long-tail effect in the current rollout phase is more pronounced in scenarios with long max response lengths. Therefore, we need a solution to address the long-tail effect in this scenario, thereby reducing the overall training time.
The strategy in this PR comes from RollPacker.
Add extra requests during the data preparation phase. Once the required number of requests for training is met, end all remaining rollout requests and save them to a list. In a later step, re-add them as a long-tail batch.
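This wait-for-enough-then-abort flow can be sketched with plain asyncio (delays simulate response lengths; verl's actual implementation goes through Ray servers and an `abort` interface, so every name below is illustrative):

```python
import asyncio

async def fake_rollout(i, delay):
    # Stand-in for a rollout request; delay models response length.
    await asyncio.sleep(delay)
    return i

async def collect_first_n(coros, n):
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = [], set(tasks)
    # Wait until enough requests have completed for a training batch.
    while len(done) < n and pending:
        finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        done.extend(finished)
    for t in pending:
        t.cancel()  # "abort" the long-tail requests
    await asyncio.gather(*pending, return_exceptions=True)
    # In the real recipe, aborted requests would be saved for a later batch.
    return sorted(t.result() for t in done[:n]), len(pending)

finished, num_aborted = asyncio.run(
    collect_first_n([fake_rollout(0, 0.01), fake_rollout(1, 0.5), fake_rollout(2, 0.02)], n=2)
)
```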
experimental data in GRPO-Qwen3-8b_bf16-tp2-ep1 gen_batch_size=18 train_batch_size=16

normal mean ablation experimental group
Checklist Before Starting
- Title format: `[{modules}] {type}: {description}` (This will be checked by the CI.)
- `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data`, like `[megatron, fsdp, doc]`.
- `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`.
- If breaking, add `[BREAKING]` to the beginning of the title, e.g. `[BREAKING][fsdp, megatron] feat: dynamic batching`.

Test
example: examples/grpo_trainer/run_qwen3-8b_reorder.sh
API and Usage Example
You can enable this feature by configuring a generation batch size larger than the training batch size:

data.train_batch_size=8 +data.gen_batch_size=10

Design & Code Changes
1. Data preparation stage
Modify the existing data preparation logic, adding a new variable that configures the number of additional data records to prepare. Add logic during data preparation to form batches from identified long-tail requests.

2. rollout post-process
The original `asyncio.gather` check logic for the end of a rollout needs to be replaced with a check of the number of completed rollout requests. At the same time, unprocessed requests need to be marked as long-tail requests, added to a new long-tail request list, and the `abort` interface used to abort the request.

3. reload req
Here we offer two options as solutions: adding requests in small amounts over multiple steps, or adding them all at once in multiple batches.

Adding a small number of requests means that long-tail requests from each step are added to the next round of rollout. After the requests are added, the long-tail request list is cleared. However, due to the limited learning rate, requests that exhibited a long-tail effect in the previous round are likely to exhibit it in this round as well. This may lead to a rollback phenomenon, i.e., a rollback to the original implementation that uses the newly added requests. However, if a one-step-off-policy strategy that uses the streaming results of the previous round as the prompt input can be applied, the rollback phenomenon can be largely avoided.

Adding multiple requests together means that all long-tail requests are added to the training process to execute a complete training step, provided they form a complete batch. The drawback of this approach is that long-sequence requests that should be distributed across multiple training steps are concentrated into a single step, which may limit the learning rate and prevent effective learning of long-sequence representations.


normal step:
unfinished req step:
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
- Run `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`.
- Request CI via the `ci-request` channel in the `verl` Slack workspace. (If not accessible, please try the Feishu group (飞书群).)