[recipe] feat: asynchronous reward agent with mini-batch pipeline and one-step off-policy training #2854
haolinyan wants to merge 23 commits into verl-project:main from
Conversation
- add spacing in the header
- fix numerical errors and unify precision in "Training Time and Percentage"
- update the agent reward configuration introduction, emphasizing support for LLM-as-a-Judge, RAG, etc., and provide an OpenAI API-based LLM-as-a-Judge example
- fix some syntax errors
Code Review
This pull request introduces a significant new feature: an asynchronous reward agent designed to improve RL training efficiency by overlapping communication with computation. The implementation includes a mini-batch pipeline and a one-step off-policy strategy, with support for both FSDP and Megatron backends. The code is well-documented with comprehensive READMEs and example scripts.
My review has identified a critical race condition in `reward_agent.py` that could lead to incorrect behavior in reward processing. I've also found a high-severity correctness issue in both `dp_actor.py` and `megatron_actor.py` related to inconsistent logic for selecting the policy loss function. Addressing these points will be crucial for the stability and correctness of this new feature.
```python
request.group_dict[index][0][intra_data_index] = (score, valid_response_length)
request.group_dict[index][1] -= 1
timestamps.append(datetime.datetime.now().isoformat())
queries.append(query)
results.append(response)
latencies.append(end_time - start_time)
group_uids.append(index)

if request.group_dict[index][1] == 0:
```
There is a potential race condition in the proxy_func method. Multiple worker threads from the ThreadPoolExecutor can access and modify the shared request.group_dict concurrently for the same group_uid (index).
The operation request.group_dict[index][1] -= 1 (line 215) is not atomic. If two threads execute this line for the same group (index) concurrently, one of the decrements could be lost. This can lead to incorrect group completion logic, where a group might be processed before all its rewards are collected, or never processed at all.
To fix this, you should introduce a threading.Lock to protect the read and write access to request.group_dict within the for future in as_completed(futures): loop. I recommend creating a lock in the __init__ method (e.g., self.group_dict_lock = threading.Lock()) and using a with self.group_dict_lock: block around the critical sections that modify request.group_dict.
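The suggested fix can be sketched as follows. This is a minimal illustration, not the recipe's actual class: the class name, method name, and `group_dict_lock` attribute are hypothetical, while the dict bookkeeping mirrors the snippet under review.

```python
import threading


class RewardAgent:
    """Illustrative sketch of guarding the shared group bookkeeping with a lock."""

    def __init__(self):
        # Hypothetical lock protecting request.group_dict, created once in __init__.
        self.group_dict_lock = threading.Lock()

    def on_future_done(self, request, index, intra_data_index, score, valid_response_length):
        # The read-modify-write on the shared dict happens under the lock, so
        # concurrent completions for the same group cannot lose a decrement.
        with self.group_dict_lock:
            if index not in request.group_dict:
                request.group_dict[index] = [dict(), request.group_size]
            request.group_dict[index][0][intra_data_index] = (score, valid_response_length)
            request.group_dict[index][1] -= 1
            group_done = request.group_dict[index][1] == 0
        return group_done
```

The `group_done` flag computed inside the critical section lets the caller trigger group completion exactly once, even if several worker threads finish at the same moment.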
The access to `request.group_dict` only occurs within a single daemon thread, so there is no race condition.
```python
for (
    data_source,
    response_str,
    ground_truth,
    extra_info,
    group_uid,
    data_idx,
    valid_response_length,
) in request.request_data:
    # Each worker thread independently executes the user-defined function (self.user_defined_func)
    # without modifying the request object.
    future = self.executor.submit(
        self.user_defined_func, data_source, response_str, ground_truth, extra_info
    )
    future.meta_info = [group_uid, data_idx, valid_response_length, time.time(), response_str]
    futures.append(future)
for future in as_completed(futures):
    score, query, response = future.result()
    end_time = time.time()
    index, intra_data_index, valid_response_length, start_time, response_str = future.meta_info
    if index not in request.group_dict:
        print(f"Warning: index {index} not in request.group_dict, add it in func: proxy_func")
        request.group_dict[index] = [dict(), request.group_size]
    request.group_dict[index][0][intra_data_index] = (score, valid_response_length)
    request.group_dict[index][1] -= 1
    timestamps.append(datetime.datetime.now().isoformat())
    queries.append(query)
    results.append(response)
    latencies.append(end_time - start_time)
    group_uids.append(index)
```
```python
loss_mode = self.config.policy_loss.get("loss_mode", "vanilla")

if self.config.policy_loss.loss_mode == "vanilla":
```
There's an inconsistency in how the policy loss mode is checked. On line 435, you define a local variable loss_mode = self.config.policy_loss.get("loss_mode", "vanilla"). However, this if condition checks self.config.policy_loss.loss_mode == "vanilla" instead of using the local loss_mode variable. This could lead to incorrect behavior if the configuration structure changes or if loss_mode is intended to be the single source of truth for this logic. The elif on line 450 correctly uses the loss_mode variable. For consistency and correctness, you should use the loss_mode variable in this if condition.
```diff
- if self.config.policy_loss.loss_mode == "vanilla":
+ if loss_mode == "vanilla":
```
```python
loss_mode = self.config.policy_loss.get("loss_mode", "vanilla")

if self.config.policy_loss.loss_mode == "vanilla":
```
There's an inconsistency in how the policy loss mode is checked. On line 433, you define a local variable loss_mode = self.config.policy_loss.get("loss_mode", "vanilla"). However, this if condition checks self.config.policy_loss.loss_mode == "vanilla" instead of using the local loss_mode variable. This could lead to incorrect behavior if the configuration structure changes or if loss_mode is intended to be the single source of truth for this logic. The elif on line 448 correctly uses the loss_mode variable. For consistency and correctness, you should use the loss_mode variable in this if condition.
```diff
- if self.config.policy_loss.loss_mode == "vanilla":
+ if loss_mode == "vanilla":
```
thanks! could you remove the tensorboard artifacts from this PR?
Got it, I've removed the tensorboard artifacts in the new commit. Please check if everything looks good now. Let me know if there's anything else needed for this PR.
@eric-haibin-lin hi, the latest commit (dcd8dc3) has passed all CI checks. Could you please review the changes when you have time? I've removed the TensorBoard artifacts as requested, and everything should be ready for your final check. If you're unavailable, I'd also appreciate it if you could suggest or assign another appropriate reviewer. Thanks for your time!
@haolinyan Thanks for your great work. Batch rollout mode has some drawbacks:
Due to the above reasons, we're going to deprecate it and switch to server rollout mode: Agent Loop.
@wuxibin89 Thank you for your comments! It is true that the calculation of reward scores in

In addition, we would like to emphasize that the main dilemma we currently face in applying RL to many industrial tasks lies in how to define effective reward signals. We therefore adopt remote rewards, guiding LLMs (such as GPT-4) to score responses through prompt design, so as to enable rapid exploration and experience accumulation in the initial stage (we believe we are not alone in this). In this context, we propose this scheme to improve the training efficiency of verl, with the hope that it can be applied more efficiently to a wide range of tasks. Finally, to avoid duplicate development, we would like to ask whether there are already plans to implement reward calculation in
@haolinyan Good job! But I have an error after running your recipe. Error is "omegaconf.errors.ConfigAttributeError: Key 'ray_init' is not in struct" on recipe/async_reward_agent/main_ppo.py 226 |
@edc3000 Thanks for using our recipe! The error occurs because

We recommend merging our PR based on this commit: 3e2bceb and trying the training again. Let us know if you run into any further issues!
@haolinyan, your work is very helpful to me, and I am using your recipe to train my RL model. But I have now found that the entropy in the actor is unusual, as in this picture (however, the reward and response length are normal and rising). I really need your help.
@edc3000 I suspect this might be related to your training parameter settings, specifically PPO-related coefficients like
@haolinyan I think this might be caused by the code in async_reward_agent/main_ppo.py. In the function run(), fsdp_workers is imported from verl, not from your code. I changed it like


What does this PR do?
This PR introduces an asynchronous reward agent that schedules remote reward requests to mitigate communication bottlenecks in RL training scenarios that rely on remote reward services (e.g., LLM-as-a-Judge, RAG, hybrid rule-based scoring). By leveraging the "mini-batch pipeline + one-step off-policy" strategy, it overlaps communication latency with GPU computation, significantly improving training efficiency.
Checklist Before Starting
- Title follows the format `[{modules}] {type}: {description}` (this will be checked by the CI)

Test
To validate this solution, we utilize the GSM8K dataset and introduce randomized artificial delays ranging from 1 to 40 seconds during reward computation for each sample, simulating the latency typically incurred when calling remote reward services.
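The delay injection described above can be sketched as a wrapper around an ordinary reward function. This is an illustrative sketch only: the function name and the configurable delay bounds are assumptions, and the scoring rule is a placeholder rather than the recipe's actual GSM8K checker (the text specifies a 1-40 second range).

```python
import random
import time


def delayed_reward(data_source, response_str, ground_truth, extra_info=None,
                   min_delay_s=1.0, max_delay_s=40.0):
    """Emulate a slow remote reward service by sleeping for a random
    duration before scoring, simulating remote-call latency."""
    time.sleep(random.uniform(min_delay_s, max_delay_s))
    # Placeholder rule-based score; a real setup would parse the GSM8K
    # final answer instead of doing a substring check.
    return 1.0 if ground_truth in response_str else 0.0
```

Because each sample's delay is drawn independently, a synchronous reward manager would pay roughly the sum of these delays per batch, which is exactly the latency the mini-batch pipeline is designed to hide.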
The experimental results show that:
API and Usage Example
1. Reward Function Configuration:
Users can flexibly integrate a remote reward service (such as LLM-as-a-Judge, RAG-enhanced scoring, hybrid rule-based + model scoring, etc.) in two ways:
For example, users can implement a reward class that calls the OpenAI-style API to score individual responses, then performs group-wise post-processing of the results.
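Such a reward class might look like the following sketch. Everything here is illustrative: the class and method names are hypothetical, and the chat-completion call is abstracted behind an injectable `judge_fn` (in practice this would wrap an OpenAI-style client call) so the scoring and group post-processing logic can be tested without network access.

```python
class LLMJudgeReward:
    """Illustrative LLM-as-a-Judge reward: asks an OpenAI-style endpoint to
    grade each response, then post-processes scores within a group."""

    def __init__(self, judge_fn):
        # judge_fn: (prompt: str) -> str, e.g. a wrapper around a
        # chat-completions API call that returns the judge's raw text.
        self.judge_fn = judge_fn

    def score_one(self, response_str, ground_truth):
        prompt = (
            "Rate the response against the reference on a 0-10 scale. "
            f"Reference: {ground_truth}\nResponse: {response_str}\nScore:"
        )
        raw = self.judge_fn(prompt)
        try:
            # Clamp to [0, 10] and normalize to [0, 1].
            return max(0.0, min(10.0, float(raw.strip()))) / 10.0
        except ValueError:
            return 0.0  # unparseable judge output -> zero reward

    def score_group(self, responses, ground_truth):
        # Group-wise post-processing: mean-center scores within the group.
        scores = [self.score_one(r, ground_truth) for r in responses]
        mean = sum(scores) / len(scores)
        return [s - mean for s in scores]
```

Keeping the API call behind `judge_fn` also makes it easy to swap judges (or mock them in unit tests) without touching the scoring logic.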
Then, specify the function name and file path in the training configuration:
2. Training Configuration
When launching a training process, the parameters below should be configured:
```shell
# Make sure you set the correct path of the config folder.
# `+mini_batch_pipeline=True` enables the mini-batch pipeline strategy.
python3 -m recipe.async_reward_agent.main_ppo \
    --config-path="${HOME}/verl/trainer/config" \
    custom_reward_function.path=${reward_file} \
    custom_reward_function.name=${reward_function_name} \
    reward_model.reward_manager=batch \
    reward_model.launch_reward_fn_async=True \
    +mini_batch_pipeline=True
```

Design & Code Changes
We designed an asynchronous reward agent that handles concurrent requests and manages their lifecycle. We then leveraged the one-step off-policy training and mini-batch pipeline strategies to overlap communication latency with computation:
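The overlap idea can be illustrated with a toy sketch (not the recipe's actual classes; the function names and the use of a thread pool here are assumptions): reward requests for all mini-batches are submitted up front, and training on each mini-batch begins as soon as its rewards arrive, so the network latency of later batches hides behind the computation on earlier ones.

```python
from concurrent.futures import ThreadPoolExecutor


def pipeline_step(mini_batches, remote_reward, train_on):
    """Toy mini-batch pipeline: submit every reward request immediately,
    then consume results batch-by-batch so reward latency for batch k+1
    overlaps with the training computation on batch k."""
    with ThreadPoolExecutor(max_workers=max(1, len(mini_batches))) as pool:
        futures = [pool.submit(remote_reward, mb) for mb in mini_batches]
        results = []
        for mb, fut in zip(mini_batches, futures):
            rewards = fut.result()  # wait only for this batch's rewards
            results.append(train_on(mb, rewards))
    return results
```

The one-step off-policy strategy extends the same idea across iterations: rewards requested during step t are consumed when updating the policy at step t+1, at the cost of training on slightly stale rollouts.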
For detailed design and code changes, please refer to the documentation.
Checklist Before Submitting
Important
Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review.
- Run `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always`
- Request CI via the `ci-request` channel in the `verl` Slack workspace. (If not accessible, please try the Feishu group (飞书群).)