diff --git a/tests/ut/device_allocator/test_cpu_binding.py b/tests/ut/device_allocator/test_cpu_binding.py index 89657f305ee..ea7bc6cfad2 100644 --- a/tests/ut/device_allocator/test_cpu_binding.py +++ b/tests/ut/device_allocator/test_cpu_binding.py @@ -1,5 +1,5 @@ import unittest -from unittest.mock import patch +from unittest.mock import mock_open, patch from vllm_ascend.cpu_binding import CpuAlloc, DeviceInfo, bind_cpus, is_arm_cpu from vllm_ascend.utils import AscendDeviceType @@ -70,6 +70,10 @@ def test_expand_cpu_list(self): result = self.device_info.expand_cpu_list("0-2, 4, 6-8") self.assertEqual(result, [0, 1, 2, 4, 6, 7, 8]) + def test_get_all_logic_npus(self): + self.assertEqual(self.device_info.all_logic_npus, [0, 1]) + self.assertEqual(self.device_info.total_logic_npus, 2) + class TestCpuAlloc(unittest.TestCase): @@ -107,19 +111,28 @@ def test_average_distribute(self): @patch('vllm_ascend.cpu_binding.get_ascend_device_type') def test_binding_mode_table(self, mock_get_device_type): mock_get_device_type.return_value = AscendDeviceType.A2 - self.assertEqual(self.cpu_alloc._binding_mode(), "affinity") + self.assertEqual(self.cpu_alloc._binding_mode(), "topo_affinity") mock_get_device_type.return_value = AscendDeviceType.A3 - self.assertEqual(self.cpu_alloc._binding_mode(), "numa_balanced") + self.assertEqual(self.cpu_alloc._binding_mode(), "global_slice") @patch('vllm_ascend.cpu_binding.get_ascend_device_type') - def test_build_cpu_pools_fallback_to_numa_balanced(self, mock_get_device_type): + def test_build_cpu_pools_fallback_to_global_slice(self, mock_get_device_type): mock_get_device_type.return_value = AscendDeviceType.A2 self.cpu_alloc.device_info.npu_affinity = {} with patch.object(self.cpu_alloc, "build_cpu_node_map") as mock_build_cpu_node_map, \ - patch.object(self.cpu_alloc, "handle_no_affinity") as mock_handle_no_affinity: + patch.object(self.cpu_alloc, "build_global_slice_cpu_pool") as mock_build_global_slice_cpu_pool: + self.cpu_alloc.build_cpu_pools() + mock_build_cpu_node_map.assert_called_once() + mock_build_global_slice_cpu_pool.assert_called_once() + + @patch('vllm_ascend.cpu_binding.get_ascend_device_type') + def test_build_cpu_pools_global_slice_mode(self, mock_get_device_type): + mock_get_device_type.return_value = AscendDeviceType.A3 + with patch.object(self.cpu_alloc, "build_cpu_node_map") as mock_build_cpu_node_map, \ + patch.object(self.cpu_alloc, "build_global_slice_cpu_pool") as mock_build_global_slice_cpu_pool: self.cpu_alloc.build_cpu_pools() mock_build_cpu_node_map.assert_called_once() - mock_handle_no_affinity.assert_called_once() + mock_build_global_slice_cpu_pool.assert_called_once() def test_extend_numa(self): result = self.cpu_alloc.extend_numa([]) @@ -146,18 +159,50 @@ def test_build_cpu_node_map(self, mock_execute_command): self.assertEqual(self.cpu_alloc.numa_to_cpu_map, expected_numa_to_cpu_map) - @patch('vllm_ascend.cpu_binding.get_ascend_device_type') - @patch('vllm_ascend.cpu_binding.execute_command') - def test_handle_no_affinity(self, mock_execute_command, mock_get_device_type): - mock_get_device_type.return_value = AscendDeviceType.A3 - mock_execute_command.side_effect = [("0 0\n1 1", 0), ("0 0\n1 1", 0)] + def test_build_global_slice_cpu_pool_uses_total_logic_npus(self): + self.cpu_alloc.device_info.running_npu_list = [1] + self.cpu_alloc.device_info.allowed_cpus = list(range(20)) + self.cpu_alloc.device_info.total_logic_npus = 2 + self.cpu_alloc.device_info.npu_affinity = {0: [0, 1], 1: [2, 3]} + + self.cpu_alloc.build_global_slice_cpu_pool() + self.assertEqual(self.cpu_alloc.npu_cpu_pool[1], list(range(10, 20))) + + def test_build_global_slice_cpu_pool_fallback_to_affinity_len(self): self.cpu_alloc.device_info.running_npu_list = [0, 1] - self.cpu_alloc.device_info.allowed_cpus = [0, 1, 2, 3] - self.cpu_alloc.device_info.affinity = {} - self.assertEqual(self.cpu_alloc.npu_cpu_pool, {}) - self.cpu_alloc.device_info.affinity = {0: [0, 1], 1: [2, 3]} - self.cpu_alloc.build_cpu_pools() - self.assertEqual(len(self.cpu_alloc.npu_cpu_pool), 2) + self.cpu_alloc.device_info.allowed_cpus = list(range(12)) + self.cpu_alloc.device_info.total_logic_npus = 0 + self.cpu_alloc.device_info.npu_affinity = {0: [0, 1], 1: [2, 3]} + + self.cpu_alloc.build_global_slice_cpu_pool() + self.assertEqual(self.cpu_alloc.npu_cpu_pool[0], [0, 1, 2, 3, 4, 5]) + self.assertEqual(self.cpu_alloc.npu_cpu_pool[1], [6, 7, 8, 9, 10, 11]) + + def test_build_global_slice_cpu_pool_fallback_to_running_len(self): + self.cpu_alloc.device_info.running_npu_list = [0, 1] + self.cpu_alloc.device_info.allowed_cpus = list(range(12)) + self.cpu_alloc.device_info.total_logic_npus = 0 + self.cpu_alloc.device_info.npu_affinity = {} + + self.cpu_alloc.build_global_slice_cpu_pool() + self.assertEqual(self.cpu_alloc.npu_cpu_pool[0], [0, 1, 2, 3, 4, 5]) + self.assertEqual(self.cpu_alloc.npu_cpu_pool[1], [6, 7, 8, 9, 10, 11]) + + def test_build_global_slice_cpu_pool_raises_when_cpu_insufficient(self): + self.cpu_alloc.device_info.running_npu_list = [0, 1] + self.cpu_alloc.device_info.allowed_cpus = list(range(8)) + self.cpu_alloc.device_info.total_logic_npus = 2 + + with self.assertRaises(RuntimeError): + self.cpu_alloc.build_global_slice_cpu_pool() + + def test_build_global_slice_cpu_pool_raises_invalid_npu_id(self): + self.cpu_alloc.device_info.running_npu_list = [2] + self.cpu_alloc.device_info.allowed_cpus = list(range(12)) + self.cpu_alloc.device_info.total_logic_npus = 2 + + with self.assertRaises(RuntimeError): + self.cpu_alloc.build_global_slice_cpu_pool() @patch('vllm_ascend.cpu_binding.execute_command') def test_allocate(self, mock_execute_command): @@ -182,6 +227,28 @@ def test_bind_threads(self, mock_execute_command): self.cpu_alloc.bind_threads() mock_execute_command.assert_called() + @patch('vllm_ascend.cpu_binding.get_ascend_device_type') + @patch('vllm_ascend.cpu_binding.os.listdir') + @patch('builtins.open', new_callable=mock_open, read_data='123: 0 0 0 0 sq_send_trigger_irq\n') + @patch('vllm_ascend.cpu_binding.shutil.which') + @patch('vllm_ascend.cpu_binding.os.access') + @patch('vllm_ascend.cpu_binding.execute_command') + def test_bind_npu_irq_a3_uses_card_chip_mapping(self, mock_execute_command, mock_access, + mock_which, _mock_open, mock_listdir, + mock_get_device_type): + mock_access.return_value = True + mock_which.return_value = None + mock_listdir.side_effect = FileNotFoundError + mock_get_device_type.return_value = AscendDeviceType.A3 + mock_execute_command.return_value = ("PCIe Bus Info 0000:03:00.0", 0) + self.cpu_alloc.rank_id = 0 + self.cpu_alloc.device_info.running_npu_list = [3] + self.cpu_alloc.npu_cpu_pool = {3: [0, 1, 2, 3, 4]} + + self.cpu_alloc.bind_npu_irq() + + mock_execute_command.assert_any_call(["npu-smi", "info", "-t", "board", "-i", "1", "-c", "1"]) + class TestBindingSwitch(unittest.TestCase): diff --git a/vllm_ascend/cpu_binding.py b/vllm_ascend/cpu_binding.py index dfcf624051b..abb80aa1c2d 100644 --- a/vllm_ascend/cpu_binding.py +++ b/vllm_ascend/cpu_binding.py @@ -12,9 +12,19 @@ from vllm_ascend.utils import AscendDeviceType, get_ascend_device_type MASK_BIT = 32 # Number of bits in a CPU affinity mask group +MIN_CPUS_PER_NPU = 5 # 2(IRQ) + 1(main, at least 1 CPU) + 1(acl) + 1(release) = 5 CPUs per NPU ALLOWED_CPUS_PATH = "/proc/self/status" ASCEND_RT_VISIBLE_DEVICES = os.getenv("ASCEND_RT_VISIBLE_DEVICES") +TOPO_AFFINITY_MODE = "topo_affinity" +GLOBAL_SLICE_MODE = "global_slice" + +DEVICE_BINDING_MODE: dict["AscendDeviceType", str] = { + AscendDeviceType.A2: TOPO_AFFINITY_MODE, + AscendDeviceType.A3: GLOBAL_SLICE_MODE, + AscendDeviceType._310P: TOPO_AFFINITY_MODE, +} + def is_arm_cpu() -> bool: arch = platform.machine().lower() @@ -38,6 +48,8 @@ def __init__(self): self.allowed_cpus: list[int] = self.parse_allowed_cpus() self.running_npu_list: list[int] = self.get_running_npus() self.npu_affinity: dict[int, list[int]] = self.parse_topo_affinity() + self.all_logic_npus: list[int] = self.get_all_logic_npus() + self.total_logic_npus: int = len(self.all_logic_npus) @staticmethod def expand_cpu_list(allowed_list_str: str) -> list[int]: @@ -50,6 +62,20 @@ def expand_cpu_list(allowed_list_str: str) -> list[int]: allowed_cpus_list.append(int(per_range)) return allowed_cpus_list + def get_all_logic_npus(self) -> list[int]: + """Collect all logical NPU IDs from the NPU mapping. + + self.npu_map_info maps a board_id (A3) or npu_id (A2) to a per-chip map. + The per-chip map uses chip_id as the key and the logical NPU ID string + as the value. + """ + logic_ids: set[int] = set() + for _, chip_map in self.npu_map_info.items(): + for _, logic_str in chip_map.items(): + if logic_str and logic_str.isdigit(): + logic_ids.add(int(logic_str)) + return sorted(logic_ids) + @staticmethod def get_npu_map_info() -> dict[str, dict[str, str]]: npu_map_info: dict[str, dict[str, str]] = {} @@ -208,95 +234,100 @@ def build_cpu_node_map(self) -> None: if len(self.numa_to_cpu_map) == 0: raise RuntimeError("lscpu command output error, no NUMA node available. Please check!") - def handle_no_affinity(self) -> None: + def build_global_slice_cpu_pool(self) -> None: """ - 1) Build available NUMA nodes after allowed_cpus filtering - 2) Assign NPUs to NUMA nodes by round-robin (npu_id % num_nodes) - 3) Within each NUMA node, split its CPU list into per-NPU disjoint slices + Build per-NPU CPU pools by slicing allowed_cpus using GLOBAL logical NPU ids. + + Why: + - Multiple processes/DP groups may share the SAME cpuset (same allowed_cpus). + - If each process slices only its visible NPUs, CPU ranges overlap across processes. + - Global slicing ensures deterministic, non-overlapping CPU partitions per logical NPU id. + + Notes: + - This strategy does NOT rely on npu-smi topo affinity. + - NUMA locality is achieved only if CPU numbering aligns with NUMA layout. + - Requires per-NPU slice size >= 5 (IRQ(2) + main(>=1) + acl(1) + release(1)). """ running = list(self.device_info.running_npu_list) - if not running or not self.numa_to_cpu_map: + if not running: return - # 1) Only keep NUMA nodes that still have CPUs after allowed_cpus filtering. - available_nodes: list[tuple[int, list[int]]] = [] - for node in sorted(self.numa_to_cpu_map): - cpus = [c for c in self.numa_to_cpu_map[node] if c in self.device_info.allowed_cpus] - if cpus: - available_nodes.append((node, cpus)) - if not available_nodes: + allowed = sorted(set(self.device_info.allowed_cpus)) + total_cpu = len(allowed) + if total_cpu == 0: return - num_nodes = len(available_nodes) - - # Infer "my_npu" from local rank + visible running_npu_list, assuming local rank is index into running_npu_list. - if 0 <= self.rank_id < len(running): - my_npu = running[self.rank_id] + # Prefer mapping info (npu-smi info -m), fallback to topo keys, then visible list + if self.device_info.total_logic_npus > 0: + total_npus = self.device_info.total_logic_npus + elif self.device_info.npu_affinity: + total_npus = len(self.device_info.npu_affinity) else: - # Fallback: modulo in case rank range is larger than visible list length. - my_npu = running[self.rank_id % len(running)] - - print( - f"[no_affinity_fine] rank:{self.rank_id} -> my_npu:{my_npu}; " - f"running_npu_list:{running}; num_available_nodes:{num_nodes}" - ) - - # 2) Round-robin assign NPUs to nodes based on NPU id (same as new logic). - # Build: node_index -> list[npu] - node_to_npus: dict[int, list[int]] = {i: [] for i in range(num_nodes)} - for npu in running: - node_index = npu % num_nodes - node_to_npus[node_index].append(npu) - - # 3) Within each node, split cpus among the NPUs assigned to this node. - for node_index, npus in node_to_npus.items(): - if not npus: - continue + total_npus = len(running) - node_id, cpus = available_nodes[node_index] - total_cpu_num = len(cpus) - n = len(npus) + if total_npus <= 0: + return - # Edge case: should not happen because we filtered cpus, but keep safe. - if total_cpu_num == 0: - continue + # Compute global per-NPU slicing + base = total_cpu // total_npus + extra = total_cpu % total_npus - # If CPUs are fewer than NPUs, we can only guarantee small (possibly duplicated) slices. - if total_cpu_num < n: - for i, npu in enumerate(npus): - cpu = cpus[i % total_cpu_num] - self.npu_cpu_pool[npu] = [cpu] - continue + logger.debug( + f"[cpu_global_slice] rank:{self.rank_id} ASCEND_RT_VISIBLE_DEVICES={ASCEND_RT_VISIBLE_DEVICES} " + f"running_npu_list:{running} total_npus:{total_npus} allowed_cpus:{total_cpu} " + f"base:{base} extra:{extra} allowed_cpus_head:{allowed[:16]} allowed_cpus_tail:{allowed[-16:]}" + ) - # Even split (disjoint slices), first 'extra' NPUs take 1 more CPU. - base = total_cpu_num // n - extra = total_cpu_num % n + # Enforce per-NPU slice length >= 5. + # Because with remainder distribution, some NPUs may get 'base' cores and some get 'base+1'. + # The minimum slice size is 'base'. + if base < MIN_CPUS_PER_NPU: + raise RuntimeError( + "Insufficient CPUs for binding with IRQ/ACL/REL reservations: " + f"total_allowed={total_cpu}, total_npus={total_npus}, " + f"min_per_npu={base} (<{MIN_CPUS_PER_NPU}). " + f"Need at least {total_npus * MIN_CPUS_PER_NPU} CPUs in cpuset." + ) - start = 0 - for i, npu in enumerate(npus): - take = base + (1 if i < extra else 0) - end = start + take - self.npu_cpu_pool[npu] = cpus[start:end] - start = end + def _slice_for_npu(global_npu_id: int) -> list[int]: + # start = global_npu_id*base + min(global_npu_id, extra) + start = global_npu_id * base + (global_npu_id if global_npu_id < extra else extra) + take = base + (1 if global_npu_id < extra else 0) + end = start + take + return allowed[start:end] - DEVICE_BINDING_MODE = { - AscendDeviceType.A3: "numa_balanced", - } + for npu in running: + if npu < 0 or npu >= total_npus: + raise RuntimeError(f"Invalid NPU id {npu}, total_npus={total_npus}.") + cpus = _slice_for_npu(npu) + # Extra safety: should always be >= base >= 5 + if len(cpus) < MIN_CPUS_PER_NPU: + raise RuntimeError( + f"NPU{npu} got too few CPUs: {len(cpus)} (<5). " + f"total_allowed={total_cpu}, total_npus={total_npus}, base={base}, extra={extra}" + ) + self.npu_cpu_pool[npu] = cpus - @classmethod - def _binding_mode(cls) -> str: + @staticmethod + def _binding_mode() -> str: device_type = get_ascend_device_type() - return cls.DEVICE_BINDING_MODE.get(device_type, "affinity") + return DEVICE_BINDING_MODE.get(device_type, TOPO_AFFINITY_MODE) def build_cpu_pools(self) -> None: self.build_cpu_node_map() - if self._binding_mode() == "numa_balanced": - self.handle_no_affinity() + + mode = self._binding_mode() + logger.info(f"[cpu_bind_mode] mode={mode} rank={self.rank_id} visible_npus={self.device_info.running_npu_list}") + if mode == GLOBAL_SLICE_MODE: + self.build_global_slice_cpu_pool() return + + # topo_affinity mode if not self.device_info.npu_affinity: - logger.warning("NPU affinity info not found, fallback to NUMA-balanced CPU binding.") - self.handle_no_affinity() + logger.warning("NPU topo affinity not found, fallback to global-slice CPU binding.") + self.build_global_slice_cpu_pool() return + for npu in self.device_info.running_npu_list: base_cpu_list = [ cpu for cpu in self.device_info.npu_affinity.get(npu, []) if cpu in self.device_info.allowed_cpus @@ -305,9 +336,11 @@ def build_cpu_pools(self) -> None: raise RuntimeError("CPUs available in 'Cpus_allowed_list' conflict with NUMA affinity.") extra_cpu_list = self.extend_numa(base_cpu_list) self.npu_cpu_pool[npu] = extra_cpu_list + groups = defaultdict(list) for npu, cpus in self.npu_cpu_pool.items(): groups[str(cpus)].append(npu) + final: dict[int, list[int]] = {} for key, npu_list in groups.items(): if len(npu_list) == 1: @@ -318,13 +351,13 @@ def build_cpu_pools(self) -> None: def allocate(self) -> None: for npu, pool in self.npu_cpu_pool.items(): - if len(pool) >= 3: - main = pool[2:-2] # Reserve first two CPUs for IRQ binding + if len(pool) >= MIN_CPUS_PER_NPU: + main = pool[2:-2] acl = [pool[-2]] rel = [pool[-1]] else: raise RuntimeError( - "The number of CPUs is insufficient to bind to the NPUs. Each NPU requires at least 3 CPUs." + f"The number of CPUs is insufficient. Each NPU requires at least {MIN_CPUS_PER_NPU} CPUs." ) self.assign_main[npu] = main self.assign_acl[npu] = acl @@ -383,6 +416,13 @@ def bind_threads(self) -> None: def bind_npu_irq(self) -> None: if not os.access("/proc/irq", os.W_OK): return + + # Only bind IRQ for current rank's NPU to avoid multi-process overwrite. + current_npu = self.device_info.running_npu_list[self.rank_id] + if current_npu not in self.npu_cpu_pool: + logger.warning(f"[irq] rank:{self.rank_id} -> NPU{current_npu} has no cpu pool, skip irq binding.") + return + if shutil.which("systemctl"): output, _ = execute_command(["systemctl", "list-unit-files"]) if "irqbalance.service" in output: @@ -393,48 +433,66 @@ def bind_npu_irq(self) -> None: "You can run the systemctl start irqbalance command to restart it." ) execute_command(["systemctl", "stop", "irqbalance"]) + sq_irqs = [] with open("/proc/interrupts") as f: for line in f: if "sq_send_trigger_irq" in line: irq = line.split(":")[0].strip() sq_irqs.append(irq) - for npu in sorted(self.npu_cpu_pool.keys()): - cpus = self.npu_cpu_pool[npu] - if len(cpus) < 2: - continue - sq_cpu, cq_cpu = cpus[0], cpus[1] # Reserved for IRQ binding + + npu = current_npu + cpus = self.npu_cpu_pool[npu] + if len(cpus) < 2: + logger.warning(f"[irq] NPU{npu} cpu pool too small (<2), skip irq binding.") + return + + sq_cpu, cq_cpu = cpus[0], cpus[1] # Reserved for IRQ binding + pci_addr = "" + + device_type = get_ascend_device_type() + if device_type == AscendDeviceType.A3: + # A3: logical npu_id = card_id*2 + chip_id + card_id = npu // 2 + chip_id = npu % 2 + info, _ = execute_command(["npu-smi", "info", "-t", "board", "-i", str(card_id), "-c", str(chip_id)]) + else: + # A2 / others: logical npu_id is card id info, _ = execute_command(["npu-smi", "info", "-t", "board", "-i", str(npu)]) - pci_addr = "" - for line in info.splitlines(): - if "PCIe Bus Info" in line: - pci_addr = line.split()[-1].lower() - break - if not pci_addr: - logger.warning(f"Can't find pci address of NPU{npu} .") - continue - try: - npu_irq_list = sorted(os.listdir(f"/sys/bus/pci/devices/{pci_addr}/msi_irqs/"), key=lambda x: int(x)) - except FileNotFoundError: - logger.warning(f"The msi_irqs folder cannot be found under /sys/bus/pci/devices/{pci_addr} .") - continue - sq_irq, cq_irq = "", "" - for irq in sq_irqs: - if irq in npu_irq_list: - sq_irq = irq - cq_irq = str(int(irq) + 1) - break - if not sq_irq: - logger.warning(f"The sq_send_trigger_irq of NPU{npu} is not found.") - continue - logger.info( - f"NPU{npu}(PCI {pci_addr}): sq_send_trigger_irq IRQ_ID={sq_irq} -> CPU{sq_cpu}, " - f"cq_update_irq IRQ_ID={cq_irq} -> CPU{cq_cpu}" - ) - with open(f"/proc/irq/{sq_irq}/smp_affinity", "w") as f: - f.write(self.cpu_to_mask(sq_cpu)) - with open(f"/proc/irq/{cq_irq}/smp_affinity", "w") as f: - f.write(self.cpu_to_mask(cq_cpu)) + + for line in info.splitlines(): + if "PCIe Bus Info" in line: + pci_addr = line.split()[-1].lower() + break + + if not pci_addr: + logger.warning(f"Can't find pci address of NPU{npu} .") + return + + try: + npu_irq_list = sorted(os.listdir(f"/sys/bus/pci/devices/{pci_addr}/msi_irqs/"), key=lambda x: int(x)) + except FileNotFoundError: + logger.warning(f"The msi_irqs folder cannot be found under /sys/bus/pci/devices/{pci_addr} .") + return + + sq_irq, cq_irq = "", "" + for irq in sq_irqs: + if irq in npu_irq_list: + sq_irq = irq + cq_irq = str(int(irq) + 1) + break + if not sq_irq: + logger.warning(f"The sq_send_trigger_irq of NPU{npu} is not found.") + return + + logger.info( + f"NPU{npu}(PCI {pci_addr}): sq_send_trigger_irq IRQ_ID={sq_irq} -> CPU{sq_cpu}, " + f"cq_update_irq IRQ_ID={cq_irq} -> CPU{cq_cpu}" + ) + with open(f"/proc/irq/{sq_irq}/smp_affinity", "w") as f: + f.write(self.cpu_to_mask(sq_cpu)) + with open(f"/proc/irq/{cq_irq}/smp_affinity", "w") as f: + f.write(self.cpu_to_mask(cq_cpu)) def run_all(self) -> None: self.build_cpu_pools()