diff --git a/.buildkite/test-amd.yaml b/.buildkite/test-amd.yaml index eb331aaf9d43..a4a8778fe620 100644 --- a/.buildkite/test-amd.yaml +++ b/.buildkite/test-amd.yaml @@ -1573,7 +1573,7 @@ steps: - tests/compile/fullgraph/test_basic_correctness.py - examples/offline_inference/rlhf.py - examples/offline_inference/rlhf_colocate.py - - examples/offline_inference/new_weight_syncing/ + - examples/rl/ - tests/examples/offline_inference/data_parallel.py - tests/v1/distributed - tests/v1/engine/test_engine_core_client.py @@ -1615,7 +1615,7 @@ steps: - VLLM_ALLOW_INSECURE_SERIALIZATION=1 RAY_DEDUP_LOGS=0 python3 rlhf_colocate.py - popd # NEW rlhf examples - - pushd ../examples/offline_inference/new_weight_syncing + - pushd ../examples/rl - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_nccl.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_ipc.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_async_new_apis.py @@ -2660,7 +2660,7 @@ steps: - tests/v1/entrypoints/openai/test_multi_api_servers.py - tests/v1/shutdown - tests/v1/worker/test_worker_memory_snapshot.py - - examples/offline_inference/new_weight_syncing/ + - examples/rl/ commands: # Work around HIP bug tracked here: https://github.com/ROCm/hip/issues/3876 # TODO: Remove when the bug is fixed in a future ROCm release @@ -3325,7 +3325,7 @@ steps: - tests/compile/fullgraph/test_basic_correctness.py - examples/offline_inference/rlhf.py - examples/offline_inference/rlhf_colocate.py - - examples/offline_inference/new_weight_syncing/ + - examples/rl/ - tests/examples/offline_inference/data_parallel.py - tests/v1/distributed - tests/v1/engine/test_engine_core_client.py @@ -3367,7 +3367,7 @@ steps: - VLLM_ALLOW_INSECURE_SERIALIZATION=1 RAY_DEDUP_LOGS=0 python3 rlhf_colocate.py - popd # NEW rlhf examples - - pushd ../examples/offline_inference/new_weight_syncing + - pushd ../examples/rl - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_nccl.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_ipc.py - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_async_new_apis.py diff --git a/.buildkite/test_areas/distributed.yaml b/.buildkite/test_areas/distributed.yaml index 331103ceebd1..03ffc5a274a3 100644 --- a/.buildkite/test_areas/distributed.yaml +++ b/.buildkite/test_areas/distributed.yaml @@ -82,7 +82,7 @@ steps: - label: Distributed Torchrun + Examples (4 GPUs) timeout_in_minutes: 30 - working_dir: "/vllm-workspace/tests" + working_dir: "/vllm-workspace" num_devices: 4 source_file_dependencies: - vllm/distributed/ @@ -90,33 +90,28 @@ steps: - tests/distributed/test_torchrun_example_moe.py - examples/offline_inference/rlhf.py - examples/offline_inference/rlhf_colocate.py - - examples/offline_inference/new_weight_syncing/ + - examples/rl/ - tests/examples/offline_inference/data_parallel.py commands: # https://github.com/NVIDIA/nccl/issues/1838 - export NCCL_CUMEM_HOST_ENABLE=0 # test with torchrun tp=2 and external_dp=2 - - torchrun --nproc-per-node=4 distributed/test_torchrun_example.py + - torchrun --nproc-per-node=4 tests/distributed/test_torchrun_example.py # test with torchrun tp=2 and pp=2 - - PP_SIZE=2 torchrun --nproc-per-node=4 distributed/test_torchrun_example.py + - PP_SIZE=2 torchrun --nproc-per-node=4 tests/distributed/test_torchrun_example.py # test with torchrun tp=4 and dp=1 - - TP_SIZE=4 torchrun --nproc-per-node=4 distributed/test_torchrun_example_moe.py + - TP_SIZE=4 torchrun --nproc-per-node=4 tests/distributed/test_torchrun_example_moe.py # test with torchrun tp=2, pp=2 and dp=1 - - PP_SIZE=2 TP_SIZE=2 torchrun --nproc-per-node=4 distributed/test_torchrun_example_moe.py + - PP_SIZE=2 TP_SIZE=2 torchrun --nproc-per-node=4 tests/distributed/test_torchrun_example_moe.py # test with torchrun tp=1 and dp=4 with ep - - DP_SIZE=4 ENABLE_EP=1 torchrun --nproc-per-node=4 distributed/test_torchrun_example_moe.py + - DP_SIZE=4 ENABLE_EP=1 torchrun --nproc-per-node=4 tests/distributed/test_torchrun_example_moe.py # test with torchrun tp=2 and dp=2 with ep - - TP_SIZE=2 DP_SIZE=2 ENABLE_EP=1 torchrun --nproc-per-node=4 distributed/test_torchrun_example_moe.py + - TP_SIZE=2 DP_SIZE=2 ENABLE_EP=1 torchrun --nproc-per-node=4 tests/distributed/test_torchrun_example_moe.py # test with internal dp - - python3 ../examples/offline_inference/data_parallel.py --enforce-eager - # OLD rlhf examples - - cd ../examples/offline_inference - - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf.py - - VLLM_ALLOW_INSECURE_SERIALIZATION=1 RAY_DEDUP_LOGS=0 python3 rlhf_colocate.py - # NEW rlhf examples - - cd new_weight_syncing - - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_nccl.py - - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 rlhf_ipc.py + - python3 examples/offline_inference/data_parallel.py --enforce-eager + # rlhf examples + - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 examples/rl/rlhf_nccl.py + - VLLM_ALLOW_INSECURE_SERIALIZATION=1 python3 examples/rl/rlhf_ipc.py - label: Distributed DP Tests (4 GPUs) timeout_in_minutes: 30 diff --git a/docs/mkdocs/hooks/generate_examples.py b/docs/mkdocs/hooks/generate_examples.py index e886a91e6573..194db05e395e 100644 --- a/docs/mkdocs/hooks/generate_examples.py +++ b/docs/mkdocs/hooks/generate_examples.py @@ -23,15 +23,18 @@ def title(text: str) -> str: # Custom substitutions subs = { "io": "IO", - "api": "API", + "rl": "RL", + "api(s?)": r"API\1", "cli": "CLI", "cpu": "CPU", + "ipc": "IPC", "llm": "LLM", "mae": "MAE", "ner": "NER", "tpu": "TPU", "gguf": "GGUF", "lora": "LoRA", + "nccl": "NCCL", "rlhf": "RLHF", "vllm": "vLLM", "openai": "OpenAI", @@ -196,6 +199,11 @@ def generate(self) -> str: def on_startup(command: Literal["build", "gh-deploy", "serve"], dirty: bool): + # Monkey-patch dirname_to_title in awesome-nav so that sub-directory names are + # title-cased (e.g. "Offline Inference" instead of "Offline inference"). + import mkdocs_awesome_nav.nav.directory as _nav_dir + + _nav_dir.dirname_to_title = title logger.info("Generating example documentation") logger.debug("Root directory: %s", ROOT_DIR.resolve()) logger.debug("Example directory: %s", EXAMPLE_DIR.resolve()) diff --git a/docs/training/async_rl.md b/docs/training/async_rl.md new file mode 100644 index 000000000000..172466f89039 --- /dev/null +++ b/docs/training/async_rl.md @@ -0,0 +1,63 @@ +# Async Reinforcement Learning + +## Overview + +In a standard RL training loop, generation and training happen sequentially: the policy generates rollouts, then training runs on those rollouts, and the cycle repeats. During generation the training accelerators sit idle, and vice versa. + +The **one-off pipelining** approach separates the generation and training phases into two parallel coroutines, allowing the model to generate new samples while simultaneously training on previously generated data. This can lead to better GPU utilization and greater training throughput. + +However, this overlap introduces a complication: weights must be updated in the inference engine mid-flight, while requests may still be in progress. + +## The Pause and Resume API + +To safely update weights while the inference engine is running, vLLM provides `pause_generation` and `resume_generation` methods. These let the trainer coordinate a clean window for weight synchronization without losing in-flight work. + +### pause_generation + +```python +await engine.pause_generation(mode="keep", clear_cache=True) +``` + +The `mode` parameter controls how in-flight requests are handled: + +| Mode | Behavior | +| ---- | -------- | +| `"abort"` | Abort all in-flight requests immediately and return partial results (default) | +| `"wait"` | Wait for all in-flight requests to finish before pausing | +| `"keep"` | Freeze requests in the queue; they resume when `resume_generation` is called | + +The `clear_cache` parameter controls whether to clear the KV cache and prefix cache after pausing. + +### resume_generation + +```python +await engine.resume_generation() +``` + +Resumes the scheduler after a pause. Any requests frozen with `mode="keep"` will continue generating. + +### HTTP Endpoints + +When using the vLLM HTTP server, the same functionality is available via: + +- `POST /pause?mode=keep` - Pause generation +- `POST /resume` - Resume generation + +!!! note "Data Parallelism" + When using data parallelism with vLLM's **internal load balancer** (i.e. `data_parallel_backend="ray"`), pause and resume are handled automatically across all DP ranks -- a single call is sufficient. When using an **external load balancer** (i.e. multiple independent vLLM instances behind a proxy), you must send pause and resume requests to **every** engine instance individually before and after the weight update. + +## Typical Async RL Flow + +A typical async RL loop with weight syncing looks like this: + +1. Start generating rollouts from the current policy +2. Once trainer has new weights to update to, pause generation with `mode="keep"` +3. Sync the updated weights from the trainer to the inference engine (see [Weight Transfer](weight_transfer/README.md)) +4. Resume generation -- in-flight requests continue with the new weights +5. Repeat + +The key insight is that requests paused with `mode="keep"` will produce tokens from the **old** weights before the pause and tokens from the **new** weights after resume. The `clear_cache` parameter controls whether the KV cache is invalidated during the pause. When `clear_cache=True`, previously cached key-value entries are discarded, so all tokens generated after resume will be computed entirely with the new weights. When `clear_cache=False`, existing KV cache entries are retained, meaning some tokens in context may still reflect the old weights (stale KV cache). + +## Example + +The [async RLHF example](../examples/rl/rlhf_async_new_apis.md) demonstrates this pattern with `vllm.AsyncLLMEngine`, NCCL weight transfer, and mid-flight pause/resume with validation. diff --git a/docs/training/rlhf.md b/docs/training/rlhf.md index 0b7e384dc8d6..3eddd4fbecfb 100644 --- a/docs/training/rlhf.md +++ b/docs/training/rlhf.md @@ -16,11 +16,9 @@ The following open-source RL libraries use vLLM for fast rollouts (sorted alphab - [Unsloth](https://github.com/unslothai/unsloth) - [verl](https://github.com/volcengine/verl) -See the following basic examples to get started if you don't want to use an existing library: +For weight synchronization between training and inference, see the [Weight Transfer](weight_transfer/README.md) documentation, which covers the pluggable backend system with [NCCL](weight_transfer/nccl.md) (multi-GPU) and [IPC](weight_transfer/ipc.md) (same-GPU) engines. -- [Training and inference processes are located on separate GPUs (inspired by OpenRLHF)](../examples/offline_inference/rlhf.md) -- [Training and inference processes are colocated on the same GPUs using Ray](../examples/offline_inference/rlhf_colocate.md) -- [Utilities for performing RLHF with vLLM](../examples/offline_inference/rlhf_utils.md) +For pipelining generation and training to improve GPU utilization and throughput, see the [Async Reinforcement Learning](async_rl.md) guide, which covers the pause/resume API for safely updating weights mid-flight. See the following notebooks showing how to use vLLM for GRPO: diff --git a/docs/training/weight_transfer/README.md b/docs/training/weight_transfer/README.md new file mode 100644 index 000000000000..17afd2bc8965 --- /dev/null +++ b/docs/training/weight_transfer/README.md @@ -0,0 +1,78 @@ +# Weight Transfer + +vLLM provides a pluggable weight transfer system for synchronizing model weights from a training process to the inference engine during reinforcement learning (RL) workflows. This is essential for RLHF, GRPO, and other online RL methods where the policy model is iteratively updated during training and the updated weights must be reflected in the inference engine for rollout generation. + +## Architecture + +The weight transfer system follows a **two-phase protocol** with a pluggable backend design: + +1. **Initialization** (`init_weight_transfer_engine`): Establishes the communication channel between the trainer and inference workers. Called once before the training loop begins. +2. **Weight Update** (`update_weights`): Transfers updated weights from the trainer to the inference engine. Called after each training step (or batch of steps). + +## Available Backends + +| Backend | Transport | Use Case | +| ------- | --------- | -------- | +| [NCCL](nccl.md) | NCCL broadcast | Separate GPUs for training and inference | +| [IPC](ipc.md) | CUDA IPC handles | Colocated training and inference on same GPU | + +## Configuration + +Specify the weight transfer backend through `WeightTransferConfig`. The backend determines which engine handles the weight synchronization. + +### Programmatic (Offline Inference) + +```python +from vllm import LLM +from vllm.config import WeightTransferConfig + +llm = LLM( + model="my-model", + weight_transfer_config=WeightTransferConfig(backend="nccl"), # or "ipc" +) +``` + +### CLI (Online Serving) + +```bash +vllm serve my-model \ + --weight-transfer-config '{"backend": "nccl"}' +``` + +The `backend` field accepts `"nccl"` (default) or `"ipc"`. + +## API Endpoints + +When running vLLM as an HTTP server, the following endpoints are available for weight transfer: + +| Endpoint | Method | Description | +| -------- | ------ | ----------- | +| `/init_weight_transfer_engine` | POST | Initialize the weight transfer engine with backend-specific info | +| `/update_weights` | POST | Trigger a weight update with backend-specific metadata | +| `/pause` | POST | Pause generation before weight sync to handle inflight requests | +| `/resume` | POST | Resume generation after weight sync | +| `/get_world_size` | GET | Get the number of inference workers (useful for NCCL world size calculation) | + +!!! note + The HTTP weight transfer endpoints require `VLLM_SERVER_DEV_MODE=1` to be set. + +## Trainer-Side API + +Both backends provide static methods that the trainer calls to send weights. The general pattern is: + +```python +# 1. Initialize the transfer engine (backend-specific) +EngineClass.trainer_init(init_info) + +# 2. Send weights to inference workers +EngineClass.trainer_send_weights( + iterator=model.named_parameters(), + trainer_args=backend_specific_args, +) +``` + +See the [NCCL](nccl.md) and [IPC](ipc.md) pages for backend-specific trainer APIs and full examples. + +## Extending the System + +The weight transfer system is designed to be extensible. You can implement custom backends by subclassing `WeightTransferEngine` and registering them with the factory. See the [Base Class](base.md) page for details. diff --git a/docs/training/weight_transfer/base.md b/docs/training/weight_transfer/base.md new file mode 100644 index 000000000000..973ec8ad9f55 --- /dev/null +++ b/docs/training/weight_transfer/base.md @@ -0,0 +1,162 @@ +# Base Class and Custom Engines + +The weight transfer system is built on an abstract base class that defines the contract between vLLM's worker infrastructure and the transport backend. You can implement custom backends by subclassing `WeightTransferEngine` and registering them with the `WeightTransferEngineFactory`. + +## WeightTransferEngine + +The `WeightTransferEngine` is a generic abstract class parameterized by two dataclass types: + +- **`TInitInfo`** (extends `WeightTransferInitInfo`): Backend-specific initialization parameters. +- **`TUpdateInfo`** (extends `WeightTransferUpdateInfo`): Backend-specific weight update metadata. + +### Abstract Methods + +Subclasses must implement these four methods: + +| Method | Side | Description | +| ------ | ---- | ----------- | +| `init_transfer_engine(init_info)` | Inference | Initialize the communication channel on each inference worker | +| `receive_weights(update_info, load_weights)` | Inference | Receive weights and call `load_weights` incrementally | +| `shutdown()` | Inference | Clean up resources | +| `trainer_send_weights(iterator, trainer_args)` | Trainer | Static method to send weights from the trainer process | + +### Request Classes + +The API-level request classes provide backend-agnostic serialization using plain dictionaries. The engine's `parse_init_info` and `parse_update_info` methods convert these dictionaries into typed dataclasses. + +```python +from vllm.distributed.weight_transfer.base import ( + WeightTransferInitRequest, + WeightTransferUpdateRequest, +) + +# Init request (dict is converted to backend-specific TInitInfo) +init_request = WeightTransferInitRequest( + init_info={"master_address": "10.0.0.1", "master_port": 29500, ...} +) + +# Update request (dict is converted to backend-specific TUpdateInfo) +update_request = WeightTransferUpdateRequest( + update_info={"names": [...], "dtype_names": [...], "shapes": [...]} +) +``` + +### WeightTransferUpdateInfo + +The base `WeightTransferUpdateInfo` includes an `is_checkpoint_format` flag: + +```python +@dataclass +class WeightTransferUpdateInfo(ABC): + is_checkpoint_format: bool = True +``` + +When `is_checkpoint_format=True` (the default), vLLM applies layerwise weight processing (repacking, renaming, etc.) on the received weights before loading them. Set to `False` if the trainer has already converted weights to the kernel format expected by the model. + +## Implementing a Custom Engine + +To create a custom weight transfer backend: + +### 1. Define Info Dataclasses + +```python +from dataclasses import dataclass +from vllm.distributed.weight_transfer.base import ( + WeightTransferEngine, + WeightTransferInitInfo, + WeightTransferUpdateInfo, +) + +@dataclass +class MyInitInfo(WeightTransferInitInfo): + endpoint: str + token: str + +@dataclass +class MyUpdateInfo(WeightTransferUpdateInfo): + names: list[str] + dtype_names: list[str] + shapes: list[list[int]] + # Add custom fields as needed +``` + +### 2. Implement the Engine + +```python +from collections.abc import Callable, Iterator +from typing import Any +import torch + +class MyWeightTransferEngine(WeightTransferEngine[MyInitInfo, MyUpdateInfo]): + init_info_cls = MyInitInfo + update_info_cls = MyUpdateInfo + + def init_transfer_engine(self, init_info: MyInitInfo) -> None: + # Set up connection to trainer using init_info.endpoint, etc. + ... + + def receive_weights( + self, + update_info: MyUpdateInfo, + load_weights: Callable[[list[tuple[str, torch.Tensor]]], None], + ) -> None: + # Receive each weight and call load_weights incrementally + for name, dtype_name, shape in zip( + update_info.names, update_info.dtype_names, update_info.shapes + ): + dtype = getattr(torch, dtype_name) + weight = self._fetch_weight(name, shape, dtype) + load_weights([(name, weight)]) + + def shutdown(self) -> None: + # Clean up resources + ... + + @staticmethod + def trainer_send_weights( + iterator: Iterator[tuple[str, torch.Tensor]], + trainer_args: dict[str, Any], + ) -> None: + # Send weights from the trainer process + for name, tensor in iterator: + # Send tensor via custom transport + ... +``` + +!!! important + The `load_weights` callable passed to `receive_weights` should be called **incrementally** (one or a few weights at a time) rather than accumulating all weights first. This avoids GPU out-of-memory errors with large models. + +### 3. Register with the Factory + +```python +from vllm.distributed.weight_transfer.factory import WeightTransferEngineFactory + +# Option 1: Lazy loading (recommended for built-in engines) +WeightTransferEngineFactory.register_engine( + "my_backend", + "my_package.my_module", + "MyWeightTransferEngine", +) + +# Option 2: Direct class registration +WeightTransferEngineFactory.register_engine( + "my_backend", + MyWeightTransferEngine, +) +``` + +Once registered, users can select your backend via `WeightTransferConfig(backend="my_backend")`. + +## WeightTransferEngineFactory + +The factory uses a registry pattern with lazy loading. Built-in engines (`nccl` and `ipc`) are registered at import time but their modules are only loaded when the backend is actually requested. This avoids importing heavy dependencies (like NCCL communicators) when they aren't needed. + +```python +from vllm.distributed.weight_transfer.factory import WeightTransferEngineFactory + +# Create an engine from config +engine = WeightTransferEngineFactory.create_engine( + config=weight_transfer_config, + parallel_config=parallel_config, +) +``` diff --git a/docs/training/weight_transfer/ipc.md b/docs/training/weight_transfer/ipc.md new file mode 100644 index 000000000000..8e19fa7b429b --- /dev/null +++ b/docs/training/weight_transfer/ipc.md @@ -0,0 +1,73 @@ +# IPC Engine + +The IPC weight transfer engine uses **CUDA IPC** (Inter-Process Communication) handles to share GPU memory directly between the trainer and inference workers on the **same node and same GPU**. This avoids any data copying, making it a efficient option when colocating training and inference. + +## When to Use IPC + +- Training and inference on the **same GPU** (colocated) +- You want to minimize memory overhead by sharing tensors in-place + +## How It Works + +1. The trainer creates CUDA tensors for each weight and generates IPC handles using `torch.multiprocessing.reductions.reduce_tensor`. +2. IPC handles are sent to the inference engine via **Ray.remote()** or **HTTP POST**. +3. The inference worker reconstructs the tensors from the handles, reading directly from the trainer's GPU memory. + +!!! warning + IPC handles involve sending serialized Python objects. When using HTTP transport, you must set `VLLM_ALLOW_INSECURE_SERIALIZATION=1` on both the server and client. This is because IPC handles are pickled and base64-encoded for HTTP transmission. + +## Initialization + +The IPC backend requires no initialization on either side. The `init_transfer_engine` call is a no-op for IPC. + +## Sending Weights + +IPC supports two transport modes for delivering the handles: + +### Ray Mode + +Used when vLLM is running as a Ray actor: + +```python +from vllm.distributed.weight_transfer.ipc_engine import ( + IPCTrainerSendWeightsArgs, + IPCWeightTransferEngine, +) + +trainer_args = IPCTrainerSendWeightsArgs( + mode="ray", + llm_handle=llm_actor_handle, +) + +IPCWeightTransferEngine.trainer_send_weights( + iterator=model.named_parameters(), + trainer_args=trainer_args, +) +``` + +In Ray mode, the engine calls `llm_handle.update_weights.remote(...)` directly, passing the IPC handles via Ray's serialization. + +### HTTP Mode + +Used when vLLM is running as an HTTP server: + +```python +trainer_args = IPCTrainerSendWeightsArgs( + mode="http", + url="http://localhost:8000", +) + +IPCWeightTransferEngine.trainer_send_weights( + iterator=model.named_parameters(), + trainer_args=trainer_args, +) +``` + +In HTTP mode, IPC handles are pickled, base64-encoded, and sent as JSON to the `/update_weights` endpoint. + +See [`IPCTrainerSendWeightsArgs`](https://github.com/vllm-project/vllm/blob/main/vllm/distributed/weight_transfer/ipc_engine.py) for the full list of configurable fields. + +## Examples + +- [RLHF with IPC weight syncing (offline, Ray)](../../examples/rl/rlhf_ipc.md) - Colocated training and inference on a single GPU using Ray placement groups and CUDA IPC handles +- [RLHF with IPC weight syncing (online serving, HTTP)](../../examples/rl/rlhf_http_ipc.md) - Weight transfer with a vLLM HTTP server where both server and trainer share the same GPU diff --git a/docs/training/weight_transfer/nccl.md b/docs/training/weight_transfer/nccl.md new file mode 100644 index 000000000000..a50b3664d89d --- /dev/null +++ b/docs/training/weight_transfer/nccl.md @@ -0,0 +1,110 @@ +# NCCL Engine + +The NCCL weight transfer engine uses [NCCL](https://developer.nvidia.com/nccl) broadcast operations to transfer weights from the trainer to inference workers. It supports **multi-node** and **multi-GPU** setups where the trainer and inference engine run on separate GPUs. + +## When to Use NCCL + +- Training and inference on **separate GPUs** (possibly across nodes) +- **Tensor-parallel** inference with multiple workers that all need the updated weights +- You need high-bandwidth, low-latency weight transfer over NVLink or InfiniBand + +## How It Works + +1. The trainer and all inference workers join a shared NCCL process group using `StatelessProcessGroup` (vLLM's torch.distributed-independent group abstraction). +2. The trainer broadcasts weights to all workers simultaneously. Each worker receives and loads weights incrementally. +3. Optionally, **packed tensor broadcasting** batches multiple small tensors into larger buffers with double/triple buffering and CUDA stream overlap for higher throughput. This implementation is based on [NeMo-RL's packed tensor](https://github.com/NVIDIA-NeMo/RL/blob/main/nemo_rl/utils/packed_tensor.py). + +## Initialization + +NCCL requires explicit process group setup. The trainer and inference workers must agree on a master address, port, and world size. + +### Inference Side + +```python +from vllm.distributed.weight_transfer.base import WeightTransferInitRequest + +# rank_offset accounts for the trainer occupying rank 0 +llm.init_weight_transfer_engine( + WeightTransferInitRequest( + init_info=dict( + master_address=master_address, + master_port=master_port, + rank_offset=1, + world_size=world_size, # trainer + all inference workers + ) + ) +) +``` + +### Trainer Side + +```python +from vllm.distributed.weight_transfer.nccl_engine import ( + NCCLWeightTransferEngine, +) + +group = NCCLWeightTransferEngine.trainer_init( + dict( + master_address=master_address, + master_port=master_port, + world_size=world_size, + ) +) +``` + +!!! note + `trainer_init` always assigns the trainer to rank 0. Inference workers start at `rank_offset` (typically 1). + +## Sending Weights + +```python +from vllm.distributed.weight_transfer.nccl_engine import ( + NCCLTrainerSendWeightsArgs, + NCCLWeightTransferEngine, +) + +trainer_args = NCCLTrainerSendWeightsArgs( + group=group, + packed=True, # use packed broadcasting for efficiency +) + +NCCLWeightTransferEngine.trainer_send_weights( + iterator=model.named_parameters(), + trainer_args=trainer_args, +) +``` + +See [`NCCLTrainerSendWeightsArgs`](https://github.com/vllm-project/vllm/blob/main/vllm/distributed/weight_transfer/nccl_engine.py) for the full list of configurable fields. + +### Packed Tensor Broadcasting + +When `packed=True`, multiple weight tensors are packed into large contiguous buffers before broadcasting. This reduces the number of NCCL operations and uses double/triple buffering with dedicated CUDA streams for overlap between packing, broadcasting, and unpacking. + +Both the trainer (`NCCLTrainerSendWeightsArgs`) and inference side (`NCCLWeightTransferUpdateInfo`) must use matching `packed_buffer_size_bytes` and `packed_num_buffers` values. + +## Receiving Weights (Inference Side) + +The inference side triggers weight reception by calling `update_weights`: + +```python +from vllm.distributed.weight_transfer.base import WeightTransferUpdateRequest + +llm.update_weights( + WeightTransferUpdateRequest( + update_info=dict( + names=names, + dtype_names=dtype_names, + shapes=shapes, + packed=True, + ) + ) +) +``` + +The `names`, `dtype_names`, and `shapes` lists describe each parameter. These must match the order in which the trainer iterates over its parameters. + +## Examples + +- [RLHF with NCCL weight syncing (offline, Ray)](../../examples/rl/rlhf_nccl.md) - Trainer on one GPU, 2x tensor-parallel vLLM engine on two others, with packed NCCL weight broadcast +- [RLHF with async weight syncing (offline, Ray)](../../examples/rl/rlhf_async_new_apis.md) - Async generation with mid-flight pause, weight sync, resume, and validation against a fresh model +- [RLHF with NCCL weight syncing (online serving, HTTP)](../../examples/rl/rlhf_http_nccl.md) - Weight transfer with a running vLLM HTTP server using HTTP control plane and NCCL data plane diff --git a/examples/offline_inference/rlhf.py b/examples/offline_inference/rlhf.py deleted file mode 100644 index 6f05968ce065..000000000000 --- a/examples/offline_inference/rlhf.py +++ /dev/null @@ -1,147 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project -""" -Demonstrates reinforcement learning from human feedback (RLHF) using vLLM and Ray. - -The script separates training and inference workloads onto distinct GPUs -so that Ray can manage process placement and inter-process communication. -A Hugging Face Transformer model occupies GPU 0 for training, whereas a -tensor-parallel vLLM inference engine occupies GPU 1–2. - -The example performs the following steps: - -* Load the training model on GPU 0. -* Split the inference model across GPUs 1–2 using vLLM's tensor parallelism - and Ray placement groups. -* Generate text from a list of prompts using the inference engine. -* Update the weights of the training model and broadcast the updated weights - to the inference engine by using a Ray collective RPC group. Note that - for demonstration purposes we simply zero out the weights. - -For a production-ready implementation that supports multiple training and -inference replicas, see the OpenRLHF framework: -https://github.com/OpenRLHF/OpenRLHF - -This example assumes a single-node cluster with three GPUs, but Ray -supports multi-node clusters. vLLM expects the GPUs are only used for vLLM -workloads. Residual GPU activity interferes with vLLM memory profiling and -causes unexpected behavior. -""" - -import os - -import ray -import torch -from ray.util.placement_group import placement_group -from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy -from rlhf_utils import stateless_init_process_group -from transformers import AutoModelForCausalLM - -from vllm import LLM, SamplingParams -from vllm.utils.network_utils import get_ip, get_open_port - - -class MyLLM(LLM): - """Configure the vLLM worker for Ray placement group execution.""" - - def __init__(self, *args, **kwargs): - # Remove the top-level CUDA_VISIBLE_DEVICES variable set by Ray - # so that vLLM can manage its own device placement within the worker. - os.environ.pop("CUDA_VISIBLE_DEVICES", None) - super().__init__(*args, **kwargs) - - -# Load the OPT-125M model onto GPU 0 for the training workload. -train_model = AutoModelForCausalLM.from_pretrained("facebook/opt-125m") -train_model.to("cuda:0") - -# Initialize Ray and set the visible devices. The vLLM engine will -# be placed on GPUs 1 and 2. -os.environ["CUDA_VISIBLE_DEVICES"] = "1,2" -ray.init() - -# Create a placement group that reserves GPU 1–2 for the vLLM inference engine. -# Learn more about Ray placement groups: -# https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html -pg_inference = placement_group([{"GPU": 1, "CPU": 0}] * 2) -ray.get(pg_inference.ready()) -scheduling_inference = PlacementGroupSchedulingStrategy( - placement_group=pg_inference, - placement_group_capture_child_tasks=True, - placement_group_bundle_index=0, -) - -# Launch the vLLM inference engine. The `enforce_eager` flag reduces -# start-up latency. -llm = ray.remote( - num_cpus=0, - num_gpus=0, - scheduling_strategy=scheduling_inference, -)(MyLLM).remote( - model="facebook/opt-125m", - enforce_eager=True, - worker_extension_cls="rlhf_utils.WorkerExtension", - tensor_parallel_size=2, - distributed_executor_backend="ray", -) - -# Generate text from the prompts. -prompts = [ - "Hello, my name is", - "The president of the United States is", - "The capital of France is", - "The future of AI is", -] - -sampling_params = SamplingParams(temperature=0) - -outputs = ray.get(llm.generate.remote(prompts, sampling_params)) - -print("-" * 50) -for output in outputs: - prompt = output.prompt - generated_text = output.outputs[0].text - print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}") - print("-" * 50) - -# Set up the communication channel between the training process and the -# inference engine. -master_address = get_ip() -master_port = get_open_port() - -handle = llm.collective_rpc.remote( - "init_weight_update_group", args=(master_address, master_port, 1, 3) -) - -model_update_group = stateless_init_process_group( - master_address, master_port, 0, 3, torch.device("cuda:0") -) -ray.get(handle) - -# Simulate a training step by zeroing out all model weights. -# In a real RLHF training loop the weights would be updated using the gradient -# from an RL objective such as PPO on a reward model. -for name, p in train_model.named_parameters(): - p.data.zero_() - -# Synchronize the updated weights to the inference engine. -for name, p in train_model.named_parameters(): - dtype_name = str(p.dtype).split(".")[-1] - handle = llm.collective_rpc.remote( - "update_weight", args=(name, dtype_name, p.shape) - ) - model_update_group.broadcast(p, src=0, stream=torch.cuda.current_stream()) - ray.get(handle) - -# Verify that the inference weights have been updated. -assert all(ray.get(llm.collective_rpc.remote("check_weights_changed"))) - -# Generate text with the updated model. The output is expected to be nonsense -# because the weights are zero. -outputs_updated = ray.get(llm.generate.remote(prompts, sampling_params)) -print("-" * 50) -for output in outputs_updated: - prompt = output.prompt - generated_text = output.outputs[0].text - print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}") - print("-" * 50) diff --git a/examples/offline_inference/rlhf_colocate.py b/examples/offline_inference/rlhf_colocate.py deleted file mode 100644 index ea4b3a6b911e..000000000000 --- a/examples/offline_inference/rlhf_colocate.py +++ /dev/null @@ -1,256 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project -""" -Demonstrates how to co-locate a vLLM inference worker and training -actors on the same set of GPUs for reinforcement learning from human feedback -(RLHF) workloads. - -Ray serves as the distributed execution framework in this example. Ray -placement groups allocate both training actors and vLLM workers to the -same GPU bundles, enabling fast, in-GPU communication between the two -components. - -The script shows how to do the following: - -* Configure environment variables (`VLLM_RAY_PER_WORKER_GPUS` and - `VLLM_RAY_BUNDLE_INDICES`) so that vLLM workers land on the desired - devices. -* Exchange tensors between processes by means of CUDA inter-process - communication (IPC). CUDA IPC sidesteps NCCL limitations that occur - when multiple processes share a single GPU. - -Note that this example assumes a single-node cluster with four GPUs, but Ray -supports multi-node clusters. vLLM expects exclusive use of the GPUs during -its initialization for memory profiling. Residual GPU activity interferes -with vLLM memory profiling and causes unexpected behavior. - -Learn more about Ray placement groups: -https://docs.ray.io/en/latest/placement-groups.html -""" - -import gc -import os -import sys - -import ray -import torch -import zmq -from ray.util.placement_group import placement_group -from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy -from torch.multiprocessing.reductions import reduce_tensor - -from vllm import LLM - -if torch.version.hip is not None: - print("Skipping test for ROCm. Ray is unsupported on vLLM ROCm.") - sys.exit(0) - - -class MyLLM(LLM): - """Configure the vLLM worker for Ray placement group execution. - - The constructor sets environment variables that allow multiple vLLM - workers to share a single physical GPU and that encode the bundle - indices assigned by the placement group. - - Args: - *args: Positional arguments forwarded to `vllm.LLM`. - bundle_indices (list[int]): Placement-group bundle indices - assigned to this worker. - **kwargs: Keyword arguments forwarded to `vllm.LLM`. - """ - - def __init__(self, *args, bundle_indices: list[int], **kwargs): - # Prevent Ray from manipulating the top-level CUDA_VISIBLE_DEVICES variable - # so that vLLM can its own device placement inside the worker. - os.environ.pop("CUDA_VISIBLE_DEVICES", None) - # Each worker uses 0.4 GPU so that two instances fit on the same GPUs. - os.environ["VLLM_RAY_PER_WORKER_GPUS"] = "0.4" - os.environ["VLLM_RAY_BUNDLE_INDICES"] = ",".join(map(str, bundle_indices)) - print(f"creating LLM with bundle_indices={bundle_indices}") - super().__init__(*args, **kwargs) - - -class RayTrainingActor: - """Training actor that hosts a Facebook OPT-125M model from Hugging Face. - - The model is loaded onto the first GPU assigned to this actor, and expose - the CUDA IPC handles so that colocated vLLM workers can map tensors - directly. - """ - - def __init__(self): - # Ray sets CUDA_VISIBLE_DEVICES to the GPUs assigned to this actor. - from transformers import AutoModelForCausalLM - - self.model = AutoModelForCausalLM.from_pretrained("facebook/opt-125m") - self.model.to("cuda:0") - # Zero out all the parameters. - for name, p in self.model.named_parameters(): - p.data.zero_() - torch.accelerator.synchronize() - # The argument for `get_device_uuid` is the index of the GPU in the - # list of visible devices. - from vllm.platforms import current_platform - - self.device_uuid = current_platform.get_device_uuid(0) - self.zmq_context = zmq.Context() - self.zmq_address_counter = 0 - self.zmq_handle = None - - def report_device_id(self) -> str: - return self.device_uuid - - def get_zmq_handles(self) -> dict[str, str]: - suffix = f"{self.device_uuid}-{self.zmq_address_counter}" - self.zmq_handle = f"ipc:///tmp/rl-colocate-zmq-{suffix}.sock" - self.zmq_address_counter += 1 - return {self.device_uuid: self.zmq_handle} - - def update_weights(self): - # align size to avoid misaligned address - align_size = 256 - - def get_size(p: torch.Tensor) -> int: - return (p.nbytes + align_size - 1) // align_size * align_size - - named_parameters: dict[str, torch.nn.Parameter] = dict( - self.model.named_parameters() - ) - max_tensor_size = max(get_size(p) for p in named_parameters.values()) - # use max_tensor_size * 2 as buffer size - buffer = torch.empty(max_tensor_size * 2, dtype=torch.uint8, device="cuda:0") - s = self.zmq_context.socket(zmq.REQ) - s.bind(self.zmq_handle) - handle = reduce_tensor(buffer) - - offset = 0 - buckets: list[tuple[list[dict], list[torch.Tensor]]] = [] - named_tensors: list[dict] = [] - real_tensors: list[torch.Tensor] = [] - for name, p in named_parameters.items(): - size = get_size(p) - if offset + size > buffer.numel(): - buckets.append((named_tensors, real_tensors)) - named_tensors, real_tensors = [], [] - offset = 0 - # assume tensors are contiguous - named_tensors.append( - {"name": name, "dtype": p.dtype, "shape": p.shape, "offset": offset} - ) - real_tensors.append(p) - offset += size - if named_tensors: - buckets.append((named_tensors, real_tensors)) - s.send_pyobj(handle) - s.recv() - for named_tensors, real_tensors in buckets: - offset = 0 - for p in real_tensors: - buffer[offset : offset + p.nbytes].data.copy_( - p.data.view(-1).view(dtype=torch.uint8), non_blocking=True - ) - offset += get_size(p) - torch.accelerator.synchronize() - s.send_pyobj(named_tensors) - s.recv() - s.send_pyobj(None) - s.recv() - s.close() - del buffer - gc.collect() - torch.accelerator.empty_cache() - - -# Ray manages four GPUs. - -os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3" -ray.init() - -# Co-locate vLLM instances and training actors on the same set of GPUs: -# * GPU 0 and 1: training actor 0, training actor 1, and vLLM instance 0 -# (tensor parallelism = 2). -# * GPU 2 and 3: training actor 2, training actor 3, and vLLM instance 1 -# (tensor parallelism = 2). - -pg = placement_group([{"GPU": 1, "CPU": 0}] * 4) -ray.get(pg.ready()) -print(f"placement group has bundles {pg.bundle_specs=}") - -training_actors = [] -training_actor_device_ids = [] -inference_engines = [] -inference_engine_device_ids = [] - -for bundle_index in [0, 1, 2, 3]: - training_actor = ray.remote( - num_cpus=0, - num_gpus=0.4, - scheduling_strategy=PlacementGroupSchedulingStrategy( - placement_group=pg, - placement_group_capture_child_tasks=True, - placement_group_bundle_index=bundle_index, - ), - )(RayTrainingActor).remote() - training_actors.append(training_actor) - -for bundle_index, training_actor in enumerate(training_actors): - device_id = ray.get(training_actor.report_device_id.remote()) - print(f"training actor {bundle_index} is on {device_id}") - training_actor_device_ids.append(device_id) - -for i, bundle_indices in enumerate([[0, 1], [2, 3]]): - # Use the following syntax instead of the @ray.remote decorator so that - # the placement group is customized for each bundle. - llm = ray.remote( - num_cpus=0, - num_gpus=0, - scheduling_strategy=PlacementGroupSchedulingStrategy( - placement_group=pg, - placement_group_capture_child_tasks=True, - ), - )(MyLLM).remote( - model="facebook/opt-125m", - enforce_eager=True, - worker_extension_cls="rlhf_utils.ColocateWorkerExtension", - tensor_parallel_size=2, - distributed_executor_backend="ray", - gpu_memory_utilization=0.4, - bundle_indices=bundle_indices, - ) - inference_engines.append(llm) - # Do not call any method on the inference engine at this point; the call - # blocks until the vLLM instance finishes initialization. - -for i, llm in enumerate(inference_engines): - inference_engine_device_ids.append( - ray.get(llm.collective_rpc.remote("report_device_id", args=tuple())) - ) - print(f"inference engine {i} is on {inference_engine_device_ids[-1]}") - -# Verify placement: the first two training actors share the same GPUs as -# the first inference engine. -assert training_actor_device_ids[:2] == inference_engine_device_ids[0] -# Verify placement: the last two training actors share the same GPUs as -# the second inference engine. -assert training_actor_device_ids[2:] == inference_engine_device_ids[1] - -print("Gather all the ZMQ handles from the training actors.") -zmq_handles = {} -for actor in training_actors: - zmq_handles.update(ray.get(actor.get_zmq_handles.remote())) - -print(f"ZMQ handles: {zmq_handles}") - -print("Update the weights of the inference engines.") -ray.get( - [actor.update_weights.remote() for actor in training_actors] - + [ - llm.collective_rpc.remote("update_weights_from_ipc", args=(zmq_handles,)) - for llm in inference_engines - ] -) - -print("Check if the weights are updated.") -for llm in inference_engines: - assert ray.get(llm.collective_rpc.remote("check_weights_changed", args=tuple())) diff --git a/examples/offline_inference/rlhf_online_quant.py b/examples/offline_inference/rlhf_online_quant.py deleted file mode 100644 index 2d98ad22c589..000000000000 --- a/examples/offline_inference/rlhf_online_quant.py +++ /dev/null @@ -1,162 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project -""" -Demonstrates reinforcement learning from human feedback (RLHF) using vLLM and Ray. - -The script separates training and inference workloads onto distinct GPUs -so that Ray can manage process placement and inter-process communication. -A Hugging Face Transformer model occupies GPU 0 for training, whereas a -tensor-parallel vLLM inference engine occupies GPU 1–2. - -The example performs the following steps: - -* Load the training model on GPU 0. -* Split the inference model across GPUs 1–2 using vLLM's tensor parallelism - and Ray placement groups. -* Generate text from a list of prompts using the inference engine. -* Update the weights of the training model and broadcast the updated weights - to the inference engine by using a Ray collective RPC group. Note that - for demonstration purposes we simply zero out the weights. - -For a production-ready implementation that supports multiple training and -inference replicas, see the OpenRLHF framework: -https://github.com/OpenRLHF/OpenRLHF - -This example assumes a single-node cluster with three GPUs, but Ray -supports multi-node clusters. vLLM expects the GPUs are only used for vLLM -workloads. Residual GPU activity interferes with vLLM memory profiling and -causes unexpected behavior. -""" - -import json -import os - -import ray -import torch -from ray.util.placement_group import placement_group -from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy -from rlhf_utils import stateless_init_process_group -from torchao.core.config import config_to_dict -from torchao.quantization import ( - Float8DynamicActivationFloat8WeightConfig, - PerRow, -) -from transformers import AutoModelForCausalLM - -from vllm import LLM, SamplingParams -from vllm.utils.network_utils import get_ip, get_open_port - - -class MyLLM(LLM): - """Configure the vLLM worker for Ray placement group execution.""" - - def __init__(self, *args, **kwargs): - # Remove the top-level CUDA_VISIBLE_DEVICES variable set by Ray - # so that vLLM can manage its own device placement within the worker. - os.environ.pop("CUDA_VISIBLE_DEVICES", None) - super().__init__(*args, **kwargs) - - -# Load the OPT-125M model onto GPU 0 for the training workload. -train_model = AutoModelForCausalLM.from_pretrained("facebook/opt-125m") -train_model.to("cuda:0") - -# Initialize Ray and set the visible devices. The vLLM engine will -# be placed on GPUs 1 and 2. -os.environ["CUDA_VISIBLE_DEVICES"] = "1,2" -ray.init() - -# Create a placement group that reserves GPU 1–2 for the vLLM inference engine. -# Learn more about Ray placement groups: -# https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html -pg_inference = placement_group([{"GPU": 1, "CPU": 0}] * 2) -ray.get(pg_inference.ready()) -scheduling_inference = PlacementGroupSchedulingStrategy( - placement_group=pg_inference, - placement_group_capture_child_tasks=True, - placement_group_bundle_index=0, -) - -# Launch the vLLM inference engine. The `enforce_eager` flag reduces -# start-up latency. - -# generate torchao quantization config for RL rollout -# see https://github.com/vllm-project/vllm/pull/23014 for instructions to -# use serialized config files instead of passing around json string -config = Float8DynamicActivationFloat8WeightConfig(granularity=PerRow()) - -json_str = json.dumps(config_to_dict(config)) - -llm = ray.remote( - num_cpus=0, - num_gpus=0, - scheduling_strategy=scheduling_inference, -)(MyLLM).remote( - model="facebook/opt-125m", - hf_overrides={"quantization_config_dict_json": json_str}, - enforce_eager=True, - worker_extension_cls="rlhf_utils.WorkerExtension", - tensor_parallel_size=2, - distributed_executor_backend="ray", -) - -# Generate text from the prompts. -prompts = [ - "Hello, my name is", - "The president of the United States is", - "The capital of France is", - "The future of AI is", -] - -sampling_params = SamplingParams(temperature=0) - -outputs = ray.get(llm.generate.remote(prompts, sampling_params)) - -print("-" * 50) -for output in outputs: - prompt = output.prompt - generated_text = output.outputs[0].text - print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}") - print("-" * 50) - -# Set up the communication channel between the training process and the -# inference engine. -master_address = get_ip() -master_port = get_open_port() - -handle = llm.collective_rpc.remote( - "init_weight_update_group", args=(master_address, master_port, 1, 3) -) - -model_update_group = stateless_init_process_group( - master_address, master_port, 0, 3, torch.device("cuda:0") -) -ray.get(handle) - -# Simulate a training step by zeroing out all model weights. -# In a real RLHF training loop the weights would be updated using the gradient -# from an RL objective such as PPO on a reward model. -for name, p in train_model.named_parameters(): - p.data.zero_() - -# Synchronize the updated weights to the inference engine. -for name, p in train_model.named_parameters(): - dtype_name = str(p.dtype).split(".")[-1] - handle = llm.collective_rpc.remote( - "update_weight", args=(name, dtype_name, p.shape) - ) - model_update_group.broadcast(p, src=0, stream=torch.cuda.current_stream()) - ray.get(handle) - -# Verify that the inference weights have been updated. -assert all(ray.get(llm.collective_rpc.remote("check_weights_changed"))) - -# Generate text with the updated model. The output is expected to be nonsense -# because the weights are zero. -outputs_updated = ray.get(llm.generate.remote(prompts, sampling_params)) -print("-" * 50) -for output in outputs_updated: - prompt = output.prompt - generated_text = output.outputs[0].text - print(f"Prompt: {prompt!r}\nGenerated text: {generated_text!r}") - print("-" * 50) diff --git a/examples/offline_inference/rlhf_utils.py b/examples/offline_inference/rlhf_utils.py deleted file mode 100644 index e9fc393bb549..000000000000 --- a/examples/offline_inference/rlhf_utils.py +++ /dev/null @@ -1,168 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# SPDX-FileCopyrightText: Copyright contributors to the vLLM project -import gc -from collections.abc import Callable -from typing import TypedDict - -import torch -import zmq - - -def stateless_init_process_group(master_address, master_port, rank, world_size, device): - """ - vLLM provides `StatelessProcessGroup` to create a process group - without considering the global process group in torch.distributed. - It is recommended to create `StatelessProcessGroup`, and then initialize - the data-plane communication (NCCL) between external (train processes) - and vLLM workers. - """ - from vllm.distributed.device_communicators.pynccl import PyNcclCommunicator - from vllm.distributed.utils import StatelessProcessGroup - - pg = StatelessProcessGroup.create( - host=master_address, port=master_port, rank=rank, world_size=world_size - ) - pynccl = PyNcclCommunicator(pg, device=device) - return pynccl - - -class WorkerExtension: - """ - The class for vLLM's worker to inherit from. - By defining an extension class, the code can work no matter what is - the underlying worker class. - - NOTE: we define this class in a separate module, and the main module - should pass the full qualified name as `worker_extension_cls` argument. - """ - - def init_weight_update_group( - self, master_address, master_port, rank_offset, world_size - ): - from vllm.distributed.parallel_state import get_world_group - - rank = get_world_group().rank + rank_offset - self.model_update_group = stateless_init_process_group( - master_address, - master_port, - rank, - world_size, - self.device, - ) - - def update_weight(self, name, dtype_name, shape): - dtype = getattr(torch, dtype_name) - weight = torch.empty(shape, dtype=dtype, device="cuda") - self.model_update_group.broadcast( - weight, src=0, stream=torch.cuda.current_stream() - ) - - self.model_runner.model.load_weights(weights=[(name, weight)]) - - del weight - - def check_weights_changed(self): - """ - Check if the weights are updated to 0. - """ - weights_updated = True - for name, p in self.model_runner.model.named_parameters(): - weights_updated = weights_updated and torch.allclose(p, torch.zeros_like(p)) - return weights_updated - - -def rebuild_ipc( - handle: tuple[Callable, tuple], device_id: int | None = None -) -> torch.Tensor: - func, args = handle - list_args = list(args) - if device_id is not None: - # the key is to change device id to the current device id - # in case two processes have different CUDA_VISIBLE_DEVICES - list_args[6] = device_id - buffer = func(*list_args) - return buffer - - -class FlattenedTensorMetadata(TypedDict): - name: str - shape: torch.Size - dtype: torch.dtype - # specify the start offset of this tensor in shared ipc_buffer tensor - offset: int - - -class ColocateWorkerExtension: - """ - The class for vLLM's worker to inherit from, in the colocate setting. - By defining an extension class, the code can work no matter what is - the underlying worker class. - - NOTE: we define this class in a separate module, and the main module - should pass the full qualified name as `worker_extension_cls` argument. - """ - - def update_weights_from_ipc(self, zmq_handles: dict[str, str]): - from vllm.model_executor.model_loader.utils import process_weights_after_loading - - assert self.device is not None - if not hasattr(self, "_zmq_ctx") or self._zmq_ctx is None: - self._zmq_ctx = zmq.Context() - socket = self._zmq_ctx.socket(zmq.REP) - socket.connect(zmq_handles[self.report_device_id()]) - buffer: torch.Tensor | None = None - while True: - payload: tuple[Callable, tuple] | list[FlattenedTensorMetadata] | None = ( - socket.recv_pyobj() - ) - if payload is None: - # means the update is done - process_weights_after_loading( - self.model_runner.model, self.model_config, self.device - ) - torch.accelerator.synchronize() - socket.send(b"") - break - if isinstance(payload, tuple): - # an ipc handle that vLLM can use `func, args = handle` - # and `func(*args)` to rebuild GPU tensor. - buffer = rebuild_ipc(payload, self.device.index) - assert buffer.dtype == torch.uint8 - socket.send(b"") - continue - assert isinstance(payload, list) - assert buffer is not None - weights = [] - for item in payload: - shape = item["shape"] - if isinstance(shape, (list, tuple)): - shape = torch.Size(shape) - assert isinstance(shape, torch.Size) - dtype, offset = item["dtype"], item["offset"] - size = dtype.itemsize * shape.numel() - tensor = buffer[offset : offset + size].view(dtype=dtype).view(shape) - weights.append((item["name"], tensor)) - self.model_runner.model.load_weights(weights=weights) - del weights - torch.accelerator.synchronize() - socket.send(b"") - - socket.close() - del buffer - gc.collect() - torch.accelerator.empty_cache() - - def report_device_id(self) -> str: - from vllm.platforms import current_platform - - self.device_uuid = current_platform.get_device_uuid(self.device.index) - return self.device_uuid - - def check_weights_changed(self): - """ - Check if the weights are updated to 0. - """ - weights_updated = True - for name, p in self.model_runner.model.named_parameters(): - weights_updated = weights_updated and torch.allclose(p, torch.zeros_like(p)) - return weights_updated diff --git a/examples/offline_inference/new_weight_syncing/rlhf_async_new_apis.py b/examples/rl/rlhf_async_new_apis.py similarity index 100% rename from examples/offline_inference/new_weight_syncing/rlhf_async_new_apis.py rename to examples/rl/rlhf_async_new_apis.py diff --git a/examples/online_serving/new_weight_syncing/rlhf_http_ipc.py b/examples/rl/rlhf_http_ipc.py similarity index 100% rename from examples/online_serving/new_weight_syncing/rlhf_http_ipc.py rename to examples/rl/rlhf_http_ipc.py diff --git a/examples/online_serving/new_weight_syncing/rlhf_http_nccl.py b/examples/rl/rlhf_http_nccl.py similarity index 100% rename from examples/online_serving/new_weight_syncing/rlhf_http_nccl.py rename to examples/rl/rlhf_http_nccl.py diff --git a/examples/offline_inference/new_weight_syncing/rlhf_ipc.py b/examples/rl/rlhf_ipc.py similarity index 100% rename from examples/offline_inference/new_weight_syncing/rlhf_ipc.py rename to examples/rl/rlhf_ipc.py diff --git a/examples/offline_inference/new_weight_syncing/rlhf_nccl.py b/examples/rl/rlhf_nccl.py similarity index 100% rename from examples/offline_inference/new_weight_syncing/rlhf_nccl.py rename to examples/rl/rlhf_nccl.py