|
28 | 28 | from vllm.v1.engine.coordinator import DPCoordinator |
29 | 29 | from vllm.v1.engine.core import EngineCore, EngineCoreProc |
30 | 30 | from vllm.v1.engine.exceptions import EngineDeadError |
31 | | -from vllm.v1.engine.utils import (CoreEngine, CoreEngineActorManager, |
| 31 | +from vllm.v1.engine.utils import (CoreEngineActorManager, |
32 | 32 | CoreEngineProcManager, EngineZmqAddresses, |
33 | 33 | launch_core_engines) |
34 | 34 | from vllm.v1.executor.abstract import Executor |
@@ -94,6 +94,8 @@ def make_async_mp_client( |
94 | 94 | # External load balancer - client per DP rank. |
95 | 95 | return DPAsyncMPClient(*client_args) |
96 | 96 | # Internal load balancer - client balances to all DP ranks. |
| 97 | + if parallel_config.data_parallel_backend == "ray": |
| 98 | + return RayDPClient(*client_args) |
97 | 99 | return DPLBAsyncMPClient(*client_args) |
98 | 100 | return AsyncMPClient(*client_args) |
99 | 101 |
|
@@ -1115,7 +1117,7 @@ def _init_engines_direct(self, vllm_config: VllmConfig, local_only: bool, |
1115 | 1117 |
|
1116 | 1118 | async def _send_reconfig_message( |
1117 | 1119 | self, reconfig_request: ReconfigureDistributedRequest, |
1118 | | - engine: CoreEngine) -> asyncio.Future: |
| 1120 | + engine: EngineIdentity) -> asyncio.Future: |
1119 | 1121 | """Send reconfiguration message and return the result future without |
1120 | 1122 | waiting for completion.""" |
1121 | 1123 | call_id = uuid.uuid1().int >> 64 |
@@ -1160,17 +1162,17 @@ async def scale_up(self, new_data_parallel_size: int) -> None: |
1160 | 1162 | # Phase 2: Create new engines now that reconfig messages have been sent |
1161 | 1163 | # self.resources.engine_manager is guaranteed to be |
1162 | 1164 | # CoreEngineActorManager for RayDPClient |
1163 | | - assert isinstance(self.resources.engine_manager, CoreEngineActorManager) |
| 1165 | + assert isinstance(self.resources.engine_manager, |
| 1166 | + CoreEngineActorManager) |
1164 | 1167 | self.resources.engine_manager.scale_up(self.vllm_config, |
1165 | 1168 | new_data_parallel_size) |
1166 | 1169 |
|
1167 | 1170 | # Create new CoreEngine objects for the new engines |
1168 | 1171 | new_engine_identities = set() |
1169 | 1172 | for i in range(current_dp_size, new_data_parallel_size): |
1170 | | - # TODO(yongji): check if the engine is local |
1171 | | - new_engine = CoreEngine(index=i, local=False) |
| 1173 | + new_engine = i.to_bytes(2, "little") |
1172 | 1174 | self.core_engines.append(new_engine) |
1173 | | - new_engine_identities.add(new_engine.identity) |
| 1175 | + new_engine_identities.add(new_engine) |
1174 | 1176 |
|
1175 | 1177 | # Wait for ready messages from new engines on the input socket |
1176 | 1178 | sync_input_socket = zmq.Socket.shadow(self.input_socket) |
@@ -1233,7 +1235,8 @@ async def scale_down(self, new_data_parallel_size: int) -> None: |
1233 | 1235 |
|
1234 | 1236 | await asyncio.gather(*reconfig_futures) |
1235 | 1237 |
|
1236 | | - assert isinstance(self.resources.engine_manager, CoreEngineActorManager) |
| 1238 | + assert isinstance(self.resources.engine_manager, |
| 1239 | + CoreEngineActorManager) |
1237 | 1240 | self.resources.engine_manager.scale_down(current_dp_size, |
1238 | 1241 | new_data_parallel_size) |
1239 | 1242 |
|
|
0 commit comments