5 changes: 1 addition & 4 deletions pyproject.toml
@@ -61,10 +61,6 @@ exclude = [
     "vllm_ascend/distributed/kv_transfer/utils/**",
     "vllm_ascend/kv_offload/**",
     "vllm_ascend/lora/**",
-    # (6)
-    "vllm_ascend/eplb/**",
-    "vllm_ascend/model_loader/netloader/**",
-    "vllm_ascend/patch/**",
     # (7)
     "vllm_ascend/quantization/**",
     "vllm_ascend/sample/*.py",
@@ -92,6 +88,7 @@ exclude = [
     "vllm_ascend/distributed/parallel_state.py",
     "vllm_ascend/distributed/utils.py",
     "vllm_ascend/xlite/*.py",
+    "vllm_ascend/patch/worker/patch_*.py",
     # (11)
     "vllm_ascend/ops/fused_moe/**",
 ]
10 changes: 3 additions & 7 deletions vllm_ascend/eplb/adaptor/abstract_adaptor.py
@@ -19,8 +19,7 @@
 from typing import Any


-class EplbAdaptor():
-
+class EplbAdaptor:
     def __init__(self, **args):
         pass

@@ -29,12 +28,9 @@ def get_rank_expert_workload(self):
         raise NotImplementedError

     @abstractmethod
-    def do_update_expert_map(self, layer_id: Any,
-                             updated_expert_map: Any) -> Any:
+    def do_update_expert_map(self, layer_id: Any, updated_expert_map: Any) -> Any:
         raise NotImplementedError

     @abstractmethod
-    def do_update_expert_weight(self, layer_id: Any,
-                                local_expert_to_replace: Any,
-                                buffer_tensor_id: Any) -> Any:
+    def do_update_expert_weight(self, layer_id: Any, local_expert_to_replace: Any, buffer_tensor_id: Any) -> Any:
         raise NotImplementedError
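Worth noting in passing: `EplbAdaptor` uses `abstractmethod` (presumably imported higher up in the file) without inheriting from `abc.ABC`, so the contract is only enforced at call time via `NotImplementedError`, not at instantiation. Below is a minimal sketch of a concrete adaptor against this interface; `ToyAdaptor` and its dict-backed state are hypothetical, purely for illustration:

```python
from typing import Any

from vllm_ascend.eplb.adaptor.abstract_adaptor import EplbAdaptor


class ToyAdaptor(EplbAdaptor):
    """Hypothetical adaptor; real ones (e.g. VllmEplbAdaptor below) wrap device tensors."""

    def __init__(self, **args):
        super().__init__(**args)
        self.expert_maps: dict[int, Any] = {}  # layer_id -> expert map

    def get_rank_expert_workload(self):
        # A real adaptor returns a per-expert workload tensor for this rank.
        return [0] * 8

    def do_update_expert_map(self, layer_id: Any, updated_expert_map: Any) -> Any:
        self.expert_maps[layer_id] = updated_expert_map

    def do_update_expert_weight(self, layer_id: Any, local_expert_to_replace: Any, buffer_tensor_id: Any) -> Any:
        # Copy staged buffer weights into the live expert slot (no-op in this sketch).
        pass
```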
100 changes: 45 additions & 55 deletions vllm_ascend/eplb/adaptor/vllm_adaptor.py
@@ -26,7 +26,6 @@


 class VllmEplbAdaptor(EplbAdaptor):
-
     def __init__(self, model, **args):
         super().__init__(**args)
         self.model = model
@@ -36,52 +35,53 @@ def __init__(self, model, **args):
         self.num_dense_layers = getattr(self.model.config, "first_k_dense_replace", 0)
         self.num_moe_layers = self.model.config.num_hidden_layers - self.num_dense_layers

-        for i in range(self.num_dense_layers,
-                       self.model.config.num_hidden_layers):
-            self.param_dict["model.layers." + str(i) + ".mlp.experts." + "w13_weight_list"] = \
-                self.model.model.layers[i].mlp.experts.w13_weight_list
-            self.param_dict["model.layers." + str(i) + ".mlp.experts." + "w2_weight_list"] = \
-                self.model.model.layers[i].mlp.experts.w2_weight_list
-            self.param_dict["model.layers." + str(i) + ".mlp.experts." + "w13_weight_scale_fp32_list"] = \
+        for i in range(self.num_dense_layers, self.model.config.num_hidden_layers):
+            self.param_dict["model.layers." + str(i) + ".mlp.experts." + "w13_weight_list"] = self.model.model.layers[
+                i
+            ].mlp.experts.w13_weight_list
+            self.param_dict["model.layers." + str(i) + ".mlp.experts." + "w2_weight_list"] = self.model.model.layers[
+                i
+            ].mlp.experts.w2_weight_list
+            self.param_dict["model.layers." + str(i) + ".mlp.experts." + "w13_weight_scale_fp32_list"] = (
                 self.model.model.layers[i].mlp.experts.w13_weight_scale_fp32_list
-            self.param_dict["model.layers." + str(i) + ".mlp.experts." + "w2_weight_scale_list"] = \
+            )
+            self.param_dict["model.layers." + str(i) + ".mlp.experts." + "w2_weight_scale_list"] = (
                 self.model.model.layers[i].mlp.experts.w2_weight_scale_list
-        # TODO: init self.expert_weight_names depending on different model types, only deepseek v3 w8a8 and qwen3-moe is supported here
+            )
+        # TODO: init self.expert_weight_names depending on different model types.
+        # Only deepseek v3 w8a8 and qwen3-moe is supported here
         if self.model.quant_config is not None:
             self.expert_weight_names = [
-                "w13_weight_list", "w2_weight_list",
-                "w13_weight_scale_fp32_list", "w13_weight_offset",
-                "w2_weight_scale_list", "w2_weight_offset"
+                "w13_weight_list",
+                "w2_weight_list",
+                "w13_weight_scale_fp32_list",
+                "w13_weight_offset",
+                "w2_weight_scale_list",
+                "w2_weight_offset",
             ]
         else:
             self.expert_weight_names = ["w13_weight", "w2_weight"]

-        self.expert_map_per_layer_cpu = dict(
-        )  # copy of expert map on CPU to avoid device synchronize frequently
+        self.expert_map_per_layer_cpu = dict()  # copy of expert map on CPU to avoid device synchronize frequently

         num_buffer_tensor = self.model.model.layers[-1].mlp.experts.local_num_experts
-        self.buffer_tensor_list: list[list[Any]] = [
-            [] for _ in range(num_buffer_tensor)
-        ]
+        self.buffer_tensor_list: list[list[Any]] = [[] for _ in range(num_buffer_tensor)]
         self.init_buffer_tensor(num_buffer_tensor)

         self.expert_param_per_layer = dict()
         self.init_expert_param_per_layer()

         self.log2phy_map_per_layer = dict()
         for layer_idx in range(self.num_moe_layers):
-            self.log2phy_map_per_layer[self.num_dense_layers + layer_idx] = \
-                self.model.get_log2phy_map(self.num_dense_layers + layer_idx)
+            self.log2phy_map_per_layer[self.num_dense_layers + layer_idx] = self.model.get_log2phy_map(
+                self.num_dense_layers + layer_idx
+            )

     def init_buffer_tensor(self, num_buffer_tensor):
         for buffer_id in range(num_buffer_tensor):
             for name in self.expert_weight_names:
-                complete_name = "model.layers." + str(
-                    self.num_dense_layers) + ".mlp.experts." + name
-                if name in [
-                        "w13_weight_list", "w2_weight_list",
-                        "w13_weight_scale_fp32_list", "w2_weight_scale_list"
-                ]:
+                complete_name = "model.layers." + str(self.num_dense_layers) + ".mlp.experts." + name
+                if name in ["w13_weight_list", "w2_weight_list", "w13_weight_scale_fp32_list", "w2_weight_scale_list"]:
                     expert_tensor = self.param_dict[complete_name][0]
                     expert_tensor = expert_tensor.clone()
                 else:
@@ -99,19 +99,20 @@ def init_expert_param_per_layer(self):
             per_expert_param = list()
             for name in self.expert_weight_names:
                 if name in [
-                        "w13_weight_list", "w2_weight_list",
-                        "w13_weight_scale_fp32_list",
-                        "w2_weight_scale_list"
+                    "w13_weight_list",
+                    "w2_weight_list",
+                    "w13_weight_scale_fp32_list",
+                    "w2_weight_scale_list",
                 ]:
                     per_expert_param.append(
-                        self.param_dict["model.layers." + str(layer_idx) +
-                                        ".mlp.experts." +
-                                        name][local_expert_id])
+                        self.param_dict["model.layers." + str(layer_idx) + ".mlp.experts." + name][local_expert_id]
+                    )
                 else:
                     per_expert_param.append(
-                        self.param_dict["model.layers." + str(layer_idx) +
-                                        ".mlp.experts." +
-                                        name][0].data[local_expert_id])
+                        self.param_dict["model.layers." + str(layer_idx) + ".mlp.experts." + name][0].data[
+                            local_expert_id
+                        ]
+                    )
             self.expert_param_per_layer[layer_idx].append(per_expert_param)

     def get_rank_expert_workload(self) -> torch.Tensor:
@@ -123,26 +124,18 @@ def _export_tensor_to_file(self, expert_maps, expert_map_record_path: str):
         num_local_experts = expert_maps.max() + 1

         expert_maps_list = expert_maps.tolist()
-        record: dict[str, Any] = {
-            "moe_layer_count": len(expert_maps_list),
-            "layer_list": []
-        }
+        record: dict[str, Any] = {"moe_layer_count": len(expert_maps_list), "layer_list": []}

         for layer_idx, layer_data in enumerate(expert_maps_list):
             layer_record: dict[str, Any] = {
                 "layer_id": layer_idx,
                 "device_count": len(layer_data),
-                "device_list": []
+                "device_list": [],
             }

             for device_idx, experts in enumerate(layer_data):
-                placement = [
-                    experts.index(i) for i in range(num_local_experts)
-                ]
-                device_record = {
-                    "device_id": device_idx,
-                    "device_expert": placement
-                }
+                placement = [experts.index(i) for i in range(num_local_experts)]
+                device_record = {"device_id": device_idx, "device_expert": placement}
                 layer_record["device_list"].append(device_record)

             record["layer_list"].append(layer_record)
@@ -153,11 +146,10 @@ def _export_tensor_to_file(self, expert_maps, expert_map_record_path: str):
     def do_update_expert_map(self, layer_id, updated_expert_map):
         self.expert_map_per_layer_cpu[layer_id].copy_(updated_expert_map)

-    def do_update_expert_weight(self, layer_id, local_expert_to_replace,
-                                buffer_tensor_id):
+    def do_update_expert_weight(self, layer_id, local_expert_to_replace, buffer_tensor_id):
         for expert_tensor, buffer_tensor in zip(
-                self.expert_param_per_layer[layer_id][local_expert_to_replace],
-                self.buffer_tensor_list[buffer_tensor_id]):
+            self.expert_param_per_layer[layer_id][local_expert_to_replace], self.buffer_tensor_list[buffer_tensor_id]
+        ):
             expert_tensor.copy_(buffer_tensor)
         logger.debug(f"Expert tensor shape is :{expert_tensor.shape}")

@@ -168,10 +160,8 @@ def do_update_log2phy_map(self, layer_id, updated_log2phy_map):
     def get_global_expert_map(self):
         all_layer_global_expert_map = []
         for layer_id in range(self.num_moe_layers):
-            map_cpu = self.model.model.layers[
-                self.num_dense_layers + layer_id].mlp.experts.global_expert_map.cpu()
+            map_cpu = self.model.model.layers[self.num_dense_layers + layer_id].mlp.experts.global_expert_map.cpu()
             all_layer_global_expert_map.append(map_cpu)
-            self.expert_map_per_layer_cpu[self.num_dense_layers +
-                                          layer_id] = map_cpu[self.rank_id]
+            self.expert_map_per_layer_cpu[self.num_dense_layers + layer_id] = map_cpu[self.rank_id]

         return torch.stack(all_layer_global_expert_map)
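For reference, this is the JSON shape `_export_tensor_to_file` produces (and `expert_file_to_tensor` in eplb_utils.py parses back); the concrete values below are invented, for a single MoE layer on 2 devices with 4 logical experts and no redundancy:

```python
# Sketch of the exported record, assuming 2 devices x 2 local experts.
# "device_expert"[j] is the global expert id held in local slot j,
# i.e. the inverse of the per-device expert_map row.
record = {
    "moe_layer_count": 1,
    "layer_list": [
        {
            "layer_id": 0,
            "device_count": 2,
            "device_list": [
                {"device_id": 0, "device_expert": [0, 1]},
                {"device_id": 1, "device_expert": [2, 3]},
            ],
        }
    ],
}
```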
47 changes: 14 additions & 33 deletions vllm_ascend/eplb/core/eplb_device_transfer_loader.py
@@ -27,7 +27,6 @@ class ExpertWeightUpdateState(Enum):


 class D2DExpertWeightLoader:
-
     def __init__(self):
         self.comm_op_list = None
         self.updated_expert_map = None
@@ -40,14 +39,10 @@ def __init__(self):
     def set_adator(self, eplb_adaptor):
         self.eplb_adaptor = eplb_adaptor

-    def generate_expert_d2d_transfer_task(self, expert_send_info,
-                                          expert_recv_info, updated_expert_map,
-                                          layer_id):
+    def generate_expert_d2d_transfer_task(self, expert_send_info, expert_recv_info, updated_expert_map, layer_id):
         # When current send/recv and weight.expert_map update tasks are not finished, cannot accept new d2d task
         if self.state != ExpertWeightUpdateState.WAITING:
-            logger.warning_once(
-                "current d2d weight update tasks are on-going, cannot accept new weight update task"
-            )
+            logger.warning_once("current d2d weight update tasks are on-going, cannot accept new weight update task")
             return

         self.updated_expert_map = updated_expert_map
@@ -56,25 +51,16 @@ def generate_expert_d2d_transfer_task(self, expert_send_info,
         self.comm_op_list = []
         for send_info in expert_send_info:
             dst_rank, global_expert_id_to_send = send_info
-            local_expert_id = self.eplb_adaptor.expert_map_per_layer_cpu[
-                layer_id][global_expert_id_to_send].item()
-            for src_tensor in self.eplb_adaptor.expert_param_per_layer[
-                    layer_id][local_expert_id]:
-                self.comm_op_list.append(
-                    dist.P2POp(dist.isend, src_tensor, dst_rank))
+            local_expert_id = self.eplb_adaptor.expert_map_per_layer_cpu[layer_id][global_expert_id_to_send].item()
+            for src_tensor in self.eplb_adaptor.expert_param_per_layer[layer_id][local_expert_id]:
+                self.comm_op_list.append(dist.P2POp(dist.isend, src_tensor, dst_rank))

-        buffer_tensor_id = 0
-        for recv_info in expert_recv_info:
+        for buffer_tensor_id, recv_info in enumerate(expert_recv_info):
             recv_rank, global_expert_id_to_recv = recv_info
-            for buffer_tensor in self.eplb_adaptor.buffer_tensor_list[
-                    buffer_tensor_id]:
-                self.comm_op_list.append(
-                    dist.P2POp(dist.irecv, buffer_tensor, recv_rank))
-            local_expert_to_replace = self.updated_expert_map[
-                global_expert_id_to_recv].item()
-            self.recv_expert_list.append(
-                (local_expert_to_replace, buffer_tensor_id))
-            buffer_tensor_id += 1
+            for buffer_tensor in self.eplb_adaptor.buffer_tensor_list[buffer_tensor_id]:
+                self.comm_op_list.append(dist.P2POp(dist.irecv, buffer_tensor, recv_rank))
+            local_expert_to_replace = self.updated_expert_map[global_expert_id_to_recv].item()
+            self.recv_expert_list.append((local_expert_to_replace, buffer_tensor_id))

         self.state = ExpertWeightUpdateState.READY

@@ -106,23 +92,18 @@ def update_expert_map_and_weight(self, reqs):
         self.comm_op_list = None

         # update expert_map
-        self.eplb_adaptor.do_update_expert_map(self.layer_id,
-                                               self.updated_expert_map)
+        self.eplb_adaptor.do_update_expert_map(self.layer_id, self.updated_expert_map)

         # update log2phy_map
-        self.eplb_adaptor.do_update_log2phy_map(self.layer_id,
-                                                self.updated_log2phy_map)
+        self.eplb_adaptor.do_update_log2phy_map(self.layer_id, self.updated_log2phy_map)

         # update expert weight
-        buffer_tensor_id = 0
         for recv_expert_info in self.recv_expert_list:
             local_expert_to_replace, buffer_tensor_id = recv_expert_info
-            self.eplb_adaptor.do_update_expert_weight(self.layer_id,
-                                                      local_expert_to_replace,
-                                                      buffer_tensor_id)
+            self.eplb_adaptor.do_update_expert_weight(self.layer_id, local_expert_to_replace, buffer_tensor_id)

-        logger.debug(
-            f"[EPLB] finished update expert weight for layer: {self.layer_id}")
+        logger.debug(f"[EPLB] finished update expert weight for layer: {self.layer_id}")

         self.recv_expert_list = []
         self.updated_expert_map = None
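The `comm_op_list` built above follows the standard `torch.distributed` batched point-to-point pattern; the `reqs` argument of `update_expert_map_and_weight` suggests the list is launched with `dist.batch_isend_irecv` elsewhere. A self-contained sketch of that flow, with hypothetical tensors and peer ranks (process group assumed already initialized):

```python
import torch
import torch.distributed as dist


def transfer_weights(send_tensors: list[torch.Tensor], dst_rank: int,
                     recv_buffers: list[torch.Tensor], src_rank: int) -> None:
    # Queue every isend/irecv without launching it, as
    # generate_expert_d2d_transfer_task does for expert weights.
    ops = [dist.P2POp(dist.isend, t, dst_rank) for t in send_tensors]
    ops += [dist.P2POp(dist.irecv, b, src_rank) for b in recv_buffers]
    # Launch the whole batch, then block until every transfer finishes,
    # after which the buffers are safe to copy into the live weights.
    reqs = dist.batch_isend_irecv(ops)
    for req in reqs:
        req.wait()
```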
32 changes: 14 additions & 18 deletions vllm_ascend/eplb/core/eplb_utils.py
@@ -25,7 +25,7 @@


 def expert_file_to_tensor(expert_map_path, layer_id):
-    with open(expert_map_path, "r") as f:
+    with open(expert_map_path) as f:
         data = json.load(f)
     physical_count = 0
     device_data = []
@@ -61,38 +61,32 @@ def init_eplb_config(eplb_config, layer_id, moe_config):
     eplb_enable = eplb_config.dynamic_eplb
     n_redundant = eplb_config.num_redundant_experts if eplb_enable else 0
     if expert_map_path:
-        if not (os.path.exists(expert_map_path)
-                and os.access(expert_map_path, os.R_OK)):
+        if not (os.path.exists(expert_map_path) and os.access(expert_map_path, os.R_OK)):
             raise ValueError("Invalid EPLB path")
         eplb_enable = True
-        global_placement, physical_count = expert_file_to_tensor(
-            expert_map_path, layer_id)
+        global_placement, physical_count = expert_file_to_tensor(expert_map_path, layer_id)
         if physical_count is not None:
             n_redundant = physical_count - n_experts
             if not moe_config.supports_eplb:
-                raise ValueError(
-                    "Eplb supports only w8a8_dynamic quantization.")
+                raise ValueError("Eplb supports only w8a8_dynamic quantization.")
         else:
             eplb_enable = False

     if global_placement is None:
-        global_placement = generate_global_placement(n_experts, ep_size,
-                                                     n_redundant)
+        global_placement = generate_global_placement(n_experts, ep_size, n_redundant)

     if ep_size == 1:
         assert not eplb_enable, "EPLB must used in expert parallelism."
         return None, None, None, n_redundant
     global_expert_map = []
     for rankid in range(ep_size):
-        expert_map = torch.full((n_experts, ), -1, dtype=torch.int32)
+        expert_map = torch.full((n_experts,), -1, dtype=torch.int32)
         local_placement = global_placement[rankid]
-        expert_map[local_placement] = torch.arange(local_placement.shape[0],
-                                                   dtype=torch.int32)
+        expert_map[local_placement] = torch.arange(local_placement.shape[0], dtype=torch.int32)
         global_expert_map.append(expert_map)
         if rankid == moe_config.ep_rank:
             local_expert_map = expert_map.npu()
-    log2phy = generate_log2phy_map(
-        global_expert_map, moe_config.ep_rank).npu() if eplb_enable else None
+    log2phy = generate_log2phy_map(global_expert_map, moe_config.ep_rank).npu() if eplb_enable else None

     return torch.stack(global_expert_map), local_expert_map, log2phy, n_redundant

@@ -106,13 +100,15 @@ def generate_log2phy_map(global_expert_map, ep_rank):
             if val != -1:
                 log2phy_map[idx].append(val + rankid * valid_count)

-    for key in log2phy_map.keys():
+    for key in log2phy_map:
         num_of_duplications = len(log2phy_map[key])
         log2phy_map[key] = log2phy_map[key][ep_rank % num_of_duplications]

     log2phy_map = torch.scatter(
-        torch.zeros(len(log2phy_map.keys()), dtype=torch.int32), 0,
-        torch.tensor(list(log2phy_map.keys()), dtype=torch.int64),
-        torch.tensor(list(log2phy_map.values()), dtype=torch.int32))
+        torch.zeros(len(log2phy_map), dtype=torch.int32),
+        0,
+        torch.tensor(list(log2phy_map), dtype=torch.int64),
+        torch.tensor(list(log2phy_map.values()), dtype=torch.int32),
+    )

     return log2phy_map
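To ground the two mappings above, a small worked example (4 logical experts, `ep_size` 2, no redundant experts; the placement is invented for illustration):

```python
import torch

# Per-rank expert_map: global expert id -> local slot, -1 if not resident.
# Suppose rank 0 hosts global experts 0 and 1, rank 1 hosts 2 and 3.
expert_map = torch.full((4,), -1, dtype=torch.int32)
local_placement = torch.tensor([0, 1])  # global ids placed on rank 0
expert_map[local_placement] = torch.arange(2, dtype=torch.int32)
print(expert_map)  # tensor([ 0,  1, -1, -1], dtype=torch.int32)

# generate_log2phy_map goes the other way: logical expert id -> physical
# slot id, numbered rank-major (val + rankid * valid_count). With no
# redundancy each logical expert has exactly one physical copy, so here
# log2phy comes out as tensor([0, 1, 2, 3]); with redundant copies, each
# rank picks one of the duplicates via ep_rank % num_of_duplications.
```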