[Feat.][EEP] vLLM Ascend adaptation for Elastic EP Milestone2#29
Conversation
**Summary of Changes** (Gemini Code Assist)

This pull request implements support for Elastic Expert Parallelism (Elastic EP) Milestone 2 on the Ascend platform. By introducing a new executor and leveraging stateless communication group coordinators, the changes enable dynamic scaling of expert parallelism without requiring modifications to the core vLLM codebase. The implementation includes robust handling of communication group transitions, expert rebalancing, and weight transfers, ensuring seamless scaling operations.
Code Review
**Suggested PR Title:**

[Distributed][Feature] Support Elastic Expert Parallelism (EP) for Ascend NPUs

**Suggested PR Summary:**

### What this PR does / why we need it?
This PR implements Elastic Expert Parallelism (EP) for Ascend NPUs, enabling dynamic scaling for MoE models. It introduces the AscendElasticEPScalingExecutor to manage reconfiguration, expert redistribution (EPLB reshuffling), and ACL graph cleanup. It also adds HCCL P2P communication support and stateless process group initialization to facilitate dynamic membership changes. Additionally, it includes bug fixes for block table clearing and optimizations for expert weight transfers.

### Does this PR introduce _any_ user-facing change?
Yes, it enables dynamic scaling of MoE models on Ascend hardware.

### How was this patch tested?
CI passed with existing tests.

I have no feedback to provide.
Signed-off-by: nifeng <1542305589@qq.com>
Pull request overview
Adds Ascend-side support for vLLM’s Elastic Expert Parallel (Elastic EP) milestone 2, enabling dynamic scale-up/scale-down by introducing an Ascend elastic scaling executor, stateless communication groups (incl. standby groups), and updating EPLB/comm plumbing to work under elastic reconfiguration.
Changes:
- Introduce `AscendElasticEPScalingExecutor` + standby-group management to reconfigure Ascend distributed groups during elastic scaling.
- Switch key Ascend EP-related group coordinators (MC2 / Dynamic EPLB / FC3 quant) to stateless groups when elastic EP is enabled.
- Refactor EPLB (policy/worker/transfer) and MoE init paths to support expert rebalancing and communication changes during scaling.
Reviewed changes
Copilot reviewed 20 out of 21 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| `vllm_ascend/worker/worker.py` | Adds elastic executor hook + dummy-weight load support and moves EPLB warmup invocation. |
| `vllm_ascend/worker/model_runner_v1.py` | Adds shared scaling state, dummy-weight loading, and scaling-aware EPLB warmup behavior. |
| `vllm_ascend/worker/block_table.py` | Adjusts block table clearing to explicitly reset CPU/GPU buffers. |
| `vllm_ascend/quantization/methods/w8a8_dynamic.py` | Makes W8A8_DYNAMIC MoE comm-name init scaling-aware. |
| `vllm_ascend/platform.py` | Adds stateless HCCL PG init helper for stateless group support. |
| `vllm_ascend/ops/fused_moe/token_dispatcher.py` | Uses group coordinator rank instead of `torch.distributed.get_rank(group=...)`. |
| `vllm_ascend/ops/fused_moe/fused_moe.py` | Skips MoE comm-method setup during scale-up launch. |
| `vllm_ascend/eplb/eplb_updator.py` | Moves EPLB gather/warmup traffic to stateless group implementations. |
| `vllm_ascend/eplb/core/policy/policy_default_eplb.py` | Updates redundancy packing + adds elastic EP-size awareness for rebalancing. |
| `vllm_ascend/eplb/core/policy/policy_abstract.py` | Adds `set_new_ep_size` hook for elastic policies. |
| `vllm_ascend/eplb/core/eplb_worker.py` | Adds scaling-aware placement checks and EP-size adjustments. |
| `vllm_ascend/eplb/core/eplb_device_transfer_loader.py` | Reworks D2D transfer to use stateless-group P2P send/recv. |
| `vllm_ascend/eplb/adaptor/vllm_adaptor.py` | Adds temp tensor buffers for fused MC2 cases. |
| `vllm_ascend/distributed/parallel_state.py` | Creates stateless Ascend groups under `--enable-elastic-ep` and adds replacement helper. |
| `vllm_ascend/distributed/elastic_ep/standby_state.py` | New: creates and stores standby stateless Ascend groups for reconfiguration. |
| `vllm_ascend/distributed/elastic_ep/elastic_execute.py` | New: Ascend elastic EP scaling executor integrating group switching, MoE reconfig, and EPLB reshuffle. |
| `vllm_ascend/distributed/device_communicators/pyhccl_wrapper.py` | Adds HCCL Send/Recv bindings. |
| `vllm_ascend/distributed/device_communicators/pyhccl.py` | Adds send/recv/destroy and batch P2P helpers. |
| `vllm_ascend/distributed/device_communicators/npu_communicator.py` | Integrates PyHCCL communicator for stateless-group P2P ops and adds destroy. |
| `vllm_ascend/compilation/acl_graph.py` | Adds global ACLGraphWrapper clearing facility for elastic reconfiguration cleanup. |
Comments suppressed due to low confidence (2)
vllm_ascend/eplb/core/eplb_device_transfer_loader.py:73
`generate_expert_d2d_transfer_task` now appends dict-based send ops to `self.comm_op_list`, but the recv path still appends `dist.P2POp` objects. `asyn_expert_weight_transfer` later treats every entry as a dict (expects `peer_rank`/`tensors`/`op`) and will fail at runtime. Use a single op representation for both send and recv (or branch on type).

```python
for buffer_tensor_id, recv_info in enumerate(expert_recv_info):
    recv_rank, global_expert_id_to_recv = recv_info
    for buffer_tensor in self.eplb_adaptor.buffer_tensor_list[buffer_tensor_id]:
        self.comm_op_list.append(
            dist.P2POp(dist.irecv, buffer_tensor, recv_rank, group=self.comm_group.device_group)
        )
```
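The single-representation fix the comment asks for can be sketched with plain dicts (toy payloads, hypothetical field names matching the dict schema the comment quotes; no torch required):

```python
# Build both send and recv ops with the same dict schema so the consumer
# can treat every entry uniformly instead of mixing dicts and P2POp objects.
comm_op_list = [
    {"op": "send", "peer_rank": 1, "tensors": [[1.0, 2.0]], "expert_id": 0},
    {"op": "recv", "peer_rank": 3, "tensors": [[0.0, 0.0]], "expert_id": 5},
]

def dispatch(op_info):
    # Single code path: branch only on the "op" field.
    kind = op_info["op"]
    assert kind in ("send", "recv")
    return kind, op_info["peer_rank"]

print([dispatch(op) for op in comm_op_list])  # [('send', 1), ('recv', 3)]
```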
vllm_ascend/worker/model_runner_v1.py:2577
- Changing `NPUModelRunner.load_model` to accept a positional `load_dummy_weights` arg breaks subclasses that override `load_model` without this parameter. For example, `vllm_ascend/xlite/xlite_model_runner.py:XliteModelRunner.load_model(self)`, combined with `NPUWorker.load_model` now calling `self.model_runner.load_model(load_dummy_weights)`, will raise `TypeError`. Update overrides to accept `load_dummy_weights: bool = False` (and forward to `super().load_model(load_dummy_weights=...)`) or make the new argument keyword-only throughout.

```python
def load_model(self, load_dummy_weights: bool = False) -> None:
    logger.info("Starting to load model %s...", self.model_config.model)
    if self.ascend_config.mix_placement:
        # TODO: Enabling the mix placement in deepseek_v2.py
        # remove this part after the mix placement merged into vllm
        def mock_true():
```
```python
new_expert_maps_clone = new_expert_maps.clone()
if scale:
    shape = list(new_expert_maps_clone.shape)
```

`new_expert_maps_clone = new_expert_maps.clone()` is unused (`shape` can be derived from `new_expert_maps.shape` directly). This clone adds avoidable memory/time overhead in the EPLB worker process; remove it unless it's needed for correctness.

Suggested change:

```python
if scale:
    shape = list(new_expert_maps.shape)
```
```python
all_instances: ClassVar[weakref.WeakSet["ACLGraphWrapper"]] = weakref.WeakSet()
graph_pool: ClassVar[tuple[int, int]] = current_platform.get_global_graph_pool()

@classmethod
def clear_all_graphs(cls) -> None:
    """Clear all graphs from all ACLGraphWrapper instances."""
    for instance in list(cls.all_instances):
        instance.clear_graphs()
    cls.graph_pool = (cls.graph_pool[0], cls.graph_pool[1] + 1)
```

`ACLGraphWrapper.clear_all_graphs()` relies on `all_instances`, but `ACLGraphWrapper.__init__` never adds `self` to `ACLGraphWrapper.all_instances`, so graph cleanup during elastic reconfiguration will be a no-op. Register each instance in `__init__` (`WeakSet` will drop it on GC).
```python
for op_info in self.comm_op_list:
    peer_rank = op_info["peer_rank"]
    tensors = op_info["tensors"]
    expert_id = op_info["expert_id"]
    op = op_info["op"]
    for i, tensor in enumerate(tensors):
        if op == "send":
            worker = self.comm_group.device_group.send([tensor], peer_rank, tag=(expert_id + 1) * (i + 1))
        else:
            worker = self.comm_group.device_group.recv([tensor], peer_rank, tag=(expert_id + 1) * (i + 1))
```

The P2P tag calculation `(expert_id + 1) * (i + 1)` is not collision-free (e.g., `(expert_id=1, i=1)` and `(expert_id=3, i=0)` both produce `tag=4`). Tag collisions can cause send/recv mismatches and corrupted transfers. Use a bijective mapping from `(expert_id, tensor_index)` to tag (e.g., `expert_id * num_tensors_per_expert + i`, with a consistent `num_tensors_per_expert`).

Suggested change:

```python
# Compute a consistent upper bound on the number of tensors per expert
max_tensors_per_expert = 0
for op_info in self.comm_op_list:
    tensors = op_info["tensors"]
    if tensors is not None:
        max_tensors_per_expert = max(max_tensors_per_expert, len(tensors))
for op_info in self.comm_op_list:
    peer_rank = op_info["peer_rank"]
    tensors = op_info["tensors"]
    expert_id = op_info["expert_id"]
    op = op_info["op"]
    for i, tensor in enumerate(tensors):
        tag = expert_id * max_tensors_per_expert + i
        if op == "send":
            worker = self.comm_group.device_group.send([tensor], peer_rank, tag=tag)
        else:
            worker = self.comm_group.device_group.recv([tensor], peer_rank, tag=tag)
```
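The collision claim above is easy to verify with a small enumeration (toy bounds, no torch): the multiplicative tag collides, while `expert_id * num_tensors_per_expert + i` is injective for `0 <= i < num_tensors_per_expert`.

```python
# Enumerate all (expert_id, i) pairs under both tag schemes.
num_experts, num_tensors_per_expert = 8, 4

old_tags = {}
collisions = []
for expert_id in range(num_experts):
    for i in range(num_tensors_per_expert):
        tag = (expert_id + 1) * (i + 1)
        if tag in old_tags and old_tags[tag] != (expert_id, i):
            collisions.append((old_tags[tag], (expert_id, i), tag))
        old_tags.setdefault(tag, (expert_id, i))

new_tags = {
    expert_id * num_tensors_per_expert + i
    for expert_id in range(num_experts)
    for i in range(num_tensors_per_expert)
}

print(len(collisions) > 0)  # True: the old scheme collides (e.g. tag 4)
print(len(new_tags) == num_experts * num_tensors_per_expert)  # True: injective
```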
```python
if os.environ.get("VLLM_ELASTIC_EP_SCALE_UP_LAUNCH") != "1":
    try:
        device_group = get_mc2_group().device_group
        # TODO: Try local_rank = ep_group.rank_in_group
        local_rank = get_mc2_group().rank_in_group
        backend = device_group._get_backend(torch.device("npu"))
        self.moe_all_to_all_group_name = backend.get_hccl_comm_name(local_rank)
    except AttributeError:
        self.moe_all_to_all_group_name = ""
```

When `VLLM_ELASTIC_EP_SCALE_UP_LAUNCH == "1"`, `AscendW8A8DynamicFusedMoEMethod.__init__` skips the whole block that initializes `self.moe_all_to_all_group_name`, leaving the attribute undefined. Any later access will raise `AttributeError`. Set a safe default (e.g., empty string) outside the env-guard, and only skip the backend lookup when scaling up.
`get_hccl_comm_name` here requires a collective across all ranks, so during scale-up this step is deferred to `AscendElasticEPScalingExecutor.prepare_new_worker`.
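The safe-default pattern the review suggests can be sketched like this (toy class; `_get_group_name` is a hypothetical stand-in for the backend lookup):

```python
import os

class MoEMethod:
    """Toy sketch: always define the attribute, then optionally refine it."""
    def __init__(self) -> None:
        # Safe default set unconditionally, so later reads never raise
        # AttributeError even when the lookup below is skipped.
        self.moe_all_to_all_group_name = ""
        if os.environ.get("VLLM_ELASTIC_EP_SCALE_UP_LAUNCH") != "1":
            try:
                self.moe_all_to_all_group_name = self._get_group_name()
            except AttributeError:
                pass  # keep the default

    def _get_group_name(self) -> str:
        # Stand-in for backend.get_hccl_comm_name(local_rank).
        return "hccl_group_0"

os.environ["VLLM_ELASTIC_EP_SCALE_UP_LAUNCH"] = "1"
m = MoEMethod()
print(m.moe_all_to_all_group_name == "")  # True: defined even during scale-up launch
```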
```python
old_ep_size = parallel_config.data_parallel_size * tp_size
new_ep_size = new_dp_size * tp_size
```

`perform_scale_down_eplb_reshuffle` computes `old_ep_size`/`new_ep_size` as `dp_size * tp_size`, but elsewhere in this executor EP size is treated as `dp_size * tp_size * pcp_size` (see `load_model`). If `prefill_context_parallel_size > 1`, the scale metadata sent to the EPLB worker will be inconsistent. Consider deriving EP size from `get_ep_group().world_size` (old/new) or include PCP in the calculation.

Suggested change:

```python
# Use the actual current EP group size to capture all dimensions (dp * tp * pcp, etc.).
old_ep_size = get_ep_group().world_size
# Derive the new EP size including PCP so the EPLB worker sees consistent scale metadata.
pcp_group = get_pcp_group()
pcp_size = pcp_group.world_size if pcp_group is not None else 1
new_ep_size = new_dp_size * tp_size * pcp_size
```
```python
@classmethod
def clear_all_graphs(cls) -> None:
    """Clear all graphs from all ACLGraphWrapper instances."""
    for instance in list(cls.all_instances):
        instance.clear_graphs()
    cls.graph_pool = (cls.graph_pool[0], cls.graph_pool[1] + 1)
```

`clear_all_graphs` is a new public classmethod that is now used during elastic EP reconfiguration, but there's no unit test asserting that instances are registered and that `clear_all_graphs()` actually clears `concrete_aclgraph_entries` / bumps the graph pool. Adding a focused test in `tests/ut/compilation/test_acl_graph.py` would prevent regressions here.
```python
if local_expert_id not in has_temp:
    self.eplb_adaptor.temp_tensor_list[layer_id][index].copy_(src_tensor)
op_info["tensors"].append(self.eplb_adaptor.temp_tensor_list[layer_id][index])
```

In the send-op construction, `op_info["tensors"].append(self.eplb_adaptor.temp_tensor_list[layer_id][index])` runs unconditionally, even when `temp_tensor_list` doesn't exist (will raise), and it also indexes `temp_tensor_list` by `layer_id`. In `VllmEplbAdaptor`, `temp_tensor_list` is allocated by buffer_id (local expert), not by layer. Append the original `src_tensor` when no temp buffer is available, and index temp buffers consistently (likely by `local_expert_id`).

Suggested change:

```python
# temp_tensor_list is allocated by buffer_id (local expert), not by layer
temp_tensor = self.eplb_adaptor.temp_tensor_list[local_expert_id][index]
if local_expert_id not in has_temp:
    temp_tensor.copy_(src_tensor)
    tensor_to_send = temp_tensor
else:
    # When no temp buffer is available, send the original source tensor
    tensor_to_send = src_tensor
op_info["tensors"].append(tensor_to_send)
```
```python
if self.dynamic_eplb and not self.is_eplb_warmuped:
    self.is_eplb_warmuped = True
    self.eplb_adaptor = VllmEplbAdaptor(model=self.model)
    self.eplb_loader.set_adator(self.eplb_adaptor)
    self.eplb_updator.set_adaptor(self.eplb_adaptor)
```

`eplb_warmup()` sets `self.is_eplb_warmuped = True` before the `VLLM_ELASTIC_EP_SCALE_UP_LAUNCH` guard. When that env var is set, warmup is skipped but the flag still prevents any later warmup attempt in the same process. Consider setting `is_eplb_warmuped` only after warmup actually completes (or track a separate "adaptor initialized" vs "warmup done" state).
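The flag-after-completion pattern suggested here can be sketched with a toy class (hypothetical names, not the real warmup path): the completed flag is only set once the work actually ran, so a skipped attempt stays retryable.

```python
class Warmup:
    """Toy sketch: mark warmup complete only after it actually ran."""
    def __init__(self, skip: bool) -> None:
        self.skip = skip
        self.is_warmed_up = False

    def warmup(self) -> None:
        if self.is_warmed_up or self.skip:
            return  # skipped: flag stays False, a later call can still warm up
        # ... real warmup work would go here ...
        self.is_warmed_up = True  # set only after completion

w = Warmup(skip=True)
w.warmup()
print(w.is_warmed_up)  # False: warmup can still run later
w.skip = False
w.warmup()
print(w.is_warmed_up)  # True
```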
```python
    Initialize a stateless HCCL process group for CUDA devices.

    This method creates a ProcessGroup with the specified backend configuration,
    typically used for GPU communication. It sets up the necessary backend
    options and registers the backend with the process group.

    Args:
        backend: The distributed backend to use (e.g., 'hccl')
        prefix_store: The prefix store for distributed coordination
        group_rank: The rank of the current process within the group
        group_size: The total number of processes in the group
        timeout: Maximum time to wait for the operation to complete
        **kwargs: Additional backend-specific options

    warning:
        Uses internal PyTorch API (torch._C._distributed_c10d.ProcessGroupHCCL)
        which may change in future PyTorch versions. Compatibility should be
        verified with each PyTorch upgrade.

    Compatibility Risk:
        - High risk of breakage in PyTorch 2.4+
        - No semantic versioning guarantees
        - Requires testing with new PyTorch releases

    Returns:
        A ProcessGroup object configured with the specified backend
    """

    # INTERNAL API USAGE - COMPATIBILITY RISK
    # This internal import is necessary for stateless process group functionality
    # but carries compatibility risks. Monitor PyTorch release notes for changes.
    # TODO: Migrate to public API when available in future PyTorch versions
    from torch_npu._C._distributed_c10d import ProcessGroupHCCL
    import uuid

    pg = ProcessGroup(prefix_store, group_rank, group_size)

    backend_options = ProcessGroupHCCL.Options()
    backend_options._timeout = timeout

    # Create Backend object
    backend = Backend("hccl")

    # Set default backend for ProcessGroup
    pg._set_default_backend(Backend.backend_type_map[backend])
```

`stateless_init_device_torch_dist_pg` accepts a `backend` string but then hard-codes `Backend("hccl")`, effectively ignoring the argument. This can lead to surprising behavior if the caller passes anything else; at minimum validate `backend == "hccl"` or use the provided value. Also the docstring still references CUDA/GPU, which is misleading for Ascend/NPU.

Suggested change:

```python
    Initialize a stateless HCCL process group for Ascend NPUs.

    This method creates a ProcessGroup with the specified backend configuration,
    typically used for NPU communication via HCCL. It sets up the necessary
    backend options and registers the backend with the process group.

    Args:
        backend: The distributed backend to use. Currently only 'hccl' is
            supported for Ascend NPUs.
        prefix_store: The prefix store for distributed coordination.
        group_rank: The rank of the current process within the group.
        group_size: The total number of processes in the group.
        timeout: Maximum time to wait for the operation to complete.

    Warning:
        Uses internal PyTorch NPU API (torch_npu._C._distributed_c10d.ProcessGroupHCCL)
        which may change in future PyTorch / torch_npu versions. Compatibility
        should be verified with each upgrade.

    Compatibility Risk:
        - High risk of breakage in future PyTorch / torch_npu releases.
        - No semantic versioning guarantees for internal APIs.
        - Requires testing with new PyTorch / torch_npu releases.

    Returns:
        A ProcessGroup object configured with the specified backend.
    """
    # INTERNAL API USAGE - COMPATIBILITY RISK
    # This internal import is necessary for stateless process group functionality
    # but carries compatibility risks. Monitor PyTorch / torch_npu release notes for changes.
    # TODO: Migrate to public API when available in future PyTorch versions.
    from torch_npu._C._distributed_c10d import ProcessGroupHCCL
    import uuid

    # Validate backend argument to avoid silently ignoring caller input.
    if backend.lower() != "hccl":
        raise ValueError(
            f"Only 'hccl' backend is supported for Ascend NPUs, got: {backend!r}"
        )

    pg = ProcessGroup(prefix_store, group_rank, group_size)

    backend_options = ProcessGroupHCCL.Options()
    backend_options._timeout = timeout

    # Create Backend object for HCCL.
    backend_obj = Backend("hccl")

    # Set default backend for ProcessGroup
    pg._set_default_backend(Backend.backend_type_map[backend_obj])
```
fangyuchu left a comment:
Personally I still feel too much is being copied from vLLM. If there is really no way to reuse vLLM's code, we may need to look at what interfaces vLLM should abstract out, and we can contribute those back to vLLM upstream.
```python
        ep_size = dp_size * tp_size * pcp_size
        get_ascend_config().eplb_config.num_redundant_experts = ep_size * num_local_experts - num_logical_experts
        if get_ascend_config().eplb_config.dynamic_eplb:
            self.worker.model_runner.shared_dict["expert_maps"] = expert_maps
            self.worker.model_runner.shared_dict["old_ep_size"] = expert_maps.shape[1]
        self.worker.load_model(load_dummy_weights=True)

    def create_standby_groups(self, reconfig_request: ReconfigureDistributedRequest) -> None:
        self.reconfig_request = reconfig_request
        new_dp_size = reconfig_request.new_data_parallel_size
        world_size = self.worker.vllm_config.parallel_config.world_size
        new_world_size_across_dp = world_size * new_dp_size
        updated_config = copy.copy(self.worker.vllm_config)
        updated_config.parallel_config = copy.deepcopy(self.worker.vllm_config.parallel_config)
        updated_config.parallel_config.data_parallel_size = new_dp_size
        with set_current_vllm_config(updated_config):
            create_standby_groups(
                new_dp_size=new_dp_size,
                new_world_size_across_dp=new_world_size_across_dp,
                master_ip=reconfig_request.new_data_parallel_master_ip,
                coord_store_port=reconfig_request.coord_store_port,
                enable_eplb=updated_config.parallel_config.enable_eplb,
            )
            create_ascend_standby_groups(
                new_dp_size=new_dp_size,
                new_world_size_across_dp=new_world_size_across_dp,
                master_ip=reconfig_request.new_data_parallel_master_ip,
                coord_store_port=reconfig_request.coord_store_port,
            )

    def transfer_weights(self, old_dp_size: int, new_dp_size: int) -> None:
```
What is the difference between this part and EEP? I don't see how it differs from the implementation in `super()`.
Mainly, while fixing the earlier Ascend quantized scale-up bug, some code was added inside the `batch_transfer_weights` function. Without overriding `transfer_weights` and `receive_weights`, the original `batch_transfer_weight` would still be called, unless `batch_transfer_weight` is moved into the class.
```python
        self._perform_eplb_reshuffle()

    def receive_weights(self) -> None:
```
```python
quant_weight_names = ["aclnn_input_scale", "aclnn_input_scale_reciprocal", "aclnn_input_offset"]
for module in model.modules():
    for name in quant_weight_names:
        if (param := getattr(module, name, None)) is not None:
```
Could we call EEP's original `batch_transfer_weight` first here, and then send these Ascend-specific tensors afterwards?
Add a file-level comment to the whole file, something like:

NOTE:
This file is adapted from vLLM's elastic_execute.py
Key differences:
- xxx
- xxx
- xxx
```python
    module
    for module in self.worker.model_runner.model.modules()
    if (module.__class__.__name__ == "AscendFusedMoE" or module.__class__.__name__ == "AscendSharedFusedMoE")
]
```
Checking `__class__.__name__` here isn't great, since Ascend may rename these classes. They are guaranteed to be `FusedMoE` subclasses, so something like this would be better:

```python
from vllm.model_executor.layers.fused_moe import FusedMoE

moe_modules = [module for module in self.worker.model_runner.model.modules() if isinstance(module, FusedMoE)]
```
…ansfer_weights and receive_weights Signed-off-by: nifeng <1542305589@qq.com>
Pull request overview
Copilot reviewed 20 out of 21 changed files in this pull request and generated 5 comments.
```python
with context, set_current_vllm_config(self.vllm_config):
    self.model_runner.load_model(load_dummy_weights)
```

`self.model_runner.load_model(load_dummy_weights)` always passes a positional arg, which changes the call signature from the previous `load_model()` and breaks existing tests/mocks that assert `load_model.assert_called_once()` with no args. To keep backward-compat behavior, consider only passing the argument when `load_dummy_weights` is True (otherwise call `self.model_runner.load_model()`), or update the affected tests.

Suggested change:

```python
if load_dummy_weights:
    self.model_runner.load_model(load_dummy_weights)
else:
    self.model_runner.load_model()
```
```python
src_tensor = torch.empty((1,), device=self.device)

comm_op_list = []
for src_rank in range(self.world_size):
    for dst_rank in range(self.world_size):
        if src_rank != dst_rank:
            comm_op_list.append({"src_rank": src_rank, "dst_rank": dst_rank, "tensor": src_tensor})
comm_op_list = sorted(comm_op_list, key=lambda x: (x["src_rank"], x["dst_rank"]))

workers = []
for i, op in enumerate(comm_op_list):
    src_rank = op["src_rank"]
    dst_rank = op["dst_rank"]
    tensor = op["tensor"]
    if src_rank == self.rank_id:
        workers.append(self.comm_group.device_group.send([tensor], dst_rank, tag=i))
    elif dst_rank == self.rank_id:
        workers.append(self.comm_group.device_group.recv([tensor], src_rank, tag=i))

for worker in workers:
    worker.wait()
```
warm_up_eplb posts multiple recv ops into the same src_tensor buffer (one per peer). Concurrent receives into the same tensor can cause undefined behavior / data races and can hang or corrupt memory depending on the ProcessGroup implementation. Allocate a distinct receive tensor per source rank (or serialize the recv loop) so each in-flight recv has its own buffer.
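The aliasing hazard described above can be illustrated without torch (toy ranks and payloads): when every recv targets the same buffer, the last delivery overwrites all earlier ones, while per-peer buffers retain each payload.

```python
# Data arriving from source ranks 0 and 2 (toy payloads).
payloads = {0: [11], 2: [22]}

# Shared-buffer variant: every recv writes into the same list object.
shared = [0]
for src in sorted(payloads):
    shared[0] = payloads[src][0]  # models the in-place write of a recv
print(shared)  # [22] (rank 0's payload was lost)

# Per-peer buffers: one distinct buffer per source rank.
recv_buffers = {src: [0] for src in payloads}
for src, buf in recv_buffers.items():
    buf[0] = payloads[src][0]
print(recv_buffers)  # {0: [11], 2: [22]} (both retained)
```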
```python
for i in range(num_redundancy_expert):
    sorted_indices = np.argsort([t[1] for t in origin_weights], kind="stable")[::-1]
    weights = [origin_weights[idx] for idx in sorted_indices]
    tmp_raw_weight = weights[0][1] * (len(route_expert_redundancy[weights[0][0]]) + 1)
    route_expert_redundancy[weights[0][0]].append(route_expert_num + i)
    avg_weight = tmp_raw_weight / (len(route_expert_redundancy[weights[0][0]]) + 1)
    weights[0] = (weights[0][0], avg_weight)
    index = 0
    while (len(route_expert_redundancy[weights[index][0]])) == card_num - 1:
        index += 1
    tmp_raw_weight = weights[index][1] * (len(route_expert_redundancy[weights[index][0]]) + 1)
    route_expert_redundancy[weights[index][0]].append(route_expert_num + i)
    avg_weight = tmp_raw_weight / (len(route_expert_redundancy[weights[index][0]]) + 1)
    weights[index] = (weights[index][0], avg_weight)
```
In original_compute_balanced_pack_redundancy, the while (len(route_expert_redundancy[weights[index][0]])) == card_num - 1: index += 1 loop has no bounds check. If all candidates are already replicated card_num-1 times, index will run past weights and raise IndexError. Add an index < len(weights) guard and raise a clear error (or early-exit) when no expert has remaining redundancy capacity.
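The bounded-scan fix suggested above can be sketched with a small, self-contained helper (hypothetical name `pick_candidate`, toy inputs): stop at the end of the candidate list and raise a clear error instead of running past it.

```python
def pick_candidate(weights, route_expert_redundancy, card_num):
    """Scan candidates in order, but stop at the end of the list
    instead of indexing past it."""
    index = 0
    while index < len(weights) and len(route_expert_redundancy[weights[index][0]]) == card_num - 1:
        index += 1
    if index == len(weights):
        raise RuntimeError("no expert has remaining redundancy capacity")
    return index

# Every candidate already replicated card_num - 1 = 1 times: a clear error,
# not an IndexError from weights[index].
weights = [(0, 5.0), (1, 3.0)]
redundancy = {0: [9], 1: [8]}  # each expert already holds one redundant copy
try:
    pick_candidate(weights, redundancy, card_num=2)
except RuntimeError as e:
    print(e)  # no expert has remaining redundancy capacity

redundancy = {0: [9], 1: []}  # expert 1 still has capacity
print(pick_candidate(weights, redundancy, card_num=2))  # 1
```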
```python
# Step 5: Eliminate duplicate experts within the same NPU through redundancy
# reallocation. Replace duplicates with redundant copies from other
# experts based on minimal weight difference.
for i in range(card_num):
    arr = np.asarray(boxes[i])
    unique, inv, cnt = np.unique(arr, return_inverse=True, return_counts=True)
    mask = cnt > 1
    dup_vals = unique[mask]
    dup_cnts = cnt[mask]
    for item_id, counts in zip(dup_vals, dup_cnts):
        for _ in range(counts - 1):
            cur_position = boxes[i].index(item_id)
            cur_weight = boxes_weights[i][cur_position]
            sorted_indices = np.argsort(
                [
                    abs(
                        t[1]
                        * (len(route_expert_redundancy[t[0]]) + 1)
                        / (len(route_expert_redundancy[t[0]]) + 2)
                        - cur_weight
                    )
                    for t in origin_weights
                ],
                kind="stable",
            )
            weights = [origin_weights[idx] for idx in sorted_indices]
            index = 0
            while index < len(weights):
                if (
                    len(route_expert_redundancy[weights[index][0]]) < card_num - 1
                    and weights[index][0] != item_id
                    and weights[index][0] not in boxes[i]
                ):
                    break
                index += 1
            boxes[i][cur_position] = weights[index][0]
            tmp_raw_weight = weights[index][1] * (len(route_expert_redundancy[weights[index][0]]) + 1)
```
The duplicate-elimination logic (Step 5) is inside the main item placement loop, so it runs after every item placement and repeatedly re-sorts / mutates origin_weights, which is very expensive and can change the remaining placement order mid-iteration. Also, after while index < len(weights): ... index += 1, the code unconditionally uses weights[index] — if no candidate matches, this will raise IndexError. Consider moving duplicate elimination to a post-pass after packing, and handle the “no replacement candidate” case explicitly.
```python
tp_size = get_tp_group().world_size
pp_size = get_pp_group().world_size

all_ranks = torch.arange(new_world_size_across_dp).reshape(-1, new_dp_size * pp_size * tp_size)
group_ranks = all_ranks.unbind(0)
standby_ep_ranks = [x.tolist() for x in group_ranks]
```
create_ascend_standby_groups builds rank layout using only new_dp_size * pp_size * tp_size (omits prefill_context_parallel_size). Elsewhere in this PR (e.g., Ascend model-parallel init) rank layout explicitly includes PCP. If pcp_size > 1, this standby group rank mapping will be incorrect and can break elastic reconfiguration. Consider incorporating get_pcp_group().world_size into the reshape/stride so the standby group layout matches the active group layout.
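The stride fix suggested above can be shown with a torch-free sketch (illustrative sizes): including `pcp_size` in the group width makes the standby EP group layout match an active layout of `dp * pp * tp * pcp` ranks.

```python
# Illustrative parallelism sizes; the point is that group_width must
# include pcp_size, which the original reshape omitted.
new_dp_size, pp_size, tp_size, pcp_size = 2, 1, 2, 2
new_world_size_across_dp = new_dp_size * pp_size * tp_size * pcp_size

group_width = new_dp_size * pp_size * tp_size * pcp_size  # was missing pcp_size
all_ranks = list(range(new_world_size_across_dp))
standby_ep_ranks = [
    all_ranks[start:start + group_width]
    for start in range(0, new_world_size_across_dp, group_width)
]
print(standby_ep_ranks)  # [[0, 1, 2, 3, 4, 5, 6, 7]]
```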
What this PR does / why we need it?
This PR introduces the adaptation for vLLM's feature, Elastic EP Milestone 2, on the Ascend side. The corresponding vLLM feature (#34861) was recently merged into vLLM.
Key contributions
- Added an `AscendElasticEPScalingExecutor` class inheriting from `ElasticEPScalingExecutor` to implement vLLM Ascend operations during elastic scaling;
- Under the `--enable-elastic-ep` argument, replaced vLLM Ascend-specific communication group coordinators (such as MC2) from `GroupCoordinator` to `StatelessGroupCoordinator`. Additionally, added port allocation and creation methods for these stateless groups in `NPUPlatform`;
- Implemented creation of standby communication group coordinators via `vllm_ascend/distributed/standby_state.py`;
- Refactored the vLLM Ascend EPLB module to support expert rebalancing during elastic scaling.
Why this is needed
Currently, expert parallelism in vllm_ascend is static after deployment. This PR lays the foundation for dynamically scaling expert parallelism up and down, building on existing vLLM features without modifying the vLLM codebase.
Does this PR introduce any user-facing change?
No. Simply add `--enable-elastic-ep`, `--enable-eplb`, vLLM Ascend-specific Dynamic EPLB parameters, and environment variables to the startup command to enable elastic scaling functionality. All features are non-intrusive and require no modifications to the vLLM codebase.

Summary per file

- `vllm_ascend/worker/worker.py`: `AscendElasticEPScalingExecutor` hook, supporting virtual weight loading during scale-up
- `vllm_ascend/worker/model_runner_v1.py`
- `vllm_ascend/platform.py`
- `vllm_ascend/ops/fused_moe/fused_moe.py`: `AscendFusedMoE` initialization during scaling
- `vllm_ascend/distributed/elastic_ep/elastic_execute.py`: `AscendElasticEPScalingExecutor` class to handle operations during elastic scaling
- `vllm_ascend/distributed/elastic_ep/standby_state.py`
- `vllm_ascend/distributed/parallel_state`
- `vllm_ascend/eplb/core/policy/policy_default_eplb.py`
- `vllm_ascend/eplb/core/eplb_worker.py`
- `vllm_ascend/eplb/core/eplb_device_transfer_loader.py`
- `vllm_ascend/eplb/eplb_updator.py`

How was this patch tested?
Models tested
Test environment (manual elastic scale up/down)
Workflow
```shell
python -m vllm.entrypoints.openai.api_server \
  --model <model path> \
  --trust-remote-code \
  --enable-expert-parallel \
  --enable-eplb \
  --enable-elastic-ep \
  --tensor-parallel-size 1 \
  --data-parallel-size <dp size> \
  --data-parallel-size-local <dp size> \
  --data-parallel-start-rank 0 \
  --data-parallel-backend ray \
  --gpu-memory-utilization 0.9 \
  --enforce-eager \
  --host 0.0.0.0 \
  --port <port> \
  --additional-config '{"eplb_config": {"dynamic_eplb": true, "num_redundant_experts": <num redundant experts>}}'
```

(`--enforce-eager` is optional.)

Test scenarios