diff --git a/.github/workflows/doc.yml b/.github/workflows/doc.yml
new file mode 100644
index 00000000000..ff07c07fc39
--- /dev/null
+++ b/.github/workflows/doc.yml
@@ -0,0 +1,52 @@
+name: doc_test
+
+on:
+ # Trigger the workflow on push or pull request,
+ # but only for the main and v0.* branches
+ push:
+ branches:
+ - main
+ - v0.*
+ pull_request:
+ branches:
+ - main
+ - v0.*
+ paths:
+ - "**/*.py"
+ - "docs/**"
+ - .github/workflows/doc.yml
+
+# Cancel jobs on the same ref if a new one is triggered
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: ${{ github.ref != 'refs/heads/main' }}
+
+# Token permissions: read repository contents; pages and id-token writes are required by deploy-pages.
+permissions:
+ contents: read # for checkout
+ pages: write # for deploy-pages
+ id-token: write # for deploy-pages
+
+jobs:
+ doc_test:
+ runs-on: ubuntu-latest
+ timeout-minutes: 5 # Increase this timeout value as needed
+ strategy:
+ matrix:
+ python-version: ["3.10"]
+ steps:
+ - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5.3.0
+ with:
+ python-version: ${{ matrix.python-version }}
+ - name: Install the current repository
+ run: |
+ pip install -e .[test]
+ pip install -r docs/requirements-docs.txt
+
+ - name: Run doc make html
+ run: |
+ cd docs
+ make clean
+ make html
diff --git a/docs/data.rst b/docs/api/data.rst
similarity index 100%
rename from docs/data.rst
rename to docs/api/data.rst
diff --git a/docs/api/single_controller.rst b/docs/api/single_controller.rst
new file mode 100644
index 00000000000..f10b6521c87
--- /dev/null
+++ b/docs/api/single_controller.rst
@@ -0,0 +1,26 @@
+Single Controller interface
+============================
+
+The Single Controller provides a unified interface for managing distributed workers
+using Ray or other backends and executing functions across them.
+It simplifies the process of dispatching tasks and collecting results, particularly
+when dealing with data parallelism or model parallelism.
+
+
+Core APIs
+~~~~~~~~~~~~~~~~~
+
+.. autoclass:: verl.single_controller.Worker
+ :members: __init__, __new__, get_master_addr_port, get_cuda_visible_devices, world_size, rank
+
+.. autoclass:: verl.single_controller.WorkerGroup
+ :members: __init__, world_size
+
+.. autoclass:: verl.single_controller.ClassWithInitArgs
+ :members: __init__, __call__
+
+.. autoclass:: verl.single_controller.ResourcePool
+ :members: __init__, world_size, local_world_size_list, local_rank_list
+
+.. automodule:: verl.single_controller.ray
+ :members: RayWorkerGroup, create_colocated_worker_cls
\ No newline at end of file
diff --git a/docs/index.rst b/docs/index.rst
index 041730a2a25..9f456bebf1e 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -106,8 +106,9 @@ verl is fast with:
:maxdepth: 1
:caption: API References
- data
+ api/data
api/utils
+ api/single_controller
.. toctree::
diff --git a/docs/start/install.rst b/docs/start/install.rst
index 81cae89d089..0b6f415b87e 100644
--- a/docs/start/install.rst
+++ b/docs/start/install.rst
@@ -214,11 +214,10 @@ Install with AMD GPUs - ROCM kernel support
------------------------------------------------------------------
When you run on AMD GPUs (MI300) with ROCM platform, you cannot use the previous quickstart to run verl. You should follow the following steps to build a docker and run it.
-
If you encounter any issues in using AMD GPUs running verl, feel free to contact me - `Yusheng Su `_.
Find the docker for AMD ROCm: `docker/Dockerfile.rocm `_
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
.. code-block:: bash
@@ -267,15 +266,15 @@ Find the docker for AMD ROCm: `docker/Dockerfile.rocm None:
+ """Initialize the worker with environment settings and device configuration.
+
+ Args:
+ cuda_visible_devices (str, optional):
+ CUDA visible devices configuration. Defaults to None.
+ """
# construct a meta from environment variable. Note that the import must be inside the class because it is executed remotely
import os
@@ -175,6 +195,12 @@ def __init__(self, cuda_visible_devices=None) -> None:
self.fused_worker_dict = {}
def get_fused_worker_by_name(self, worker_name: str):
+ """Get a fused worker by its name.
+
+ Args:
+ worker_name (str):
+ Name of the worker to retrieve
+ """
return self.fused_worker_dict.get(worker_name, None)
def _configure_with_store(self, store: Dict):
@@ -192,9 +218,11 @@ def _configure_with_store(self, store: Dict):
os.environ["REDIS_STORE_SERVER_HOST"] = str(self._master_addr).replace("[", "").replace("]", "") if self._master_addr else ""
def get_master_addr_port(self):
+ """Get the master address and port for distributed communication."""
return self._master_addr, self._master_port
def get_cuda_visible_devices(self):
+ """Get the CUDA visible devices configuration."""
import os
cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "not set")
@@ -202,18 +230,40 @@ def get_cuda_visible_devices(self):
@property
def world_size(self):
+ """Get the total number of workers in the distributed setup."""
return self._world_size
@property
def rank(self):
+ """Get the rank of this worker in the distributed setup."""
return self._rank
@register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO_WITH_FUNC)
def execute_with_func_generator(self, func, *args, **kwargs):
+ """Execute a function with function generator dispatch mode.
+
+ Args:
+ func:
+ Function to execute
+ *args:
+ Positional arguments for the function
+ **kwargs:
+ Keyword arguments for the function
+ """
ret_proto = func(self, *args, **kwargs)
return ret_proto
@register(dispatch_mode=Dispatch.ALL_TO_ALL, execute_mode=Execute.RANK_ZERO)
def execute_func_rank_zero(self, func, *args, **kwargs):
+ """Execute a function in rank zero execution mode.
+
+ Args:
+ func:
+ Function to execute
+ *args:
+ Positional arguments for the function
+ **kwargs:
+ Keyword arguments for the function
+ """
result = func(*args, **kwargs)
return result
diff --git a/verl/single_controller/base/worker_group.py b/verl/single_controller/base/worker_group.py
index d7761c40613..04c4f15bede 100644
--- a/verl/single_controller/base/worker_group.py
+++ b/verl/single_controller/base/worker_group.py
@@ -25,9 +25,20 @@
class ResourcePool:
- """The resource pool with meta info such as world_size."""
+ """
+ Manages a pool of resources across multiple nodes, tracking process counts and GPU allocations.
+ The class provides methods to calculate world size, local world sizes, and local ranks
+ across all nodes in the pool.
+ """
def __init__(self, process_on_nodes=None, max_colocate_count: int = 10, n_gpus_per_node=8) -> None:
+ """Initialize the ResourcePool with node processes and GPU configuration.
+
+ Args:
+ process_on_nodes (List[int], optional): List of process counts per node. Defaults to empty list.
+ max_colocate_count (int, optional): Maximum number of processes that can be colocated. Defaults to 10.
+ n_gpus_per_node (int, optional): Number of GPUs available per node. Defaults to 8.
+ """
if process_on_nodes is None:
process_on_nodes = []
self._store = process_on_nodes
@@ -39,6 +50,7 @@ def add_node(self, process_count):
@property
def world_size(self):
+ """Total number of processes across all nodes in the pool."""
return sum(self._store)
def __call__(self) -> Any:
@@ -49,38 +61,53 @@ def store(self):
return self._store
def local_world_size_list(self) -> List[int]:
+ """Returns a flat list where each process has its local world size."""
nested_local_world_size_list = [[local_world_size for _ in range(local_world_size)] for local_world_size in self._store]
return [item for row in nested_local_world_size_list for item in row]
def local_rank_list(self) -> List[int]:
+ """Returns a flat list of local ranks for all processes across all nodes."""
nested_local_rank_list = [[i for i in range(local_world_size)] for local_world_size in self._store]
return [item for row in nested_local_rank_list for item in row]
class ClassWithInitArgs:
"""
- This class stores a class constructor and the args/kwargs to construct the class.
- It is used to instantiate the remote class.
+ Wrapper class that stores constructor arguments for deferred instantiation.
+ This class is particularly useful for remote class instantiation where
+ the actual construction needs to happen at a different time or location.
"""
def __init__(self, cls, *args, **kwargs) -> None:
+ """Initialize the ClassWithInitArgs instance.
+
+ Args:
+ cls: The class to be instantiated later
+ *args: Positional arguments for the class constructor
+ **kwargs: Keyword arguments for the class constructor
+ """
self.cls = cls
self.args = args
self.kwargs = kwargs
self.fused_worker_used = False
- # def add_arg(self, arg):
- # self.args += (arg,)
-
- # def add_kwarg(self, key, value):
- # self.kwargs[key] = value
-
def __call__(self) -> Any:
+ """Instantiate the stored class with the stored arguments."""
return self.cls(*self.args, **self.kwargs)
def check_workers_alive(workers: List, is_alive: Callable, gap_time: float = 1) -> None:
+ """Continuously monitors worker processes and raises SIGABRT if any worker dies.
+
+ Args:
+ workers (List):
+ List of worker objects to monitor
+ is_alive (Callable):
+ Function to check if a worker is alive
+ gap_time (float):
+ Time interval between checks
+ """
import time
while True:
@@ -92,7 +119,10 @@ def check_workers_alive(workers: List, is_alive: Callable, gap_time: float = 1)
class WorkerGroup:
- """A group of workers"""
+ """
+ Base class for managing a group of workers in a distributed system.
+ The class provides methods for worker management, aliveness checking, and method binding.
+ """
fused_worker_execute_fn_name = "_fuw_execute"
@@ -116,9 +146,11 @@ def __init__(self, resource_pool: ResourcePool, **kwargs) -> None:
self._checker_thread: threading.Thread = None
def _is_worker_alive(self, worker):
+ """Check if a worker is alive. Must be implemented by derived classes."""
raise NotImplementedError("WorkerGroup._is_worker_alive called, should be implemented in derived class.")
def _block_until_all_workers_alive(self) -> None:
+ """Blocks until all workers in the group are alive."""
while True:
all_state = [self._is_worker_alive(worker) for worker in self._workers]
if False in all_state:
@@ -127,6 +159,11 @@ def _block_until_all_workers_alive(self) -> None:
break
def start_worker_aliveness_check(self, every_n_seconds=1) -> None:
+ """Starts a background thread to monitor worker aliveness.
+
+ Args:
+ every_n_seconds (int): Interval between aliveness checks
+ """
# before starting checking worker aliveness, make sure all workers are already alive
self._block_until_all_workers_alive()
@@ -135,16 +172,19 @@ def start_worker_aliveness_check(self, every_n_seconds=1) -> None:
@property
def world_size(self):
+ """Number of workers in the group."""
return len(self._workers)
- # execute_all_async and execute_rank_zero_async should be implemented by RayWorkerGroup, TorchRPCWorkerGroup,
- # MegatronWorkerGroup, XperfWorkerGroup should skip
-
def _bind_worker_method(self, user_defined_cls, func_generator):
- """
- Bind the worker method to the WorkerGroup
- """
+ """Binds worker methods to the WorkerGroup based on registered attributes.
+ Args:
+ user_defined_cls (type): The class containing methods to bind
+ func_generator (Callable): Function that generates the bound method
+
+ Returns:
+ List[str]: List of method names that were successfully bound
+ """
method_names = []
for method_name in dir(user_defined_cls):
try:
diff --git a/verl/single_controller/ray/base.py b/verl/single_controller/ray/base.py
index 1941ea2f353..c0822e3cf27 100644
--- a/verl/single_controller/ray/base.py
+++ b/verl/single_controller/ray/base.py
@@ -145,6 +145,13 @@ def merge_resource_pool(rp1: RayResourcePool, rp2: RayResourcePool) -> RayResour
class RayClassWithInitArgs(ClassWithInitArgs):
+ """A wrapper class for Ray actors with initialization arguments.
+
+ This class extends ClassWithInitArgs to provide additional functionality for
+ configuring and creating Ray actors with specific resource requirements and
+ scheduling strategies.
+ """
+
def __init__(self, cls, *args, **kwargs) -> None:
# self._options = kwargs.pop('options', dict())
super().__init__(cls, *args, **kwargs)
@@ -152,12 +159,34 @@ def __init__(self, cls, *args, **kwargs) -> None:
self._additional_resource = {}
def set_additional_resource(self, additional_resource):
+ """Set additional resource requirements for the actor.
+
+ Args:
+ additional_resource: Dictionary specifying additional resource requirements
+ """
self._additional_resource = additional_resource
def update_options(self, options: Dict):
+ """Update the Ray actor creation options.
+
+ Args:
+ options: Dictionary of options to update
+ """
self._options.update(options)
def __call__(self, placement_group, placement_group_bundle_idx, use_gpu: bool = True, num_gpus=1, sharing_with=None) -> Any:
+ """Create and return a Ray actor with the configured options.
+
+ Args:
+ placement_group: Ray placement group for scheduling
+ placement_group_bundle_idx: Index of the bundle in the placement group
+ use_gpu: Whether to use GPU resources
+ num_gpus: Number of GPUs to allocate
+ sharing_with: Actor to share resources with
+
+ Returns:
+ A Ray actor handle with the configured options
+ """
if sharing_with is not None:
target_node_id = ray.get(sharing_with.get_node_id.remote())
cuda_visible_devices = ray.get(sharing_with.get_cuda_visible_devices.remote())
@@ -181,6 +210,13 @@ def __call__(self, placement_group, placement_group_bundle_idx, use_gpu: bool =
class RayWorkerGroup(WorkerGroup):
+ """A group of Ray workers that can be managed collectively.
+
+ This class extends WorkerGroup to provide Ray-specific functionality for
+ creating and managing groups of Ray actors with specific resource requirements
+ and scheduling strategies.
+ """
+
def __init__(
self,
resource_pool: RayResourcePool = None,
@@ -193,6 +229,18 @@ def __init__(
ray_wait_register_center_timeout: int = 300,
**kwargs,
) -> None:
+ """Initialize a RayWorkerGroup.
+
+ Args:
+ resource_pool: Resource pool for worker allocation
+ ray_cls_with_init: Class with initialization arguments for workers
+ bin_pack: Whether to use strict bin packing for resource allocation
+ name_prefix: Prefix for worker names
+ detached: Whether workers should be detached
+ worker_names: Names of existing workers to attach to
+ ray_wait_register_center_timeout: Timeout for waiting on register center
+ **kwargs: Additional keyword arguments
+ """
super().__init__(resource_pool=resource_pool, **kwargs)
self.ray_cls_with_init = ray_cls_with_init
self.name_prefix = get_random_string(length=6) if name_prefix is None else name_prefix
@@ -218,6 +266,14 @@ def __init__(
self.method_names = []
def _is_worker_alive(self, worker: ray.actor.ActorHandle):
+ """Check if a worker actor is still alive.
+
+ Args:
+ worker: Ray actor handle to check
+
+ Returns:
+ bool: True if the worker is alive, False otherwise
+ """
worker_state_dict = get_actor(worker._actor_id.hex())
return worker_state_dict.get("state", "undefined") == "ALIVE" if worker_state_dict is not None else False
@@ -231,6 +287,14 @@ def _init_with_detached_workers(self, worker_names, worker_handles):
self._world_size = len(worker_names)
def _init_with_resource_pool(self, resource_pool, ray_cls_with_init, bin_pack, detached):
+ """Initialize the worker group by creating new workers from a resource pool.
+
+ Args:
+ resource_pool: Resource pool for worker allocation
+ ray_cls_with_init: Class with initialization arguments for workers
+ bin_pack: Whether to use strict bin packing for resource allocation
+ detached: Whether workers should be detached
+ """
use_gpu = resource_pool.use_gpu
strategy = "PACK"
@@ -327,21 +391,32 @@ def from_detached(
worker_handles=None,
ray_cls_with_init=None,
):
+ """Create a worker group from existing detached workers.
+
+ Args:
+ name_prefix: Prefix for worker names
+ worker_names: Names of existing workers to attach to
+ ray_cls_with_init: Class with initialization arguments for workers
+
+ Returns:
+ A new RayWorkerGroup instance
+ """
worker_group = cls(resource_pool=None, ray_cls_with_init=ray_cls_with_init, name_prefix=name_prefix, worker_names=worker_names, worker_handles=worker_handles)
return worker_group
def spawn(self, prefix_set):
- """
- spawn to a dictionary of worker groups, each with a subset of method with prefix.
+ """Spawn to a dictionary of worker groups, each with a subset of method with prefix.
+
+ Args:
+ prefix_set: Set of prefixes to create worker groups for
+ Returns:
+ Dictionary of worker groups keyed by prefix
"""
if self.fused_worker_used:
return self.spawn_fused(prefix_set)
def _rebind_actor_methods(worker_group, actor_name):
- """
- bind the method with actor_prefix to its original name
- """
prefix: str = actor_name + "_"
for method_name in dir(worker_group):
if method_name.startswith(prefix):
@@ -364,6 +439,14 @@ def _rebind_actor_methods(worker_group, actor_name):
return new_worker_group_dict
def spawn_fused(self, prefix_set):
+ """Create a dictionary of worker groups for fused workers.
+
+ Args:
+ prefix_set: Set of prefixes to create worker groups for
+
+ Returns:
+ Dictionary of worker groups keyed by prefix
+ """
wg_dict = dict()
for key in prefix_set:
new_wg = deepcopy(self)
@@ -373,6 +456,11 @@ def spawn_fused(self, prefix_set):
return wg_dict
def fuse(self, prefix_set):
+ """Fuse multiple worker groups into the current worker group.
+
+ Args:
+ prefix_set: Set of prefixes to fuse into the worker group
+ """
if self.wg_dict is None:
self.wg_dict = self.spawn(prefix_set)
for role_name, role_wg in self.wg_dict.items():
@@ -380,6 +468,17 @@ def fuse(self, prefix_set):
self.method_names = self._bind_worker_method(self.ray_cls_with_init.cls, func_generator)
def _execute_remote_single_worker(self, worker, method_name: str, *args, **kwargs):
+ """Execute a method on a single worker remotely.
+
+ Args:
+ worker: The worker actor handle
+ method_name: Name of the method to execute
+ *args: Positional arguments for the method
+ **kwargs: Keyword arguments for the method
+
+ Returns:
+ Remote object reference to the method execution
+ """
if self.fused_worker_used and method_name not in self.method_names:
remote_call = getattr(worker, self.fused_worker_execute_fn_name)
return remote_call.remote(f"{self.sub_cls_name}_fwmn_{method_name}", *args, **kwargs)
@@ -388,21 +487,81 @@ def _execute_remote_single_worker(self, worker, method_name: str, *args, **kwarg
return remote_call.remote(*args, **kwargs)
def execute_rank_zero_sync(self, method_name: str, *args, **kwargs):
+ """Execute a method on rank zero worker synchronously.
+
+ Args:
+ method_name: Name of the method to execute
+ *args: Positional arguments for the method
+ **kwargs: Keyword arguments for the method
+
+ Returns:
+ Result of the method execution
+ """
return ray.get(self.execute_rank_zero_async(method_name, *args, **kwargs))
def execute_rank_zero_async(self, method_name: str, *args, **kwargs):
+ """Execute a method on rank zero worker asynchronously.
+
+ Args:
+ method_name: Name of the method to execute
+ *args: Positional arguments for the method
+ **kwargs: Keyword arguments for the method
+
+ Returns:
+ Remote object reference to the method execution
+ """
return self._execute_remote_single_worker(self._workers[0], method_name, *args, **kwargs)
def execute_rank_zero(self, method_name: str, *args, **kwargs):
+ """Alias for execute_rank_zero_async.
+
+ Args:
+ method_name: Name of the method to execute
+ *args: Positional arguments for the method
+ **kwargs: Keyword arguments for the method
+
+ Returns:
+ Remote object reference to the method execution
+ """
return self.execute_rank_zero_async(method_name, *args, **kwargs)
def execute_all(self, method_name: str, *args, **kwargs):
+ """Alias for execute_all_async.
+
+ Args:
+ method_name: Name of the method to execute
+ *args: Positional arguments for the method
+ **kwargs: Keyword arguments for the method
+
+ Returns:
+ List of remote object references to the method executions
+ """
return self.execute_all_async(method_name, *args, **kwargs)
def execute_all_sync(self, method_name: str, *args, **kwargs):
+ """Execute a method on all workers synchronously.
+
+ Args:
+ method_name: Name of the method to execute
+ *args: Positional arguments for the method
+ **kwargs: Keyword arguments for the method
+
+ Returns:
+ List of results from all workers
+ """
return ray.get(self.execute_all_async(method_name, *args, **kwargs))
def execute_all_async(self, method_name: str, *args, **kwargs):
+ """Execute a method on all workers asynchronously.
+
+ Args:
+ method_name: Name of the method to execute
+ *args: Positional arguments for the method
+ **kwargs: Keyword arguments for the method
+
+ Returns:
+ List of remote object references to the method executions
+ """
# Here, we assume that if all arguments in args and kwargs are lists,
# and their lengths match len(self._workers), we'll distribute each
# element in these lists to the corresponding worker
diff --git a/verl/single_controller/ray/megatron.py b/verl/single_controller/ray/megatron.py
index 8baf03e6f17..4f56ac1bfab 100644
--- a/verl/single_controller/ray/megatron.py
+++ b/verl/single_controller/ray/megatron.py
@@ -30,6 +30,14 @@ class NVMegatronRayWorkerGroup(RayWorkerGroup, MegatronWorkerGroup):
"""
def __init__(self, resource_pool: RayResourcePool, ray_cls_with_init: RayClassWithInitArgs, **kwargs):
+ """
+ Initialize the NVMegatronRayWorkerGroup.
+
+ Args:
+ resource_pool (RayResourcePool): The resource pool containing worker resources
+ ray_cls_with_init (RayClassWithInitArgs): The Ray class with initialization arguments
+ **kwargs: Additional keyword arguments to pass to the parent class
+ """
super().__init__(resource_pool=resource_pool, ray_cls_with_init=ray_cls_with_init, **kwargs)
self._megatron_rank_info: DistRankInfo = self.execute_all_sync(method_name="get_megatron_rank_info")
self._megatron_global_info: DistGlobalInfo = ray.get(self.execute_rank_zero_async(method_name="get_megatron_global_info"))