
[ray]{feat}: 1) add multithread execution mode and global thread pool management; 2) add launch_reward_fn_sub_thread#2861

Closed
chenchaoxu7575 wants to merge 4 commits into verl-project:main from chenchaoxu7575:chenchao_vllm_optim

Conversation


@chenchaoxu7575 chenchaoxu7575 commented Aug 1, 2025

What does this PR do?

We found that `ray.remote()` takes a long time to submit tasks with large batches. We can use a thread pool to run these Ray task submissions in the single controller. In an experiment with vLLM, GRPO, and geo3k on 8 H100s, this speeds up RL training: 1) reward_fn: 5s -> <1s; 2) execute worker (x3): 18s -> 12s.
Here are some experiments of this feature.
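The submission pattern described above can be sketched as follows. This is a minimal illustration, not code from this PR: `submit_one` stands in for a per-item Ray call such as `worker.method.remote(item)`, whose submission latency the threads overlap.

```python
from concurrent.futures import ThreadPoolExecutor

def parallel_submit(submit_one, items, pool_size=8):
    """Fan out many task submissions across a thread pool.

    Submitting a Ray task mostly waits on Ray's internal RPC, which
    releases the GIL, so running the submissions from several threads
    hides most of the per-call latency for large batches.
    """
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        futures = [pool.submit(submit_one, item) for item in items]
        # Preserve input order when collecting results.
        return [f.result() for f in futures]
```
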

1. REWARD FUNCTION

The reward_compute block is hidden in the timeline when `launch_reward_fn_sub_thread=True` is set.

1.1 launch_reward_fn_async = True (default)

launch_reward_fn_async

1.2 launch_reward_fn_sub_thread=True (new)

launch_reward_fn_sub_thread

2. EXECUTE WORKERS

Worker start times differ across ranks, and no worker runs until the slowest rank has started. Multithreaded execution reduces this overhead from 18s to 12s.

2.1 execute_all (default)

exec_all_1 exec_all_2

log

2.2 trainer.execute_mode=all_multithread (new)

exec_threads

log_multithreads

Main changes

- Added `ALL_MULTITHREAD` to the `Execute` enum.
- Updated `get_predefined_execute_fn` to return the appropriate function for the new execution mode.
- Implemented `execute_all_multithread_submit` in `RayWorkerGroup` for parallel method execution.
- Introduced `GlobalThreadPoolManager` to manage a shared thread pool across the application.
- Updated configuration files to include options for the new execution mode and thread pool size.
- Added the `launch_reward_fn_sub_thread` configuration option for asynchronous reward computation.
- Implemented the `_async_compute_reward_wrapper` method in `RayPPOTrainer` for thread-safe reward computation.
- Added conditional thread pool initialization based on the `launch_reward_fn_sub_thread` setting.
- Enhanced the reward computation flow to support three modes:
  - `launch_reward_fn_sub_thread: True` - use the local thread pool for async computation
  - `launch_reward_fn_async: True` - use a Ray remote function for async computation
  - both `False` - synchronous computation
- Added a mutual-exclusivity check between `launch_reward_fn_sub_thread` and `launch_reward_fn_async`.
- Implemented proper thread pool cleanup on training completion.
- Added `execute_mode` and `execute_thread_pool_size` options in `ppo_trainer.yaml` for user configuration.
- Added `launch_reward_fn_sub_thread: False` in `reward_model.yaml` for reward computation mode control.
- Renamed `thread_pool_size` to `execute_thread_pool_size` for clarity.
- The thread pool is only initialized when `launch_reward_fn_sub_thread` is enabled, reducing resource usage.
- The reward computation wrapper ensures proper device handling in sub-threads.
- Safe thread pool shutdown with null checks prevents errors.
- Maintains backward compatibility with existing reward computation methods.
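As a rough sketch of what a shared pool manager along these lines could look like: only the class name `GlobalThreadPoolManager` comes from this PR; the method names and singleton scheme below are illustrative assumptions, not the PR's implementation.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class GlobalThreadPoolManager:
    """Process-wide singleton wrapping one shared ThreadPoolExecutor."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._pool = None
        return cls._instance

    def init(self, pool_size=8):
        # Lazy creation: resources are only used once the feature is enabled.
        if self._pool is None:
            self._pool = ThreadPoolExecutor(max_workers=pool_size)

    def submit(self, fn, *args, **kwargs):
        if self._pool is None:
            raise RuntimeError("thread pool not initialized")
        return self._pool.submit(fn, *args, **kwargs)

    def shutdown(self):
        # Null check makes repeated shutdown safe.
        if self._pool is not None:
            self._pool.shutdown(wait=True)
            self._pool = None
```
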

This enhancement aims to improve performance and flexibility in distributed execution scenarios, providing both parallel worker group execution and asynchronous reward computation capabilities.
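For reference, enabling the new options might look like the following config fragment. The key names come from this PR; the exact file layout and defaults are assumptions.

```yaml
trainer:
  execute_mode: all_multithread        # new mode; default behavior stays execute_all
  execute_thread_pool_size: 16         # renamed from thread_pool_size

reward_model:
  launch_reward_fn_sub_thread: True    # mutually exclusive with launch_reward_fn_async
```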


CLAassistant commented Aug 1, 2025

CLA assistant check
All committers have signed the CLA.

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a multithreaded execution mode and asynchronous reward computation, which are great for improving performance. My review focuses on improving maintainability, correctness, and robustness. Key suggestions include refactoring duplicated code, fixing a critical bug in a fallback mechanism, correcting a faulty conditional check, and improving exception handling to avoid masking errors. Addressing these points will make the new features more reliable and easier to maintain.

Comment on lines +60 to +61
except Exception as e:
print(f"[EXECUTE MODE ERROR] Failed to use custom {custom_execute_mode}, falling back to default: {e}")

high

Catching a broad Exception can hide underlying issues and make debugging difficult. It's better to catch more specific exceptions that you expect to handle, such as AttributeError if a method is not found, or ValueError from get_predefined_execute_fn. If you must catch a broad exception, consider logging the full traceback for better diagnostics.
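The reviewer's suggestion could be sketched like this. `resolve_execute_fn` and its `lookup` parameter are hypothetical names; `lookup` stands in for `get_predefined_execute_fn` so the sketch is self-contained.

```python
import logging
import traceback

logger = logging.getLogger(__name__)

def resolve_execute_fn(lookup, custom_execute_mode, default_fn):
    """Catch only the expected failure modes and log the full traceback,
    so the fallback path does not silently mask the root cause."""
    try:
        return lookup(custom_execute_mode)
    except (AttributeError, ValueError):
        logger.warning(
            "[EXECUTE MODE ERROR] Failed to use custom %s, falling back to default:\n%s",
            custom_execute_mode,
            traceback.format_exc(),
        )
        return default_fn
```
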

Comment on lines +724 to +725
except Exception as e:
print(f"[WARNING] Global thread pool not available, falling back to sync execution: {e}")

high

Catching a broad Exception here can mask problems with the global_thread_pool_manager, such as configuration errors. This makes debugging harder. It's recommended to catch more specific exceptions if possible, or at least log the full traceback to provide more context on the failure.

Comment on lines +1047 to +1048
"active_threads": len(self._thread_pool._threads),
"queue_size": self._thread_pool._work_queue.qsize() if hasattr(self._thread_pool, '_work_queue') else 0

high

Accessing private attributes _threads and _work_queue of ThreadPoolExecutor is fragile and can break with future Python updates. ThreadPoolExecutor does not provide a public API for these stats. While this might work now, it's a maintainability risk. Consider wrapping the executor to track tasks and provide statistics in a safer way if these stats are critical.
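One way to follow this suggestion is to wrap the executor and count tasks via public APIs only. `TrackedThreadPool` is a hypothetical class sketching the idea, not code from this PR.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class TrackedThreadPool:
    """Wraps ThreadPoolExecutor and tracks task counts itself, avoiding
    reads of private attributes like `_threads` and `_work_queue` that
    may break across Python versions."""

    def __init__(self, max_workers):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()
        self._pending = 0
        self._completed = 0

    def submit(self, fn, *args, **kwargs):
        with self._lock:
            self._pending += 1
        future = self._pool.submit(fn, *args, **kwargs)
        future.add_done_callback(self._on_done)
        return future

    def _on_done(self, _future):
        with self._lock:
            self._pending -= 1
            self._completed += 1

    def stats(self):
        # Version-stable statistics built from our own counters.
        with self._lock:
            return {"pending": self._pending, "completed": self._completed}

    def shutdown(self):
        self._pool.shutdown(wait=True)
```
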

Comment on lines +903 to +925
if OmegaConf.select(self.config.trainer, "execute_mode") is not None:
self.critic_wg.set_execute_mode(self.config.trainer.execute_mode)
self.critic_wg.init_model()

if self.use_reference_policy and not self.ref_in_actor:
self.ref_policy_wg = all_wg["ref"]
# Set execute mode for reference policy worker group
if OmegaConf.select(self.config.trainer, "execute_mode") is not None:
self.ref_policy_wg.set_execute_mode(self.config.trainer.execute_mode)
self.ref_policy_wg.init_model()

if self.use_rm:
self.rm_wg = all_wg["rm"]
# Set execute mode for reward model worker group
if OmegaConf.select(self.config.trainer, "execute_mode") is not None:
self.rm_wg.set_execute_mode(self.config.trainer.execute_mode)
self.rm_wg.init_model()

# we should create rollout at the end so that vllm can have a better estimation of kv cache memory
self.actor_rollout_wg = all_wg["actor_rollout"]
# Set execute mode for actor rollout worker group
if OmegaConf.select(self.config.trainer, "execute_mode") is not None:
self.actor_rollout_wg.set_execute_mode(self.config.trainer.execute_mode)

high

The call to OmegaConf.select(self.config.trainer, "execute_mode") is repeated for each worker group (critic_wg, ref_policy_wg, rm_wg, actor_rollout_wg). This is inefficient and makes the code harder to maintain. Consider fetching the execute_mode once before these blocks and reusing the variable.
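The suggested refactor could look like the sketch below: the caller resolves `OmegaConf.select(self.config.trainer, "execute_mode")` once and passes the result in. `apply_execute_mode` is a hypothetical helper name; `set_execute_mode` comes from the diff.

```python
def apply_execute_mode(execute_mode, worker_groups):
    """Apply one pre-resolved execute_mode to every worker group,
    instead of repeating the OmegaConf.select call per group.

    Returns the number of groups updated; optional groups (critic,
    ref, rm) may be passed as None and are skipped.
    """
    if execute_mode is None:
        return 0
    applied = 0
    for wg in worker_groups:
        if wg is not None:
            wg.set_execute_mode(execute_mode)
            applied += 1
    return applied
```
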

chenchaoxu7575 and others added 3 commits August 1, 2025 16:55