Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 88 additions & 32 deletions vllm_ascend/cpu_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,38 +209,75 @@ def build_cpu_node_map(self) -> None:
raise RuntimeError("lscpu command output error, no NUMA node available. Please check!")

def handle_no_affinity(self) -> None:
num_running_npu = len(self.device_info.running_npu_list)
num_numa_node = len(self.numa_to_cpu_map)
if num_numa_node == 0 or num_running_npu == 0:
"""
1) Build available NUMA nodes after allowed_cpus filtering
2) Assign NPUs to NUMA nodes by round-robin (npu_id % num_nodes)
3) Within each NUMA node, split its CPU list into per-NPU disjoint slices
"""
running = list(self.device_info.running_npu_list)
if not running or not self.numa_to_cpu_map:
return
if num_running_npu % num_numa_node != 0:
npu_num_per_node = num_running_npu // num_numa_node + 1
else:
npu_num_per_node = num_running_npu // num_numa_node
index = 0

# 1) Only keep NUMA nodes that still have CPUs after allowed_cpus filtering.
available_nodes: list[tuple[int, list[int]]] = []
for node in sorted(self.numa_to_cpu_map):
# Available CPUs on this NUMA (constrained by allowed_cpus)
cpus = [c for c in self.numa_to_cpu_map[node] if c in self.device_info.allowed_cpus]
if not cpus:
if cpus:
available_nodes.append((node, cpus))
if not available_nodes:
return

num_nodes = len(available_nodes)

# Infer "my_npu" from local rank + visible running_npu_list, assuming local rank is index into running_npu_list.
if 0 <= self.rank_id < len(running):
my_npu = running[self.rank_id]
else:
# Fallback: modulo in case rank range is larger than visible list length.
my_npu = running[self.rank_id % len(running)]

print(
f"[no_affinity_fine] rank:{self.rank_id} -> my_npu:{my_npu}; "
f"running_npu_list:{running}; num_available_nodes:{num_nodes}"
)
Comment on lines +232 to +242
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This block appears to exist only for debugging. The `my_npu` variable is computed but used only in the `print` call that follows; nothing else in the function reads it. `print` statements should be avoided in library code because they write to standard output and add noise; use the existing `logger` for this kind of information instead. Since the block serves no purpose beyond debugging, it can be removed entirely, which eliminates dead code and makes the function clearer.


# 2) Round-robin assign NPUs to nodes based on NPU id (same as new logic).
# Build: node_index -> list[npu]
node_to_npus: dict[int, list[int]] = {i: [] for i in range(num_nodes)}
for npu in running:
node_index = npu % num_nodes
node_to_npus[node_index].append(npu)

# 3) Within each node, split cpus among the NPUs assigned to this node.
for node_index, npus in node_to_npus.items():
if not npus:
continue
# The actual number of NPUs to be allocated on this NUMA.
npu_num_this_node = min(npu_num_per_node, num_running_npu - index)
if npu_num_this_node <= 0:
break
# NUMA-balanced distribute the CPUs of this NUMA node among npu_num_this_node NPUs.

node_id, cpus = available_nodes[node_index]
total_cpu_num = len(cpus)
base_cpu_num = total_cpu_num // npu_num_this_node
extra_cpu_num = total_cpu_num % npu_num_this_node
start_index = 0
for i in range(npu_num_this_node):
take_cpu_num = base_cpu_num + (1 if i < extra_cpu_num else 0)
end_index = start_index + take_cpu_num
select_cpus_list = cpus[start_index:end_index]
if index < num_running_npu:
npu = self.device_info.running_npu_list[index]
self.npu_cpu_pool[npu] = select_cpus_list
index += 1
start_index = end_index
n = len(npus)

# Edge case: should not happen because we filtered cpus, but keep safe.
if total_cpu_num == 0:
continue

# If CPUs are fewer than NPUs, we can only guarantee small (possibly duplicated) slices.
if total_cpu_num < n:
for i, npu in enumerate(npus):
cpu = cpus[i % total_cpu_num]
self.npu_cpu_pool[npu] = [cpu]
continue

# Even split (disjoint slices), first 'extra' NPUs take 1 more CPU.
base = total_cpu_num // n
extra = total_cpu_num % n

start = 0
for i, npu in enumerate(npus):
take = base + (1 if i < extra else 0)
end = start + take
self.npu_cpu_pool[npu] = cpus[start:end]
start = end

DEVICE_BINDING_MODE = {
AscendDeviceType.A3: "numa_balanced",
Expand Down Expand Up @@ -302,15 +339,34 @@ def print_plan(self) -> None:
logger.info(f"NPU{current_npu}: main=[{main}] acl=[{acl}] release=[{rel}]")

def bind_memory(self, pid: str, npu: int) -> None:
def _get_npu_numa_node(npu_id: int) -> int | None:
cpu_pool = self.npu_cpu_pool.get(npu_id, [])
if not cpu_pool:
return None
anchor_cpu = cpu_pool[0]
return self.cpu_node.get(anchor_cpu)

if not shutil.which("migratepages"):
logger.info("The 'migratepages' command is not available, skipping memory binding.")
return
target_numa = _get_npu_numa_node(npu)
if target_numa is None:
logger.warning(f"[migrate] rank:{self.rank_id} -> NPU{npu} has no CPU pool, skip memory binding.")
return
all_numa_nodes = sorted(self.numa_to_cpu_map.keys())
target_cpu = self.assign_acl[npu][0]
target_numa = self.cpu_node[target_cpu]
bind_numa_list = [target_numa, target_numa + 1 if target_numa % 2 == 0 else target_numa - 1]
logger.info(f"[migrate] rank:{self.rank_id} -> NUMA {bind_numa_list}")
execute_command(["migratepages", pid, ",".join(map(str, all_numa_nodes)), ",".join(map(str, bind_numa_list))])
if target_numa not in all_numa_nodes:
logger.warning(f"[migrate] NPU:{npu} -> NUMA {target_numa} not found, skip memory binding.")
return
# Bind memory to the NPU's NUMA node only to minimize cross-NUMA traffic.
logger.info(f"[migrate] NPU:{npu} -> NUMA [{target_numa}]")
execute_command(
[
"migratepages",
pid,
",".join(map(str, all_numa_nodes)),
str(target_numa),
]
)

def bind_threads(self) -> None:
thread_message, _ = execute_command(["ps", "-Te"])
Expand Down