Commit c01a6cb

rkooo567 and youkaichao authored
[Ray backend] Better error when pg topology is bad. (#7584)
Co-authored-by: youkaichao <[email protected]>
1 parent b903e1b commit c01a6cb

File tree

3 files changed: +197 -9 lines changed

.buildkite/test-pipeline.yaml

+1
@@ -293,6 +293,7 @@ steps:
   commands:
   - # the following commands are for the first node, with ip 192.168.10.10 (ray environment already set up)
   - VLLM_TEST_SAME_HOST=0 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_same_node.py
+  - VLLM_MULTI_NODE=1 pytest -v -s distributed/test_multi_node_assignment.py
   - VLLM_MULTI_NODE=1 pytest -v -s distributed/test_pipeline_parallel.py
   - # the following commands are for the second node, with ip 192.168.10.11 (ray environment already set up)
   - VLLM_TEST_SAME_HOST=0 torchrun --nnodes 2 --nproc-per-node=2 --rdzv_backend=c10d --rdzv_endpoint=192.168.10.10 distributed/test_same_node.py
tests/distributed/test_multi_node_assignment.py

+64

@@ -0,0 +1,64 @@
+"""Make sure ray assigns GPU workers to the correct node.
+
+Run:
+```sh
+cd $VLLM_PATH/tests
+
+pytest distributed/test_multi_node_assignment.py
+```
+"""
+
+import os
+
+import pytest
+import ray
+from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
+
+from vllm import initialize_ray_cluster
+from vllm.config import ParallelConfig
+from vllm.executor.ray_utils import _wait_until_pg_removed
+from vllm.utils import get_ip
+
+VLLM_MULTI_NODE = os.getenv("VLLM_MULTI_NODE", "0") == "1"
+
+
+@pytest.mark.skipif(not VLLM_MULTI_NODE,
+                    reason="Need at least 2 nodes to run the test.")
+def test_multi_node_assignment() -> None:
+
+    # NOTE: important to keep this class definition here
+    # to let ray use cloudpickle to serialize it.
+    class Actor:
+
+        def get_ip(self):
+            return get_ip()
+
+    for _ in range(10):
+        config = ParallelConfig(1, 2)
+        initialize_ray_cluster(config)
+
+        current_ip = get_ip()
+        workers = []
+        for bundle_id, bundle in enumerate(
+                config.placement_group.bundle_specs):
+            if not bundle.get("GPU", 0):
+                continue
+            scheduling_strategy = PlacementGroupSchedulingStrategy(
+                placement_group=config.placement_group,
+                placement_group_capture_child_tasks=True,
+                placement_group_bundle_index=bundle_id,
+            )
+
+            worker = ray.remote(
+                num_cpus=0,
+                num_gpus=1,
+                scheduling_strategy=scheduling_strategy,
+            )(Actor).remote()
+            worker_ip = ray.get(worker.get_ip.remote())
+            assert worker_ip == current_ip
+            workers.append(worker)
+
+        for worker in workers:
+            ray.kill(worker)
+
+        _wait_until_pg_removed(config.placement_group)
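For context on the `ParallelConfig(1, 2)` call in the test above, here is a minimal sketch (not part of the commit). It assumes the first two positional arguments of `ParallelConfig` are still `pipeline_parallel_size` and `tensor_parallel_size`, so the resulting placement group requests `world_size == 2` GPU bundles, all of which the test expects to land on the driver node:

```python
# Sketch only; the positional-argument order is an assumption based on
# vllm.config.ParallelConfig at the time of this commit.
from vllm.config import ParallelConfig

config = ParallelConfig(1, 2)            # pipeline_parallel_size=1, tensor_parallel_size=2
assert config.tensor_parallel_size == 2
assert config.world_size == 2            # one GPU bundle per worker in the placement group
```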

vllm/executor/ray_utils.py

+132-9
@@ -1,4 +1,6 @@
-from typing import List, Optional, Tuple, Union
+import time
+from collections import defaultdict
+from typing import Dict, List, Optional, Tuple, Union
 
 import msgspec
 
@@ -11,9 +13,13 @@
 from vllm.worker.worker_base import WorkerWrapperBase
 
 logger = init_logger(__name__)
+PG_WAIT_TIMEOUT = 1800
 
 try:
     import ray
+    from ray._private.state import available_resources_per_node
+    from ray.util import placement_group_table
+    from ray.util.placement_group import PlacementGroup
 
     class RayWorkerWrapper(WorkerWrapperBase):
         """Ray wrapper for vllm.worker.Worker, allowing Worker to be
@@ -98,6 +104,106 @@ def assert_ray_available():
                          "`pip install ray`.") from ray_import_err
 
 
+def _verify_bundles(placement_group: "PlacementGroup",
+                    parallel_config: ParallelConfig, device_str: str):
+    """Verify a given placement group has bundles located in the right place.
+
+    There are 2 rules.
+    - Warn if all tensor parallel workers cannot fit in a single node.
+    - Fail if driver node is not included in a placement group.
+    """
+    assert ray.is_initialized(), (
+        "Ray is not initialized although distributed-executor-backend is ray.")
+    pg_data = placement_group_table(placement_group)
+    # bundle_idx -> node_id
+    bundle_to_node_ids = pg_data["bundles_to_node_id"]
+    # bundle_idx -> bundle (e.g., {"GPU": 1})
+    bundles = pg_data["bundles"]
+    # node_id -> List of bundle (e.g., {"GPU": 1})
+    node_id_to_bundle: Dict[str, List[Dict[str, float]]] = defaultdict(list)
+
+    for bundle_idx, node_id in bundle_to_node_ids.items():
+        node_id_to_bundle[node_id].append(bundles[bundle_idx])
+    driver_node_id = ray.get_runtime_context().get_node_id()
+
+    if driver_node_id not in node_id_to_bundle:
+        raise RuntimeError(
+            f"driver node id {driver_node_id} is not included in a placement "
+            f"group {placement_group.id}. Node id -> bundles "
+            f"{node_id_to_bundle}. "
+            "You don't have enough GPUs available in a current node. Check "
+            "`ray status` to see if you have available GPUs in a node "
+            f"{driver_node_id} before starting an vLLM engine.")
+
+    for node_id, bundles in node_id_to_bundle.items():
+        if len(bundles) < parallel_config.tensor_parallel_size:
+            logger.warning(
+                "tensor_parallel_size=%d "
+                "is bigger than a reserved number of %ss (%d "
+                "%ss) in a node %s. Tensor parallel workers can be "
+                "spread out to 2+ nodes which can degrade the performance "
+                "unless you have fast interconnect across nodes, like "
+                "Infiniband. To resolve this issue, make sure you have more "
+                "than %d GPUs available at each node.",
+                parallel_config.tensor_parallel_size, device_str, len(bundles),
+                device_str, node_id, parallel_config.tensor_parallel_size)
+
+
+def _wait_until_pg_ready(current_placement_group: "PlacementGroup"):
+    """Wait until a placement group is ready.
+
+    It prints the informative log messages if the placement group is
+    not created within time.
+
+    """
+    # Wait until PG is ready - this will block until all
+    # requested resources are available, and will timeout
+    # if they cannot be provisioned.
+    placement_group_specs = current_placement_group.bundle_specs
+
+    s = time.time()
+    pg_ready_ref = current_placement_group.ready()
+    wait_interval = 10
+    while time.time() - s < PG_WAIT_TIMEOUT:
+        ready, _ = ray.wait([pg_ready_ref], timeout=wait_interval)
+        if len(ready) > 0:
+            break
+
+        # Exponential backoff for warning print.
+        wait_interval *= 2
+        logger.info(
+            "Waiting for creating a placement group of specs for "
+            "%d seconds. specs=%s. Check "
+            "`ray status` to see if you have enough resources.",
+            int(time.time() - s), placement_group_specs)
+
+    try:
+        ray.get(pg_ready_ref, timeout=0)
+    except ray.exceptions.GetTimeoutError:
+        raise ValueError(
+            "Cannot provide a placement group of "
+            f"{placement_group_specs=} within {PG_WAIT_TIMEOUT} seconds. See "
+            "`ray status` to make sure the cluster has enough resources."
+        ) from None
+
+
+def _wait_until_pg_removed(current_placement_group: "PlacementGroup"):
+    ray.util.remove_placement_group(current_placement_group)
+    s = time.time()
+    wait_interval = 10
+    while time.time() - s < PG_WAIT_TIMEOUT:
+        pg = ray.util.get_current_placement_group()
+        if pg is None:
+            break
+
+        # Exponential backoff for warning print.
+        wait_interval *= 2
+        logger.info(
+            "Waiting for removing a placement group of specs for "
+            "%d seconds.", int(time.time() - s))
+        time.sleep(wait_interval)
+
+
 def initialize_ray_cluster(
     parallel_config: ParallelConfig,
     ray_address: Optional[str] = None,
@@ -156,15 +262,32 @@ def initialize_ray_cluster
                f"The number of required {device_str}s exceeds the total "
                f"number of available {device_str}s in the placement group.")
         # Create a new placement group
-        placement_group_specs = ([{
-            device_str: 1
-        }] * parallel_config.world_size)
+        placement_group_specs: List[Dict[str, float]] = ([{
+            device_str: 1.0
+        } for _ in range(parallel_config.world_size)])
+
+        # vLLM engine is also a worker to execute model with an accelerator,
+        # so it requires to have the device in a current node. Check if
+        # the current node has at least one device.
+        current_ip = get_ip()
+        current_node_id = ray.get_runtime_context().get_node_id()
+        current_node_resource = available_resources_per_node()[current_node_id]
+        if current_node_resource.get(device_str, 0) < 1:
+            raise ValueError(
+                f"Current node has no {device_str} available. "
+                f"{current_node_resource=}. vLLM engine cannot start without "
+                f"{device_str}. Make sure you have at least 1 {device_str} "
+                f"available in a node {current_node_id=} {current_ip=}.")
+        # This way, at least bundle is required to be created in a current
+        # node.
+        placement_group_specs[0][f"node:{current_ip}"] = 0.001
+
+        # By default, Ray packs resources as much as possible.
         current_placement_group = ray.util.placement_group(
-            placement_group_specs)
-        # Wait until PG is ready - this will block until all
-        # requested resources are available, and will timeout
-        # if they cannot be provisioned.
-        ray.get(current_placement_group.ready(), timeout=1800)
+            placement_group_specs, strategy="PACK")
+        _wait_until_pg_ready(current_placement_group)
 
+    assert current_placement_group is not None
+    _verify_bundles(current_placement_group, parallel_config, device_str)
     # Set the placement group in the parallel config
     parallel_config.placement_group = current_placement_group
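For readers unfamiliar with the placement group table that `_verify_bundles` consumes, here is a rough, illustrative sketch (not part of the commit). It assumes a running Ray cluster with at least two GPUs and only touches the two fields the new check reads, `bundles` and `bundles_to_node_id`:

```python
# Illustrative only: inspect the same fields _verify_bundles() reads.
# Assumes ray.init() can reach a cluster with >= 2 GPUs; printed values
# are examples, not guaranteed output.
import ray
from ray.util import placement_group_table

ray.init()

pg = ray.util.placement_group([{"GPU": 1.0}] * 2, strategy="PACK")
ray.get(pg.ready())

data = placement_group_table(pg)
print(data["bundles"])             # e.g. {0: {'GPU': 1.0}, 1: {'GPU': 1.0}}
print(data["bundles_to_node_id"])  # e.g. {0: '<node-id>', 1: '<node-id>'}

# The new check raises a RuntimeError if the driver's node id does not
# appear in the mapping, and logs a warning if any node holds fewer
# bundles than tensor_parallel_size.
driver_node_id = ray.get_runtime_context().get_node_id()
assert driver_node_id in data["bundles_to_node_id"].values()
```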

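The other half of the change is the `node:{current_ip}` entry added to the first bundle in `initialize_ray_cluster`. Every Ray node advertises a custom resource named `node:<ip>` with capacity 1.0, so reserving a tiny fraction of it forces that bundle, and therefore the driver worker, onto the driver's node. A minimal sketch of that mechanism outside vLLM (assumptions: a running Ray cluster with at least two GPUs; `get_ip` comes from `vllm.utils` as in the diff above):

```python
# Minimal sketch of the node-affinity trick; not taken from the commit.
import ray

from vllm.utils import get_ip

ray.init()
current_ip = get_ip()

bundles = [{"GPU": 1.0} for _ in range(2)]
# "node:<ip>" is a per-node resource Ray creates automatically; asking for
# 0.001 of it pins bundle 0 to the node with that IP.
bundles[0][f"node:{current_ip}"] = 0.001

pg = ray.util.placement_group(bundles, strategy="PACK")
ray.get(pg.ready())  # blocks (and eventually times out) if this node has no free GPU
```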