52 changes: 52 additions & 0 deletions .github/workflows/doc.yml
@@ -0,0 +1,52 @@
name: doc_test

on:
# Trigger the workflow on push or pull request,
# but only for the main and v0.* branches
push:
branches:
- main
- v0.*
pull_request:
branches:
- main
- v0.*
paths:
- "**/*.py"
- "docs/**"
- .github/workflows/doc.yml

# Cancel jobs on the same ref if a new one is triggered
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: ${{ github.ref != 'refs/heads/main' }}

# Declare the permissions the workflow needs: read contents, plus write access for Pages deployment.
permissions:
contents: read # for checkout
pages: write # for deploy-pages
id-token: write # for deploy-pages

jobs:
doc_test:
runs-on: ubuntu-latest
timeout-minutes: 5 # Increase this timeout value as needed
strategy:
matrix:
python-version: ["3.10"]
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5.3.0
with:
python-version: ${{ matrix.python-version }}
- name: Install the current repository
run: |
pip install -e .[test]
pip install -r docs/requirements-docs.txt

- name: Run doc make html
run: |
cd docs
make clean
make html
File renamed without changes.
26 changes: 26 additions & 0 deletions docs/api/single_controller.rst
@@ -0,0 +1,26 @@
Single Controller interface
============================

The Single Controller provides a unified interface for managing distributed workers
using Ray or other backends and executing functions across them.
It simplifies the process of dispatching tasks and collecting results, particularly
when dealing with data parallelism or model parallelism.


Core APIs
~~~~~~~~~~~~~~~~~

.. autoclass:: verl.single_controller.Worker
:members: __init__, __new__, get_master_addr_port, get_cuda_visible_devices, world_size, rank

.. autoclass:: verl.single_controller.WorkerGroup
:members: __init__, world_size

.. autoclass:: verl.single_controller.ClassWithInitArgs
:members: __init__, __call__

.. autoclass:: verl.single_controller.ResourcePool
:members: __init__, world_size, local_world_size_list, local_rank_list

.. automodule:: verl.single_controller.ray
:members: RayWorkerGroup, create_colocated_worker_cls
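The dispatch-and-collect pattern this interface documents can be illustrated with a minimal, self-contained sketch. The names below (`ToyWorker`, `ToyWorkerGroup`) are illustrative stand-ins, not the verl API; the real `WorkerGroup` dispatches through Ray rather than a local loop.

```python
# Minimal sketch of the single-controller pattern: a controller splits
# data across workers, each worker computes on its shard, and the
# controller collects the partial results.

class ToyWorker:
    def __init__(self, rank: int, world_size: int):
        self.rank = rank
        self.world_size = world_size

    def compute(self, shard):
        # Each worker processes only its shard of the data.
        return sum(shard)


class ToyWorkerGroup:
    """Controller that dispatches shards to workers and collects results."""

    def __init__(self, world_size: int):
        self.workers = [ToyWorker(r, world_size) for r in range(world_size)]

    def dispatch(self, data):
        # Split data evenly across ranks (data-parallel dispatch).
        n = len(self.workers)
        shards = [data[i::n] for i in range(n)]
        return [w.compute(s) for w, s in zip(self.workers, shards)]


group = ToyWorkerGroup(world_size=4)
partials = group.dispatch(list(range(8)))
total = sum(partials)
```

In the real interface, `ResourcePool` decides where workers are placed and `RayWorkerGroup` performs the dispatch remotely, but the controller-side shape of the computation is the same.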
3 changes: 2 additions & 1 deletion docs/index.rst
@@ -106,8 +106,9 @@ verl is fast with:
:maxdepth: 1
:caption: API References

data
api/data
api/utils
api/single_controller


.. toctree::
9 changes: 4 additions & 5 deletions docs/start/install.rst
@@ -214,11 +214,10 @@ Install with AMD GPUs - ROCM kernel support
------------------------------------------------------------------

When you run on AMD GPUs (MI300) with the ROCm platform, you cannot use the previous quickstart to run verl. Follow the steps below to build a Docker image and run it.

If you encounter any issues running verl on AMD GPUs, feel free to contact `Yusheng Su <https://yushengsu-thu.github.io/>`_.

Find the docker for AMD ROCm: `docker/Dockerfile.rocm <https://github.com/volcengine/verl/blob/main/docker/Dockerfile.rocm>`_
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::

.. code-block:: bash

@@ -267,15 +266,15 @@ Find the docker for AMD ROCm: `docker/Dockerfile.rocm <https://github.com/volcen
pybind11 && \
pip install -e . --no-deps

Build the image:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Build the image
::::::::::::::::::::::::

.. code-block:: bash

docker build -t verl-rocm .

Launch the container
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
::::::::::::::::::::::::::::

.. code-block:: bash

1 change: 1 addition & 0 deletions docs/workers/sglang_worker.rst
@@ -19,6 +19,7 @@ Installation
Always use the following command to install SGLang with verl.

.. code-block:: bash

pip install --upgrade pip
# Currently 0.4.6.post1, but subject to updates at any time; refer to the latest version specified in `setup.py`
pip install -e ".[sglang]"
33 changes: 33 additions & 0 deletions verl/single_controller/base/decorator.py
@@ -25,6 +25,13 @@


class Dispatch(DynamicEnum):
"""Enum class defining different dispatch modes for distributed computation.

Each mode represents a specific strategy for distributing data across
different ranks in a distributed system. The modes are used to control
how data is partitioned and processed across different worker groups.
"""

_registry = {}
_next_value = 0

@@ -47,6 +54,12 @@ def init_predefined_dispatch_mode():


class Execute(DynamicEnum):
"""Enum class defining different execution modes for distributed computation.

These modes control how a function should be executed across different ranks
in a distributed system.
"""

_registry = {}
_next_value = 0

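The `_registry`/`_next_value` pair shared by `Dispatch` and `Execute` suggests an enum that can grow at runtime, unlike a frozen stdlib `Enum`. The following is a hedged sketch of that pattern, not verl's actual `DynamicEnum` implementation; the class and method names are assumptions.

```python
# Sketch of a dynamically extensible enum: members are minted at runtime
# and tracked in a class-level registry, mirroring the _registry and
# _next_value attributes seen in Dispatch and Execute above.

class DynamicEnumSketch:
    _registry = {}
    _next_value = 0

    def __init__(self, name: str, value: int):
        self.name = name
        self.value = value

    @classmethod
    def register(cls, name: str):
        # New modes can be added after import time, e.g. by plugins.
        member = cls(name, cls._next_value)
        cls._registry[name] = member
        cls._next_value += 1
        return member

    @classmethod
    def from_name(cls, name: str):
        return cls._registry[name]


ALL_TO_ALL = DynamicEnumSketch.register("ALL_TO_ALL")
ONE_TO_ALL = DynamicEnumSketch.register("ONE_TO_ALL")
```

The design choice here is extensibility: downstream code can register new dispatch modes without patching the enum definition itself.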
@@ -490,6 +503,26 @@ def _materialize_futures(*args, **kwargs):


def register(dispatch_mode=Dispatch.ALL_TO_ALL, execute_mode=Execute.ALL, blocking=True, materialize_futures=True):
"""Register a function with distributed execution configuration.

This decorator registers a function with specific dispatch and execution modes
for distributed computation. It handles both synchronous and asynchronous
functions, and optionally materializes futures before execution.

Args:
dispatch_mode:
Dispatch mode for computation distribution. Defaults to Dispatch.ALL_TO_ALL.
execute_mode:
Execute mode for computation distribution. Defaults to Execute.ALL.
blocking:
Whether the execution should be blocking. Defaults to True.
materialize_futures:
Whether to materialize the data before dispatching. Defaults to True.

Returns:
A decorator that wraps the original function with distributed execution
configuration.
"""
_check_dispatch_mode(dispatch_mode=dispatch_mode)
_check_execute_mode(execute_mode=execute_mode)

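The decorator documented above attaches dispatch and execution configuration to a function rather than changing its body. A minimal sketch of that mechanism, with assumed attribute names (`attrs`, `register_sketch`) that are not verl internals:

```python
# Sketch of a metadata-attaching decorator in the style of `register`:
# the wrapped function runs unchanged, but carries its distributed
# execution configuration as an attribute for the controller to read.

def register_sketch(dispatch_mode="ALL_TO_ALL", execute_mode="ALL", blocking=True):
    def decorator(func):
        # Store the configuration on the function object itself.
        func.attrs = {
            "dispatch_mode": dispatch_mode,
            "execute_mode": execute_mode,
            "blocking": blocking,
        }
        return func
    return decorator


@register_sketch(dispatch_mode="DP_COMPUTE", blocking=False)
def train_step(batch):
    return len(batch)


meta = train_step.attrs
```

A worker-group controller can then inspect `meta` at bind time to decide how to split arguments across ranks and whether to block on the result.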
52 changes: 51 additions & 1 deletion verl/single_controller/base/worker.py
@@ -73,11 +73,17 @@ def _get_pid(self):

# we assume that in each WorkerGroup, there is a Master Worker
class Worker(WorkerHelper):
"""A (distributed) worker."""
"""A distributed worker that handles initialization and configuration for distributed training.

This class manages worker initialization, configuration, and provides methods for executing
distributed operations. It handles communication settings, device configuration, and worker
metadata management.
"""

fused_worker_attr_name = "fused_worker_dict"

def __new__(cls, *args, **kwargs):
"""Create a new Worker instance with proper initialization based on environment settings."""
instance = super().__new__(cls)

# note that here we use int to distinguish
@@ -95,6 +101,14 @@ def __new__(cls, *args, **kwargs):
return instance

def _configure_before_init(self, register_center_name: str, rank: int):
"""Configure worker settings before initialization.

Args:
register_center_name (str):
Name of the register center Ray actor used for worker coordination.
rank (int):
Rank of the worker in the distributed setup.
"""
assert isinstance(rank, int), f"rank must be int, instead of {type(rank)}"

if rank == 0:
@@ -122,6 +136,12 @@ def env_keys(cls):
return ["WORLD_SIZE", "RANK", "LOCAL_WORLD_SIZE", "LOCAL_RANK", "MASTER_ADDR", "MASTER_PORT", "CUDA_VISIBLE_DEVICES"]

def __init__(self, cuda_visible_devices=None) -> None:
"""Initialize the worker with environment settings and device configuration.

Args:
cuda_visible_devices (str, optional):
CUDA visible devices configuration. Defaults to None.
"""
# construct a meta from environment variable. Note that the import must be inside the class because it is executed remotely
import os

@@ -175,6 +195,12 @@ def __init__(self, cuda_visible_devices=None) -> None:
self.fused_worker_dict = {}

def get_fused_worker_by_name(self, worker_name: str):
"""Get a fused worker by its name.

Args:
worker_name (str):
Name of the worker to retrieve
"""
return self.fused_worker_dict.get(worker_name, None)

def _configure_with_store(self, store: Dict):
@@ -192,28 +218,52 @@ def _configure_with_store(self, store: Dict):
os.environ["REDIS_STORE_SERVER_HOST"] = str(self._master_addr).replace("[", "").replace("]", "") if self._master_addr else ""

def get_master_addr_port(self):
"""Get the master address and port for distributed communication."""
return self._master_addr, self._master_port

def get_cuda_visible_devices(self):
"""Get the CUDA visible devices configuration."""
import os

cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "not set")
return cuda_visible_devices

@property
def world_size(self):
"""Get the total number of workers in the distributed setup."""
return self._world_size

@property
def rank(self):
"""Get the rank of this worker in the distributed setup."""
return self._rank

@register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO_WITH_FUNC)
def execute_with_func_generator(self, func, *args, **kwargs):
"""Execute a function with function generator dispatch mode.

Args:
func:
Function to execute
*args:
Positional arguments for the function
**kwargs:
Keyword arguments for the function
"""
ret_proto = func(self, *args, **kwargs)
return ret_proto

@register(dispatch_mode=Dispatch.ALL_TO_ALL, execute_mode=Execute.RANK_ZERO)
def execute_func_rank_zero(self, func, *args, **kwargs):
"""Execute a function in rank zero execution mode.

Args:
func:
Function to execute
*args:
Positional arguments for the function
**kwargs:
Keyword arguments for the function
"""
result = func(*args, **kwargs)
return result
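The `Execute.RANK_ZERO` mode used by `execute_func_rank_zero` can be pictured with a small standalone sketch: the function runs only on the rank-0 worker, while all other ranks contribute nothing. The helper below is hypothetical, not verl code.

```python
# Sketch of rank-zero execution: only the worker with rank 0 runs the
# function; every other rank returns None. Useful for side effects that
# must happen exactly once, such as checkpointing or logging.

def execute_rank_zero(rank: int, func, *args, **kwargs):
    if rank == 0:
        return func(*args, **kwargs)
    return None


# Simulate a 4-rank worker group invoking a once-only operation.
results = [execute_rank_zero(r, lambda: "saved checkpoint") for r in range(4)]
```

In the real system the same call is broadcast to every worker in the group, and the execute mode, not the caller, decides which ranks actually run it.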