[Feat.][EEP] vLLM Ascend adaptation for Elastic EP Milestone2 #21
1542305589 wants to merge 7 commits into fangyuchu:eep-m2-adaptation from
Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request delivers the foundational implementation for Elastic Expert Parallelism (EP) on Ascend platforms within vLLM, marking a significant step towards dynamic resource scaling for MoE models. It encompasses the core mechanisms for managing expert weight distribution, reconfiguring parallel groups, and adapting expert placement strategies to accommodate changes in the number of available devices. The changes enable seamless scaling operations, ensuring model continuity and efficient resource utilization.

Highlights
Activity
Code Review
This pull request introduces support for Elastic Expert Parallelism (EP) on Ascend NPUs, a significant feature involving extensive changes to distributed communication, model execution, and worker lifecycle management. My review identified several critical issues that would prevent this feature from functioning correctly. These include incorrect API usage for Ascend hardware, bugs in data structure handling that would lead to crashes, and incorrect initialization logic. Addressing these issues is crucial for the stability and correctness of the elastic EP implementation.
    dp_group=standby_dp_group,
    expert_weights=model.expert_weights,
)
torch.cuda.synchronize()
    dp_group=dp_group,
    expert_weights=model.expert_weights,
)
torch.cuda.synchronize()
new_expert_maps_clone = new_expert_maps.clone()

if scale:
    shape = list(new_expert_maps_clone)
The expression `list(new_expert_maps_clone)` will not return the shape of the tensor. It will iterate over the first dimension of the tensor and return a list of tensors. To get the shape as a list of integers, you should use `list(new_expert_maps_clone.shape)`. Otherwise, the subsequent call to `torch.full(shape, ...)` will fail.
Suggested change:
-    shape = list(new_expert_maps_clone)
+    shape = list(new_expert_maps_clone.shape)
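The difference is easy to see in a standalone sketch; a nested list is used here as a stand-in, since torch tensors iterate over their first dimension the same way:

```python
# Standalone sketch of the bug described above: iterating a 2-D container
# yields its first-dimension slices, not its shape. A nested list mimics
# how a 2-D torch tensor behaves under iteration.
t = [[0, -1, 2], [1, 0, -1]]  # stands in for a tensor of shape (2, 3)

rows = list(t)               # analogous to list(tensor): a list of rows
shape = [len(t), len(t[0])]  # analogous to list(tensor.shape): [2, 3]

print(rows)   # [[0, -1, 2], [1, 0, -1]]
print(shape)  # [2, 3]
```

Passing `rows` where a shape is expected is what makes the later `torch.full(shape, ...)` call fail.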
self.shared_dict = self.manager.dict(
    {
        "expert_map": None,
        "moe_load": None,
        "expert_maps": None,
        "scale": False,
        "old_ep_size": None,
        "new_ep_size": None,
    }
)
self.shared_dict = self.manager.dict({"expert_map": None, "moe_load": None, "expert_maps": None})
The self.shared_dict is initialized with keys required for elastic scaling, but it is immediately overwritten on line 352 with an initialization that is missing these keys (scale, old_ep_size, new_ep_size). This will cause the elastic scaling feature to fail. Please remove the second, incorrect assignment.
Suggested change (keep the first assignment, remove the second):
 self.shared_dict = self.manager.dict(
     {
         "expert_map": None,
         "moe_load": None,
         "expert_maps": None,
         "scale": False,
         "old_ep_size": None,
         "new_ep_size": None,
     }
 )
-self.shared_dict = self.manager.dict({"expert_map": None, "moe_load": None, "expert_maps": None})
self.comm_group.cpu_group.all_gather(gather_buffer, local_load)
self.comm_group.cpu_group.all_gather(gather_buffer, local_load).wait()
The all_gather operation is called twice. The first call on line 137 is redundant as its result is discarded. The second call on line 138 correctly initiates the operation and waits for it to complete. Please remove the redundant first call.
Suggested change:
-self.comm_group.cpu_group.all_gather(gather_buffer, local_load)
 self.comm_group.cpu_group.all_gather(gather_buffer, local_load).wait()
The "done" comment indicates that the suggested fix for the redundant all_gather call has been applied. Thank you for addressing that.
Regarding the summary request, this pull request introduces significant enhancements to support Elastic Expert Parallelism (EP) on Ascend devices within vLLM. The core objective is to enable dynamic scaling of expert parallelism, allowing workers to efficiently join or leave the EP group.
Key changes and features introduced include:
- Dynamic Communication Group Management: The system now utilizes custom communication groups for more flexible distributed operations, moving away from static `torch.distributed` calls in several EPLB components.
- Expert Resharding and Weight Transfer: New mechanisms are implemented to efficiently re-distribute experts and transfer model weights when the EP group size changes. This includes specific logic for new workers to receive weights and potentially load dummy weights during scale-up scenarios.
- Adaptive Expert Placement Policy: The Expert Placement Load Balancer (EPLB) policy has been updated to be aware of and adapt to changes in the EP group size. This involves handling redundant experts, ensuring correct expert placement, and managing expert movement within NPUs during scaling operations.
- Resource Allocation and Context Management: The platform layer now dynamically allocates ports for various stateless communication groups (e.g., `mc2`, `dynamic_eplb`, `fc3_quant_x`) to prevent conflicts during elastic EP. Memory pool contexts are also managed for sleep mode during these operations.
- Integrated Elastic EP Execution: A new `AscendElasticEPScalingExecutor` is introduced to orchestrate the entire elastic EP lifecycle, including creating standby groups, transferring weights, switching active groups, reconfiguring MoE modules, and managing expert resharding.
- Model Loading Adaptations: The model loading process (`ModelRunner`) has been modified to support loading dummy weights and receiving expert mappings during elastic EP scale-up, ensuring a smooth transition for new workers.
In essence, this PR lays the groundwork for a more flexible and scalable vLLM deployment on Ascend by enabling dynamic adjustments to the expert parallelism configuration.
/gemini summary
Summary of Changes

This pull request introduces the foundational implementation for Elastic Expert Parallelism (EP) on Ascend platforms within vLLM, enabling dynamic scaling of Mixture-of-Experts (MoE) models. It provides the core mechanisms for managing expert weight distribution, reconfiguring parallel groups, and adapting expert placement strategies to efficiently handle changes in the number of available devices, ensuring seamless scaling operations and optimized resource utilization.

Highlights
Activity
Force-pushed 813118e to 8f38b81
/gemini review
Code Review
This pull request introduces significant functionality for elastic expert parallelism (EP) on Ascend NPUs, enabling dynamic scaling of resources. The changes span across distributed execution logic, expert placement load balancing (EPLB), and the worker/model runner infrastructure to support this new capability. Key additions include the AscendElasticEPScalingExecutor for managing scaling operations, modifications to the EPLB policy for improved robustness, and integration into the worker's model loading and execution flow.
My review has identified a few critical issues. The most significant concerns are related to the use of constant tags in asynchronous point-to-point communication calls, which can lead to race conditions and deadlocks. I have also found a bug in the expert placement policy that could cause an IndexError. I've provided detailed comments and suggestions for these issues.
Following the repository's style guide, I'm also suggesting an updated pull request title and summary:
Suggested PR Title:
[main][EP][Feat] Elastic EP adaptation for Ascend

Suggested PR Summary:
### What this PR does / why we need it?
This PR introduces support for elastic expert parallelism (EP) on Ascend NPUs. This allows for dynamic scaling of EP resources to optimize performance and resource utilization. The key changes include:
- A new `AscendElasticEPScalingExecutor` to handle scaling logic, including weight transfers and MoE module reconfiguration during scale-up and scale-down operations.
- Modifications to the Expert Placement Load Balancing (EPLB) core components (`eplb_worker`, `eplb_device_transfer_loader`, `policy_default_eplb`) to support dynamic changes in the EP size.
- Updates to the worker and model runner to integrate the elastic execution flow, such as loading dummy weights for newly added workers.
- Refactoring of communication logic to utilize vLLM's device communicators instead of `torch.distributed` directly.
- Platform-level changes to support stateless process group initialization required for elastic operations on Ascend.
### Does this PR introduce _any_ user-facing change?
No, this is a backend feature for performance and resource management and does not introduce user-facing API changes.
### How was this patch tested?
CI passed with newly added and existing tests.

Note: Security Review did not run due to the size of the PR.
if self.comm_op_list:
    ret_list = dist.batch_isend_irecv(self.comm_op_list)
    reqs.extend(ret_list)
for op_info in self.comm_op_list:
    peer_rank = op_info["peer_rank"]
    tensor = op_info["tensor"]
    op = op_info["op"]
    if op == "send":
        worker = get_ep_group().device_group.send([tensor], peer_rank, tag=0)
    else:
        worker = get_ep_group().device_group.recv([tensor], peer_rank, tag=0)
    reqs.append(worker)
Using a constant tag=0 for multiple asynchronous point-to-point communication operations between the same pair of ranks is unsafe. It can lead to race conditions where messages are received in an incorrect order, causing data corruption or deadlocks. The previous implementation using dist.batch_isend_irecv handled this correctly.
To fix this, a unique tag should be used for each send/receive pair. You could generate a unique tag based on the layer, expert, and tensor index. This would require passing the tensor index into the comm_op_list and then using it to construct a unique tag during the send/recv operations.
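One way to construct such a tag is a mixed-radix encoding over (layer, expert, tensor index). This is a hypothetical standalone sketch; the bound constants and field names are illustrative assumptions, not the PR's actual values:

```python
# Hypothetical sketch: derive a unique P2P tag per (layer, expert, tensor)
# so concurrent send/recv pairs between the same two ranks cannot be
# matched out of order. Bounds below are illustrative assumptions.
MAX_EXPERTS_PER_LAYER = 1024
MAX_TENSORS_PER_EXPERT = 8

def make_tag(layer_id: int, expert_id: int, tensor_idx: int) -> int:
    # Mixed-radix encoding: distinct triples always map to distinct tags
    # as long as each component stays below its bound.
    return (layer_id * MAX_EXPERTS_PER_LAYER + expert_id) * MAX_TENSORS_PER_EXPERT + tensor_idx

# All triples within bounds yield distinct tags:
tags = {make_tag(layer, e, t) for layer in range(2) for e in range(3) for t in range(4)}
print(len(tags))  # 24 distinct tags for 2 * 3 * 4 triples
```

As long as both peers compute the tag from the same (layer, expert, tensor) triple, each send is matched with exactly one recv regardless of scheduling order.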
if min_box_index == -1:
    # Try to place in the last box first
    if box_counts[-1] < items_per_box or (box_counts[-1] == items_per_box and remaining_items > 0):
        min_box_index = -1
There appears to be a bug in the fallback logic for expert placement. When min_box_index is -1, and the last box is found to have capacity, min_box_index is incorrectly set to -1 again. This will cause an IndexError on line 121 when trying to access boxes[min_box_index]. It should be set to card_num - 1 to correctly reference the last box.
Suggested change:
-        min_box_index = -1
+        min_box_index = card_num - 1
workers = []
for op in comm_op_list:
    src_rank = op["src_rank"]
    dst_rank = op["dst_rank"]
    tensor = op["tensor"]
    if src_rank == self.rank_id:
        continue
    comm_op_list.append(dist.P2POp(dist.irecv, src_tensor, src_rank))
    if comm_op_list:
        reqs = dist.batch_isend_irecv(comm_op_list)
        workers.append(self.comm_group.device_group.send([tensor], dst_rank, tag=0))
    elif dst_rank == self.rank_id:
        workers.append(self.comm_group.device_group.recv([tensor], src_rank, tag=0))

for req in reqs:
    req.wait()
for worker in workers:
    worker.wait()
Using a constant tag=0 for all point-to-point communications in warm_up_eplb is unsafe and can lead to race conditions or deadlocks. Since the comm_op_list is identical on all ranks and is processed in a deterministic order, you can use the loop index as a unique tag for each send/receive pair to ensure correctness.
Suggested change (replace the block above with):
workers = []
for i, op in enumerate(comm_op_list):
    src_rank = op["src_rank"]
    dst_rank = op["dst_rank"]
    tensor = op["tensor"]
    if src_rank == self.rank_id:
        workers.append(self.comm_group.device_group.send([tensor], dst_rank, tag=i))
    elif dst_rank == self.rank_id:
        workers.append(self.comm_group.device_group.recv([tensor], src_rank, tag=i))
for worker in workers:
    worker.wait()
Pull request overview
This PR adds Ascend-specific support for elastic Expert Parallel (EP) scaling, including “dummy weight” model loading for scale-up launches, stateless group port allocation, and EPLB (expert load balancing) updates to support resharding when DP/EP membership changes.
Changes:
- Add an Ascend elastic-EP scaling executor and integrate it into the worker/model loading lifecycle.
- Update EPLB data collection and expert weight transfers to use stateless group coordinators and support scale up/down reshuffling.
- Add Ascend platform support for allocating extra stateless group ports and creating stateless HCCL process groups.
Reviewed changes
Copilot reviewed 10 out of 12 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| vllm_ascend/worker/worker.py | Hooks in Ascend elastic-EP executor; supports dummy-weight load and EPLB warmup during compile/warmup. |
| vllm_ascend/worker/model_runner_v1.py | Adds dummy-weight model loading path; extends EPLB shared state; allows re-warmup for dynamic EPLB. |
| vllm_ascend/worker/block_table.py | Adjusts block table clearing to explicitly clear both GPU/CPU buffers. |
| vllm_ascend/platform.py | Allocates extra stateless ports for elastic EP; introduces stateless HCCL ProcessGroup initialization helper. |
| vllm_ascend/ops/fused_moe/fused_moe.py | Skips MoE comm setup during scale-up dummy launch via env var gate. |
| vllm_ascend/eplb/eplb_updator.py | Switches workload gathering/warmup comms to dynamic EPLB stateless groups (CPU + device groups). |
| vllm_ascend/eplb/core/policy/policy_default_eplb.py | Adds EP-size override for scaling; modifies redundancy packing and duplicate-removal logic. |
| vllm_ascend/eplb/core/eplb_worker.py | Adds scaling-aware EPLB update behavior and placement checks. |
| vllm_ascend/eplb/core/eplb_device_transfer_loader.py | Reworks D2D transfer task representation to use EP device_group send/recv workers. |
| vllm_ascend/distributed/elastic_ep/elastic_execute.py | New Ascend elastic-EP scaling executor implementation (weight transfer, group switching, reshuffling). |
| vllm_ascend/distributed/elastic_ep/standby_state.py | New/updated elastic-EP standby state module (file present in PR). |
| vllm_ascend/distributed/elastic_ep/\_\_init\_\_.py | Package init for elastic-EP Ascend modules (file present in PR). |
# when scale up, ensure that new ranks do not own any experts
# by setting their expert_map to all -1
new_rank_expert_maps = torch.full(shape, -1, dtype=new_expert_maps.dtype)
self.old_expert_maps = torch.cat([self.old_expert_maps, new_rank_expert_maps])
In the scale-up path, torch.cat([self.old_expert_maps, new_rank_expert_maps]) omits dim=1. Given the expert map tensor shape is [L, ep_size, E], concatenating without dim=1 will concatenate along the layer dimension and produce an invalid shape (or raise). Add dim=1 here to extend the EP/rank dimension.
Suggested change:
-self.old_expert_maps = torch.cat([self.old_expert_maps, new_rank_expert_maps])
+self.old_expert_maps = torch.cat([self.old_expert_maps, new_rank_expert_maps], dim=1)
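The shape arithmetic behind this can be checked without torch. The sketch below models only `torch.cat`'s shape rule (sizes add along the chosen dim, all other dims must match); the example shapes are illustrative, not taken from the PR:

```python
# Shape-only sketch of torch.cat semantics: concatenation sums sizes along
# the chosen dim and requires every other dim to match. With expert maps
# of shape [num_layers, ep_size, num_experts], only dim=1 grows ep_size.
def cat_shape(a, b, dim=0):
    assert len(a) == len(b), "rank mismatch"
    # All dims except `dim` must agree, as torch.cat requires.
    assert all(x == y for i, (x, y) in enumerate(zip(a, b)) if i != dim)
    out = list(a)
    out[dim] = a[dim] + b[dim]
    return out

old = [4, 8, 64]  # [num_layers, old_ep_size, num_experts] (illustrative)
new = [4, 2, 64]  # maps for 2 newly added ranks

print(cat_shape(old, new, dim=1))  # [4, 10, 64] -> ep_size grows correctly
# cat_shape(old, new, dim=0) would fail: ep_size dims (8 vs 2) do not match,
# mirroring the error (or wrong [8, ...] layer count) torch.cat would produce.
```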
else:
    # Find any box with capacity
    for i in range(card_num):
        if box_weights[i] < items_per_box or (box_counts[i] == items_per_box and remaining_items > 0):
This capacity check uses box_weights[i] < items_per_box, but box_weights is the sum of weights (float) while items_per_box is a count (int). This will select boxes based on an unrelated comparison and can produce incorrect packing decisions. It looks like this should be checking box_counts[i] (or another count-based capacity condition) instead of box_weights[i].
Suggested change:
-if box_weights[i] < items_per_box or (box_counts[i] == items_per_box and remaining_items > 0):
+if box_counts[i] < items_per_box or (box_counts[i] == items_per_box and remaining_items > 0):
parallel_config._platform_stateless_groups_ports["dynamic_eplb"] = [
    [all_ports.pop() for _ in range(3)] for _ in range(len(group_ranks))
]
parallel_config._platform_stateless_groups_ports["fc3_quant_x"] = [
    [all_ports.pop() for _ in range(3)] for _ in range(len(group_ranks))
]
num_ports is increased conditionally (only when dynamic_eplb / multistream_overlap_gate are enabled), but ports are always popped for both "dynamic_eplb" and "fc3_quant_x" groups. When either feature flag is disabled, all_ports may not contain enough entries and .pop() can fail (or you end up consuming ports you didn't reserve). Make these allocations conditional to match the num_ports calculation (or always reserve ports for all groups).
Suggested change (make each allocation conditional):
if ascend_config.eplb_config.dynamic_eplb:
    parallel_config._platform_stateless_groups_ports["dynamic_eplb"] = [
        [all_ports.pop() for _ in range(3)] for _ in range(len(group_ranks))
    ]
if ascend_config.multistream_overlap_gate:
    parallel_config._platform_stateless_groups_ports["fc3_quant_x"] = [
        [all_ports.pop() for _ in range(3)] for _ in range(len(group_ranks))
    ]
def stateless_init_device_torch_dist_pg(
    cls,
    backend: str,
    prefix_store: PrefixStore,
    group_rank: int,
    group_size: int,
    timeout: timedelta,
    **kwargs,
):
    """
    Initialize a stateless HCCL process group for CUDA devices.
    This method creates a ProcessGroup with the specified backend configuration,
    typically used for GPU communication. It sets up the necessary backend
    options and registers the backend with the process group.
    Args:
        backend: The distributed backend to use (e.g., 'hccl')
        prefix_store: The prefix store for distributed coordination
        group_rank: The rank of the current process within the group
        group_size: The total number of processes in the group
        timeout: Maximum time to wait for the operation to complete
        **kwargs: Additional backend-specific options
    warning:
        Uses internal PyTorch API (torch._C._distributed_c10d.ProcessGroupHCCL)
        which may change in future PyTorch versions. Compatibility should be
        verified with each PyTorch upgrade.
    Compatibility Risk:
        - High risk of breakage in PyTorch 2.4+
        - No semantic versioning guarantees
        - Requires testing with new PyTorch releases
    Returns:
        A ProcessGroup object configured with the specified backend
    """

    # INTERNAL API USAGE - COMPATIBILITY RISK
    # This internal import is necessary for stateless process group functionality
    # but carries compatibility risks. Monitor PyTorch release notes for changes.
    # TODO: Migrate to public API when available in future PyTorch versions
    from torch_npu._C._distributed_c10d import ProcessGroupHCCL

    pg = ProcessGroup(prefix_store, group_rank, group_size)

    backend_options = ProcessGroupHCCL.Options()
    backend_options._timeout = timeout

    # Create Backend object
    backend = Backend("hccl")

    # Set default backend for ProcessGroup
    pg._set_default_backend(Backend.backend_type_map[backend])

    device = torch.device("npu")
stateless_init_device_torch_dist_pg docstring says "for CUDA devices" / "GPU communication", but the implementation hardcodes torch.device("npu") and Backend("hccl"). Also, the backend parameter is shadowed and effectively ignored. Update the docstring to match NPU/HCCL behavior, and either honor the backend argument (validate and use it) or remove it to avoid misleading callers.
new_ep_size = old_ep_size
self.worker.model_runner.shared_dict["scale"] = True
In perform_eplb_reshuffle, the scale-up branch (new_dp_size is None) sets new_ep_size = old_ep_size and does not populate shared_dict["old_ep_size"]. But scale-up is exactly when EP size increases; keeping new_ep_size equal to old_ep_size means the EPLB worker/policy never sees the new EP size and cannot compute the right redundancy/placement for the expanded group. Pass both old/new EP sizes (or derive new_ep_size from the post-reconfig group sizes) so the policy can rebalance for the actual scaled-up EP size.
Suggested change:
-new_ep_size = old_ep_size
+# Derive the new EP size from the current EP group after reconfiguration
+new_ep_size = get_ep_group().size()
 self.worker.model_runner.shared_dict["scale"] = True
+self.worker.model_runner.shared_dict["old_ep_size"] = old_ep_size
No — in the scale-up scenario, by the time EPLB starts, both the new engines and the existing engines have already updated their parallel_config, so the old_ep_size obtained earlier is already the post-scale-up ep_size. Moreover, eplb_worker.py also defines old_ep_size = self.shared_dict["old_ep_size"] or self.old_expert_maps.shape[1]. For readability, though, it could be changed to:
def perform_eplb_reshuffle(self, new_dp_size: int | None = None) -> None:
if get_ep_group().rank == 0:
logger.info("[Elastic EP] Starting expert resharding...")
parallel_config = self.worker.vllm_config.parallel_config
tp_size = parallel_config.tensor_parallel_size
ep_size = parallel_config.data_parallel_size * tp_size
if new_dp_size is None:
# scale up
new_ep_size = ep_size
self.worker.model_runner.shared_dict["scale"] = True
self.worker.model_runner.shared_dict["new_ep_size"] = new_ep_size
else:
# scale down
new_ep_size = new_dp_size * tp_size
self.worker.model_runner.shared_dict["scale"] = True
self.worker.model_runner.shared_dict["old_ep_size"] = ep_size
self.worker.model_runner.shared_dict["new_ep_size"] = new_ep_size
self.reshard_experts()
self.worker.model_runner.shared_dict["scale"] = False
self.worker.model_runner.shared_dict["old_ep_size"] = None
self.worker.model_runner.shared_dict["new_ep_size"] = None
# NOTE(yongji): check whether we need to synchronize here
torch_npu.npu.synchronize()
if get_ep_group().rank == 0:
logger.info("[Elastic EP] Expert resharding completed")
for layer_id in range(num_layers):
    # check if any logical expert is not placed on any rank
    if torch.unique(new_placement[layer_id]).numel() < torch.unique(old_placement[layer_id]).numel():
    if torch.unique(new_placement[layer_id]).numel() < self.num_experts:
torch.unique(new_placement[layer_id]).numel() counts -1 as a distinct value. In scale up/down flows you explicitly introduce -1 placeholders, so this check can incorrectly pass even if some logical experts are missing (e.g., one missing expert + -1 still yields num_experts uniques). Filter out -1 before counting uniques (or explicitly verify that all expert IDs [0..num_experts-1] are present).
Suggested change:
-if torch.unique(new_placement[layer_id]).numel() < self.num_experts:
+layer_placement = new_placement[layer_id]
+valid_expert_mask = layer_placement != -1
+unique_valid_experts = torch.unique(layer_placement[valid_expert_mask]) if valid_expert_mask.any() else torch.tensor([], device=layer_placement.device, dtype=layer_placement.dtype)
+if unique_valid_experts.numel() < self.num_experts:
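The pitfall is easy to reproduce with plain Python sets (a torch-free sketch of the same counting logic, using illustrative values):

```python
# Torch-free sketch of the check above: counting distinct values without
# filtering -1 can mask a missing expert, because the -1 placeholder
# itself contributes one "unique" value.
num_experts = 4
# Expert 3 is missing entirely, but two slots hold the -1 placeholder:
placement = [0, 1, 2, -1, 1, -1]

naive = len(set(placement))                        # counts -1 as an expert
filtered = len({e for e in placement if e != -1})  # valid experts only

print(naive)     # 4 -> naive check passes despite the missing expert
print(filtered)  # 3 -> filtered check correctly flags the gap
```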
new_placement is indexed as new_placement[layer_id][rank_id][phy_pos] = expert_id, so it contains no -1 values.
Thanks for your reply. Regarding whether -1 values can appear in new_placement: I noticed that in DIFF 5 of vllm_ascend/eplb/core/eplb_worker.py, when scaling operations are performed, the code explicitly sets expert_map (which ultimately feeds into the contents of new_placement) to -1 to indicate that certain ranks hold no experts. For example, at lines 98 and 103:
98: shutdown_rank_expert_maps = torch.full(shape, -1, dtype=new_expert_maps.dtype)
99: new_expert_maps = torch.cat([self.old_expert_maps, shutdown_rank_expert_maps], dim=1)
...
103: new_rank_expert_maps = torch.full(shape, -1, dtype=new_expert_maps.dtype)
104: self.old_expert_maps = torch.cat([self.old_expert_maps, new_rank_expert_maps])

Therefore, new_placement can indeed contain -1 values. Copilot's suggestion ensures that -1 is not mistakenly counted as a valid expert ID when computing torch.unique(new_placement[layer_id]).numel(), which would otherwise misjudge the number of logical experts actually placed. Filtering out the -1 values is key to the correctness of this check.
Suggested change:
-if torch.unique(new_placement[layer_id]).numel() < self.num_experts:
+layer_placement = new_placement[layer_id]
+valid_expert_mask = layer_placement != -1
+unique_valid_experts = torch.unique(layer_placement[valid_expert_mask]) if valid_expert_mask.any() else torch.tensor([], device=layer_placement.device, dtype=layer_placement.dtype)
+if unique_valid_experts.numel() < self.num_experts:
logger = init_logger(__name__)


def batch_transfer_weights(
standby_mc2_group = get_standby_mc2_group()
assert standby_mc2_group is not None


def transfer_weights(self, old_dp_size: int, new_dp_size: int) -> None:
self.worker.model_runner.model.compile(fullgraph=True, backend=backend)

# release all previously captured CUDA graphs
if isinstance(self.worker.model_runner.model, ACLGraphWrapper):
Check whether this is the only change needed for ACL graph support.
if get_ep_group().rank == 0:
    logger.info("[Elastic EP] Expert resharding completed")


def receive_weights(self) -> None:
Force-pushed 1f51394 to 7619dfb
Signed-off-by: nifeng <1542305589@qq.com>
Signed-off-by: nifeng <1542305589@qq.com>
Signed-off-by: nifeng <1542305589@qq.com>
Force-pushed 7619dfb to 04ba16f
Signed-off-by: nifeng <1542305589@qq.com>
…arallel_state.py Signed-off-by: nifeng <1542305589@qq.com>
Pull request overview
Copilot reviewed 17 out of 18 changed files in this pull request and generated 12 comments.
if enable_elastic_ep:
    world_size = get_world_group.world_size
    tp_pp_pcp_size = global_tp_size * global_pp_size * parallel_config.prefill_context_parallel_size
    local_all_ranks = torch.arange(tp_pp_pcp_size).reshape(
        global_pp_size, parallel_config.prefill_context_parallel_size, global_tp_size
    )
    backend = "hccl"
get_world_group is a function; get_world_group.world_size will raise at runtime. This branch should call get_world_group() (e.g., get_world_group().world_size) to retrieve the coordinator before accessing world_size/local_rank.
"""

_all_instances: ClassVar[weakref.WeakSet["ACLGraphWrapper"]] = weakref.WeakSet()
graph_pool = tuple[int, int] = current_platform.get_global_graph_pool()
This class attribute assignment is invalid Python syntax (graph_pool = tuple[int, int] = ...) and will prevent the module from importing. Use a normal annotation (e.g., graph_pool: tuple[int, int] = ...) or declare it as a ClassVar.
Suggested change:
-graph_pool = tuple[int, int] = current_platform.get_global_graph_pool()
+graph_pool: ClassVar[tuple[int, int]] = current_platform.get_global_graph_pool()
for buffer_tensor_id, recv_info in enumerate(expert_recv_info):
    recv_rank, global_expert_id_to_recv = recv_info
    op_info = {"peer_rank": recv_rank, "tensors": [], "expert_id": global_expert_id_to_recv, "op": "send"}
    for buffer_tensor in self.eplb_adaptor.buffer_tensor_list[buffer_tensor_id]:
        self.comm_op_list.append(dist.P2POp(dist.irecv, buffer_tensor, recv_rank))
        op_info["tensors"].append(buffer_tensor)
    self.comm_op_list.append(op_info)
    local_expert_to_replace = self.updated_expert_map[global_expert_id_to_recv].item()
In the recv task creation, op is set to "send". This makes asyn_expert_weight_transfer call device_group.send(...) for receive buffers, so the transfer will never receive data. Set the recv-side op to "recv".
for i, tensor in enumerate(tensors):
    if op == "send":
        worker = get_ep_group().device_group.send([tensor], peer_rank, tag=expert_id * (i + 1))
    else:
        worker = get_ep_group().device_group.recv([tensor], peer_rank, tag=expert_id * (i + 1))
The message tag is computed as expert_id * (i + 1). When expert_id == 0, all tensors for that expert use tag 0, which can collide and mismatch send/recv pairs. Use a tag scheme that is unique per (layer, expert_id, tensor_index) or at least per (expert_id, tensor_index) even when expert_id is 0 (e.g., expert_id * N + i).
```diff
- for i, tensor in enumerate(tensors):
-     if op == "send":
-         worker = get_ep_group().device_group.send([tensor], peer_rank, tag=expert_id * (i + 1))
-     else:
-         worker = get_ep_group().device_group.recv([tensor], peer_rank, tag=expert_id * (i + 1))
+ num_tensors = len(tensors)
+ for i, tensor in enumerate(tensors):
+     tag = expert_id * num_tensors + i
+     if op == "send":
+         worker = get_ep_group().device_group.send([tensor], peer_rank, tag=tag)
+     else:
+         worker = get_ep_group().device_group.recv([tensor], peer_rank, tag=tag)
```
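A quick standalone check of the two tag schemes (plain arithmetic, no distributed calls): `expert_id * num_tensors + i` is unique per `(expert_id, tensor_index)` pair, while the old `expert_id * (i + 1)` collapses all of expert 0's tensors onto tag 0:

```python
def make_tag(expert_id: int, num_tensors: int, i: int) -> int:
    # Suggested scheme: unique per (expert_id, tensor_index)
    return expert_id * num_tensors + i

NUM_TENSORS = 3
# 4 experts x 3 tensors should yield 12 distinct tags
new_tags = {make_tag(e, NUM_TENSORS, i) for e in range(4) for i in range(NUM_TENSORS)}
# Old scheme for expert 0: every tensor gets tag 0
old_tags_expert0 = {0 * (i + 1) for i in range(NUM_TENSORS)}
```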
```python
dynamic_eplb_ports = [group_ports["dynamic_eplb"].pop() for _ in standby_ep_ranks]
if get_ascend_config().eplb_config.dynamic_eplb:
    _STANDBY_DYNAMIC_EPLB = _init_stateless_group(
        standby_ep_ranks,
        "dynamic_eplb",
        dynamic_eplb_ports,
        master_ip,
        backend,
    )
```
dynamic_eplb_ports is popped from group_ports["dynamic_eplb"] unconditionally, even when dynamic_eplb is disabled. This can exhaust the port pool and break later group creation. Only pop/consume ports inside the if get_ascend_config().eplb_config.dynamic_eplb: block.
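A sketch of guarding the pop behind the feature flag, with `dynamic_eplb_enabled` standing in for `get_ascend_config().eplb_config.dynamic_eplb`:

```python
def allocate_dynamic_eplb_ports(group_ports, standby_ep_ranks, dynamic_eplb_enabled):
    if not dynamic_eplb_enabled:
        # Leave the pool untouched so later group creation can still draw ports.
        return None
    return [group_ports["dynamic_eplb"].pop() for _ in standby_ep_ranks]
```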
```python
index = 0
while index < len(weights):
    if (
        len(route_expert_redundancy[weights[index][0]]) < card_num - 1
        and weights[index][0] != item_id
        and weights[index][0] not in boxes[i]
    ):
        break
    index += 1
boxes[i][cur_position] = weights[index][0]
tmp_raw_weight = weights[index][1] * (len(route_expert_redundancy[weights[index][0]]) + 1)
route_expert_redundancy[weights[index][0]].append(0)
avg_weight = tmp_raw_weight / (len(route_expert_redundancy[weights[index][0]]) + 1)
```
In the duplicate-elimination loop, index can reach len(weights) if no replacement candidate satisfies the constraints, but the code still accesses weights[index], which will raise IndexError. Handle the "no candidate" case explicitly (e.g., break/continue without replacement, or relax constraints) before indexing.
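A simplified, runnable version of the guarded search (`find_candidate` and `is_valid` are stand-ins for the PR's loop and its three-part constraint): returning `None` lets the caller skip the replacement instead of indexing past the end:

```python
def find_candidate(weights, is_valid):
    index = 0
    while index < len(weights):
        if is_valid(weights[index]):
            return index
        index += 1
    # No expert satisfies the constraints; caller must handle this
    # instead of blindly accessing weights[index].
    return None
```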
```python
    break
new_placement_this_rank = new_placement_check.clone()
new_placement_this_rank[old_indices] = expert_not_move
available_positions = list(set(range(len(new_placement_check))) - set(old_indices))
```
available_positions = list(set(range(len(new_placement_check))) - set(old_indices)) loses ordering because sets are unordered. This can make the rearrangement non-deterministic and potentially shuffle experts differently across runs. Build available_positions deterministically (e.g., a list comprehension over range(...)).
```diff
- available_positions = list(set(range(len(new_placement_check))) - set(old_indices))
+ available_positions = [
+     idx for idx in range(len(new_placement_check)) if idx not in old_indices
+ ]
```
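As a standalone illustration, the comprehension form always yields the remaining positions in ascending order, independent of set iteration order:

```python
def available_positions(total: int, old_indices) -> list[int]:
    old = set(old_indices)  # membership test only; ordering comes from range()
    return [idx for idx in range(total) if idx not in old]
```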
```diff
  with DeviceMemoryProfiler() as m:  # noqa: SIM117
-     self.model = get_model(vllm_config=self.vllm_config)
+     if load_dummy_weights:
+         self.load_config.load_format = "dummy"
+     self.model = get_model(vllm_config=self.vllm_config, load_config=self.load_config)
```
load_model(load_dummy_weights=True) mutates self.load_config.load_format to "dummy" but never restores it. If load_model is called again in the same process (e.g., reload, different engine lifecycle), it may keep loading dummy weights unintentionally. Preserve the previous value and restore it after model creation (or pass a copied LoadConfig).
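A sketch of the restore pattern, assuming a simple `LoadConfig`-like object and a pluggable `get_model` callable (both simplified here); `try`/`finally` guarantees the original value survives even if model creation raises:

```python
class LoadConfig:
    def __init__(self, load_format: str = "auto"):
        self.load_format = load_format


def load_model(load_config, get_model, load_dummy_weights: bool = False):
    previous_format = load_config.load_format
    try:
        if load_dummy_weights:
            load_config.load_format = "dummy"
        return get_model(load_config.load_format)
    finally:
        # Restore so a later load_model() call does not silently reuse "dummy".
        load_config.load_format = previous_format
```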
```python
client = OpenAI(base_url="http://localhost:8009/v1", api_key="EMPTY")

messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Write a random famous aphorism."},
]


def send_request(i):
    try:
        response = client.chat.completions.create(
            model="/AIdata/JW/Qwen3-30B-A3B",
            messages=messages,
            max_tokens=10000,
```
This example hardcodes a local endpoint (http://localhost:8009/v1) and a machine-specific model path (/AIdata/JW/Qwen3-30B-A3B). For a repo example, these should be configurable via CLI args or environment variables so it’s runnable in other environments.
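One way to do this is to read the endpoint and model path from environment variables; `VLLM_BASE_URL` and `VLLM_MODEL` are hypothetical names here, with the PR's hardcoded values kept only as fallbacks:

```python
import os

# Hypothetical env vars; the defaults match the PR's hardcoded values.
base_url = os.environ.get("VLLM_BASE_URL", "http://localhost:8009/v1")
model = os.environ.get("VLLM_MODEL", "/AIdata/JW/Qwen3-30B-A3B")

# client = OpenAI(base_url=base_url, api_key="EMPTY")
# ...then pass model=model to client.chat.completions.create(...)
```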
```python
        return f"[{i}] request failed: {e}"


# 10 threads in parallel, each running 100 iterations = 1000 requests in total
```
The comment says "10 threads" but the code uses ThreadPoolExecutor(max_workers=100). Update the comment or the value to avoid misleading readers.
```diff
- # 10 threads in parallel, each running 100 iterations = 1000 requests in total
+ # 100 threads in parallel, each running 100 iterations = 10000 requests in total
```
Signed-off-by: nifeng <1542305589@qq.com>
fangyuchu
left a comment
Overall, since this PR involves a large amount of code, especially sections with substantial copied logic from EEP, it would be helpful to add comments explaining those parts. In particular, a brief note on why the code was copied (instead of reused or refactored) would provide useful context. This would make it easier for reviewers to focus on the actual changes introduced in this PR.
```python
unique_name: str = "",
global_ranks: list[int] | None = None,
global_world_size: int | None = None,
tcp_store_group: StatelessProcessGroup | None = None,
```
Are these parameters added by EEP in vLLM as well?
```python
self.worker.model_runner.model.compile(fullgraph=True, backend=backend)

# release all previously captured ACL graphs
ACLGraphWrapper.clear_all_graphs()
```
Can we call ascend related methods first and then call super().switch_and_prepare()?
No, super().switch_and_prepare() uses class instances like EplbState that don't exist in vLLM Ascend. Calling it directly would raise an error.
Signed-off-by: nifeng <1542305589@qq.com>
What this PR does / why we need it?
This PR introduces the Ascend-side adaptation of vLLM's Elastic EP Milestone 2 feature. The upstream feature (#34861) was recently merged into vLLM.
Key contributions

- Added an `AscendElasticEPScalingExecutor` class, inheriting from `ElasticEPScalingExecutor`, to implement vLLM Ascend operations during elastic scaling;
- Under the `--enable-elastic-ep` argument, switched vLLM Ascend-specific communication group coordinators (such as MC2) from `GroupCoordinator` to `StatelessGroupCoordinator`, and added port allocation and creation methods for these stateless groups in `NPUPlatform`;
- Implemented creation of standby communication group coordinators via `vllm_ascend/distributed/standby_state.py`;
- Refactored the vLLM Ascend EPLB module to support expert rebalancing during elastic scaling.
Why this is needed
Currently, expert parallelism in vllm_ascend is static after deployment. This PR lays the foundation for dynamically scaling expert parallelism up and down, building on existing vLLM features without modifying the vLLM codebase.
Does this PR introduce any user-facing change?
No. Simply add `--enable-elastic-ep`, `--enable-eplb`, the vLLM Ascend-specific dynamic EPLB parameters, and the relevant environment variables to the startup command to enable elastic scaling. All features are non-intrusive and require no modifications to the vLLM codebase.

Summary per file
- `vllm_ascend/worker/worker.py`: `AscendElasticEPScalingExecutor` support, including virtual weight loading during scale-up
- `vllm_ascend/worker/model_runner_v1.py`
- `vllm_ascend/platform.py`
- `vllm_ascend/ops/fused_moe/fused_moe.py`: `AscendFusedMoE` initialization during scaling
- `vllm_ascend/distributed/elastic_ep/elastic_execute.py`: `AscendElasticEPScalingExecutor` class handling operations during elastic scaling
- `vllm_ascend/distributed/elastic_ep/standby_state.py`
- `vllm_ascend/distributed/parallel_state`
- `vllm_ascend/eplb/core/policy/policy_default_eplb.py`
- `vllm_ascend/eplb/core/eplb_worker.py`
- `vllm_ascend/eplb/core/eplb_device_transfer_loader.py`
- `vllm_ascend/eplb/eplb_updator.py`

How was this patch tested?
Models tested
Test environment (manual elastic scale-up)
Workflow
```shell
python -m vllm.entrypoints.openai.api_server \
    --model /AIdata/JW/Qwen3-30B-A3B \
    --trust-remote-code \
    --enable-expert-parallel \
    --enable-eplb \
    --enable-elastic-ep \
    --tensor-parallel-size 1 \
    --data-parallel-size <dp size> \
    --data-parallel-size-local <dp size> \
    --data-parallel-start-rank 0 \
    --data-parallel-backend ray \
    --gpu-memory-utilization 0.9 \
    --host 0.0.0.0 \
    --port <port> \
    --additional-config '{"eplb_config": {"dynamic_eplb": true, "num_redundant_experts": <num redundant experts>}}'
```

Test scenarios