Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions vllm/distributed/device_communicators/cuda_communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ def broadcast(self, tensor: torch.Tensor, src: int = 0) -> torch.Tensor:

def destroy(self):
if self.pynccl_comm is not None:
self.pynccl_comm.destroy()
self.pynccl_comm = None
if self.ca_comm is not None:
self.ca_comm = None
Expand Down
7 changes: 7 additions & 0 deletions vllm/distributed/device_communicators/pynccl.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ def __init__(
stream.synchronize()
del data

def destroy(self):
if self.available and not self.disabled:
with torch.accelerator.device_index(self.device.index):
self.nccl.ncclCommDestroy(self.comm)
self.available = False
self.disabled = True
Comment on lines +150 to +153

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

paranoia: consider setting self.comm to None to prevent use after destroy but before garbage collection


def all_reduce(
self,
in_tensor: torch.Tensor,
Expand Down
102 changes: 68 additions & 34 deletions vllm/distributed/elastic_ep/elastic_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,37 @@ def execute(self, execute_method: str, *args, **kwargs):
raise ValueError(f"Unknown execute method: {execute_method}")
return method(*args, **kwargs)

def _set_eplb_suppressed(self, suppressed: bool) -> None:
self.worker.model_runner.eep_eplb_suppressed = suppressed
ep_group = get_standby_ep_group() or get_ep_group()
if ep_group.rank == 0:
logger.info(
"[Elastic EP] EPLB %s elastic scaling transition",
"disabled during" if suppressed else "re-enabled after",
)

def load_model(self) -> None:
(
expanded_physical_to_logical,
num_logical_experts,
old_num_physical_experts,
) = self.receive_expert_mapping()
num_physical_experts = expanded_physical_to_logical.shape[1]
self.worker.parallel_config.eplb_config.num_redundant_experts = (
num_physical_experts - num_logical_experts
)
self.worker.load_model(load_dummy_weights=True)
self.worker.model_runner.setup_eplb_from_mapping(
expanded_physical_to_logical, old_num_physical_experts
)
self._set_eplb_suppressed(True)

def create_standby_groups(
self, reconfig_request: ReconfigureDistributedRequest
) -> None:
self.reconfig_request = reconfig_request
new_dp_size = reconfig_request.new_data_parallel_size
old_dp_size = get_dp_group().world_size
world_size = self.worker.vllm_config.parallel_config.world_size
new_world_size_across_dp = world_size * new_dp_size
updated_config = copy.copy(self.worker.vllm_config)
Expand All @@ -165,11 +191,8 @@ def create_standby_groups(
coord_store_port=reconfig_request.coord_store_port,
enable_eplb=updated_config.parallel_config.enable_eplb,
)
self.worker.model_runner.eep_eplb_suppressed = True
standby_ep_group = get_standby_ep_group()
assert standby_ep_group is not None
if standby_ep_group.rank == 0:
logger.info("[Elastic EP] EPLB disabled during elastic scaling transition")
if new_dp_size > old_dp_size:
self._set_eplb_suppressed(True)

def transfer_weights(self, old_dp_size: int, new_dp_size: int) -> None:
standby_dp_group = get_standby_dp_group()
Expand Down Expand Up @@ -237,13 +260,31 @@ def broadcast_expert_mapping(self) -> None:
device=self.worker.device,
)

def _release_cuda_graphs(self) -> None:
if isinstance(self.worker.model_runner.model, CUDAGraphWrapper):
wrapper = self.worker.model_runner.model
wrapper.concrete_cudagraph_entries = {}

elif isinstance(self.worker.model_runner.model, UBatchWrapper):
raise RuntimeError("DBO is not yet supported in elastic EP")

torch.compiler.reset()
with set_current_vllm_config(self.worker.vllm_config):
reset_compile_wrapper(self.worker.model_runner.get_model())

gc.collect()
torch.accelerator.synchronize()
torch.accelerator.empty_cache()

def switch_and_remove(self) -> None:
self._release_cuda_graphs()
_replace_active_groups(world=None, dp=None, ep=None, eplb=None, node_count=None)

def switch_and_prepare(self) -> None:
old_dp_size = get_dp_group().world_size
old_ep_size = get_ep_group().world_size

self._release_cuda_graphs()
_replace_active_groups(**pop_standby_groups())

parallel_config = self.worker.vllm_config.parallel_config
Expand Down Expand Up @@ -384,13 +425,6 @@ def switch_and_prepare(self) -> None:
compilation_counter.stock_torch_compile_count += 1
self.worker.model_runner.model.compile(fullgraph=True, backend=backend)

# release all previously captured CUDA graphs
if isinstance(self.worker.model_runner.model, CUDAGraphWrapper):
wrapper = self.worker.model_runner.model
wrapper.concrete_cudagraph_entries = {}
elif isinstance(self.worker.model_runner.model, UBatchWrapper):
raise RuntimeError("DBO is not yet supported in elastic EP")

multi_block_table = self.worker.model_runner.input_batch.block_table
saved_block_tables: list[tuple[torch.Tensor, torch.Tensor]] = []
for bt in multi_block_table.block_tables:
Expand All @@ -399,14 +433,6 @@ def switch_and_prepare(self) -> None:
)
multi_block_table.clear()

# reset the compile wrapper
torch.compiler.reset()
with set_current_vllm_config(self.worker.vllm_config):
reset_compile_wrapper(self.worker.model_runner.get_model())

gc.collect()
torch.accelerator.synchronize()
torch.accelerator.empty_cache()
unlock_workspace()
self.worker.compile_or_warm_up_model()
lock_workspace()
Expand All @@ -416,8 +442,12 @@ def switch_and_prepare(self) -> None:
):
bt.block_table.gpu.copy_(saved_gpu)
bt.block_table.cpu.copy_(saved_cpu)
if new_dp_size < old_dp_size:
self._set_eplb_suppressed(False)

def perform_eplb_reshuffle(self, new_dp_size: int | None = None) -> None:
def _perform_eplb_reshuffle(
self, rank_mapping: dict[int, int] | None = None
) -> None:
if get_ep_group().rank == 0:
logger.info("[Elastic EP] Starting expert resharding...")

Expand All @@ -428,20 +458,9 @@ def perform_eplb_reshuffle(self, new_dp_size: int | None = None) -> None:
eplb_model_state = eplb_state.model_states[model_config.compute_hash()]
is_async_enabled = eplb_state.is_async
eplb_state.is_async = False
if new_dp_size is None:
if rank_mapping is None:
eplb_state.rearrange()
else:
# scale down
parallel_config = self.worker.vllm_config.parallel_config
tp_size = parallel_config.tensor_parallel_size
old_ep_size = parallel_config.data_parallel_size * tp_size
new_ep_size = new_dp_size * tp_size

rank_mapping = {
old_ep_rank: old_ep_rank if old_ep_rank < new_ep_size else -1
for old_ep_rank in range(old_ep_size)
}

eplb_state.rearrange(rank_mapping=rank_mapping)
# NOTE(yongji): check whether we need to synchronize here
torch.accelerator.synchronize()
Expand All @@ -451,10 +470,25 @@ def perform_eplb_reshuffle(self, new_dp_size: int | None = None) -> None:
eplb_model_state.physical_to_logical_map.shape[1]
)
eplb_state.is_async = is_async_enabled
self.worker.model_runner.eep_eplb_suppressed = False
if get_ep_group().rank == 0:
logger.info("[Elastic EP] Expert resharding completed")

def perform_eplb_reshuffle(self) -> None:
self._perform_eplb_reshuffle()
self._set_eplb_suppressed(False)

def perform_scale_down_eplb_reshuffle(self, new_dp_size: int) -> None:
self._set_eplb_suppressed(True)
parallel_config = self.worker.vllm_config.parallel_config
tp_size = parallel_config.tensor_parallel_size
old_ep_size = parallel_config.data_parallel_size * tp_size
new_ep_size = new_dp_size * tp_size
rank_mapping = {
old_ep_rank: old_ep_rank if old_ep_rank < new_ep_size else -1
for old_ep_rank in range(old_ep_size)
}
self._perform_eplb_reshuffle(rank_mapping=rank_mapping)

def receive_weights(self) -> None:
dp_group = get_dp_group()
assert isinstance(dp_group, StatelessGroupCoordinator)
Expand Down
36 changes: 29 additions & 7 deletions vllm/distributed/elastic_ep/elastic_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ class ScaleUpExistingEngineState(enum.IntEnum):


class ScaleUpNewEngineState(enum.IntEnum):
PREPARE = 0
EPLB_RESHUFFLE = 1
COMPLETE = 2
PRE_KV_INIT = 0
PREPARE = 1
EPLB_RESHUFFLE = 2
COMPLETE = 3


class ScaleDownRemainingEngineState(enum.IntEnum):
Expand Down Expand Up @@ -104,7 +105,7 @@ def __init__(
self.state: EngineState
if scale_type == "scale_up":
self.state = (
ScaleUpNewEngineState.PREPARE
ScaleUpNewEngineState.PRE_KV_INIT
if worker_type == "new"
else ScaleUpExistingEngineState.WAIT_NEW_CORE_ENGINES_INIT
)
Expand Down Expand Up @@ -142,6 +143,12 @@ def progress(self) -> bool:
else self._progress_remaining_engine()
)

def run_pre_kv_init_states(self) -> None:
assert self.scale_type == "scale_up" and self.worker_type == "new"
assert self.state == ScaleUpNewEngineState.PRE_KV_INIT
assert self.progress()
assert self.state == ScaleUpNewEngineState.PREPARE

def _execute_tcp_store_barrier(
self, dp_store, group_rank, group_size, barrier_id, timeout=None
):
Expand Down Expand Up @@ -303,7 +310,23 @@ def _progress_new_engine(self) -> bool:
state = self.state
assert self.new_dp_group is not None and self.new_dp_store is not None

if state == ScaleUpNewEngineState.PREPARE:
if state == ScaleUpNewEngineState.PRE_KV_INIT:
self.engine_core._eep_send_engine_core_notification(
EEPNotificationType.NEW_CORE_ENGINES_WEIGHTS_INIT_READY
)
self.model_executor.collective_rpc(
"elastic_ep_execute", args=("receive_weights",)
)
self.engine_core.available_gpu_memory_for_kv_cache = (
ParallelConfig.sync_kv_cache_memory_size(self.new_dp_group, -1)
)
self.model_executor.collective_rpc(
"elastic_ep_execute", args=("prepare_new_worker",)
)
self.state = ScaleUpNewEngineState.PREPARE
return True

elif state == ScaleUpNewEngineState.PREPARE:
tensor = torch.tensor([0, 0, 0], dtype=torch.int32, device="cpu")
torch.distributed.all_reduce(
tensor,
Expand Down Expand Up @@ -403,7 +426,6 @@ def _progress_removing_engine(self) -> bool:
self.engine_core._eep_send_engine_core_notification(
EEPNotificationType.SHUTDOWN_COMPLETE
)
self.engine_core.shutdown()
return True

else:
Expand Down Expand Up @@ -525,7 +547,7 @@ def _eplb_reshuffle_before_scale_down(self):
self.model_executor.collective_rpc(
"elastic_ep_execute",
args=(
"perform_eplb_reshuffle",
"perform_scale_down_eplb_reshuffle",
self.reconfig_request.new_data_parallel_size,
),
)
Expand Down
17 changes: 3 additions & 14 deletions vllm/v1/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1694,6 +1694,8 @@ def run_busy_loop(self):
if self.eep_scaling_state is not None:
_ = self.eep_scaling_state.progress()
if self.eep_scaling_state.is_complete():
if self.eep_scaling_state.worker_type == "removing":
raise SystemExit
self.process_input_queue_block = True
self.eep_scaling_state = None

Expand Down Expand Up @@ -1857,20 +1859,7 @@ def _eep_scale_up_before_kv_init(self):
scale_type="scale_up",
reconfig_request=None,
)
self.model_executor.collective_rpc("init_device")
self.model_executor.collective_rpc("load_model")
self._eep_send_engine_core_notification(
EEPNotificationType.NEW_CORE_ENGINES_WEIGHTS_INIT_READY
)
self.model_executor.collective_rpc(
"elastic_ep_execute", args=("receive_weights",)
)
self.available_gpu_memory_for_kv_cache = (
ParallelConfig.sync_kv_cache_memory_size(self.dp_group, -1)
)
self.model_executor.collective_rpc(
"elastic_ep_execute", args=("prepare_new_worker",)
)
self.eep_scaling_state.run_pre_kv_init_states()
self.process_input_queue_block = False


Expand Down
15 changes: 8 additions & 7 deletions vllm/v1/executor/multiproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,13 +602,14 @@ def __init__(
)

# Load model
is_eep_new_worker = envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH
if not is_eep_new_worker:
self.worker.init_device()
# Update process title now that parallel groups are initialized
self.setup_proc_title_and_log_prefix(
enable_ep=vllm_config.parallel_config.enable_expert_parallel
)
self.worker.init_device()
# Update process title now that parallel groups are initialized
self.setup_proc_title_and_log_prefix(
enable_ep=vllm_config.parallel_config.enable_expert_parallel
)
if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
self.worker.elastic_ep_execute("load_model")
else:
self.worker.load_model()

scheduler_config = vllm_config.scheduler_config
Expand Down
7 changes: 4 additions & 3 deletions vllm/v1/executor/ray_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,10 @@ def sort_by_driver_then_worker_ip(item: RayWorkerMetaData):
all_kwargs.append(kwargs)
self.collective_rpc("init_worker", args=(all_kwargs,))

is_eep_new_worker = envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH
if not is_eep_new_worker:
self.collective_rpc("init_device")
self.collective_rpc("init_device")
if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
self.collective_rpc("elastic_ep_execute", args=("load_model",))
else:
self.collective_rpc("load_model")

def _update_block_size(worker):
Expand Down
10 changes: 6 additions & 4 deletions vllm/v1/executor/uniproc_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ def _init_executor(self) -> None:
max_workers=1, thread_name_prefix="WorkerAsyncOutput"
)

is_eep_new_worker = envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH
self.driver_worker.init_worker(all_kwargs=[kwargs])
if not is_eep_new_worker:
self.driver_worker.init_device()
self.driver_worker.init_device()

if envs.VLLM_ELASTIC_EP_SCALE_UP_LAUNCH:
self.driver_worker.elastic_ep_execute("load_model")
else:
self.driver_worker.load_model()
current_platform.update_block_size_for_backend(self.vllm_config)
current_platform.update_block_size_for_backend(self.vllm_config)

def _distributed_args(self) -> tuple[str, int, int]:
"""Return (distributed_init_method, rank, local_rank)."""
Expand Down
22 changes: 2 additions & 20 deletions vllm/v1/worker/gpu_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,30 +315,12 @@ def init_device(self):

# FIXME(youkaichao & ywang96): Use TorchDispatchMode instead of memory pool
# to hijack tensor allocation.
def load_model(self) -> None:
dummy_weights = os.environ.get("VLLM_ELASTIC_EP_SCALE_UP_LAUNCH") == "1"
if dummy_weights:
(
expanded_physical_to_logical,
num_logical_experts,
old_num_physical_experts,
) = self.elastic_ep_executor.receive_expert_mapping()
num_physical_experts = expanded_physical_to_logical.shape[1]
self.parallel_config.eplb_config.num_redundant_experts = (
num_physical_experts - num_logical_experts
)

def load_model(self, *, load_dummy_weights: bool = False) -> None:
with (
self._maybe_get_memory_pool_context(tag="weights"),
set_current_vllm_config(self.vllm_config),
):
self.model_runner.load_model(load_dummy_weights=dummy_weights)

if dummy_weights:
self.model_runner.setup_eplb_from_mapping(
expanded_physical_to_logical, old_num_physical_experts
)
self.model_runner.eep_eplb_suppressed = True
self.model_runner.load_model(load_dummy_weights=load_dummy_weights)

def update_config(self, overrides: dict[str, Any]) -> None:
self.model_runner.update_config(overrides)
Expand Down
Loading
Loading