diff --git a/nemo/lightning/run/plugins.py b/nemo/lightning/run/plugins.py
index ce2d89f5feda..30c11e68b47a 100644
--- a/nemo/lightning/run/plugins.py
+++ b/nemo/lightning/run/plugins.py
@@ -145,6 +145,126 @@ def setup(self, task: run.Partial | run.Script, executor: run.Executor):
         _merge_callbacks(task, callbacks=callbacks)
 
 
+@dataclass(kw_only=True)
+class IsolationTestPlugin(run.Plugin):
+    """
+    A plugin that runs a noise job alongside the workload job to measure the impact of noisy
+    neighbors on workload performance.
+
+    Args:
+        num_of_noise_pairs: Number of noise pairs to run
+        min_victims_nodes: Minimum number of victim nodes for the isolation test
+        extra_nodes_to_allocate: Extra nodes to allocate for the job to increase the chance of having enough nodes under a single leaf switch
+        isolation_container_image: Container image to use for the isolation test
+    """
+
+    num_of_noise_pairs: int = 1
+    min_victims_nodes: int = 1
+    extra_nodes_to_allocate: int = 0
+    isolation_container_image: str = "nvcr.io/nvidia/pytorch:25.02-py3"
+
+    def update_number_of_nodes_for_workload_and_noise(self, executor: run.Executor):
+        """Update the number of nodes for the workload and the noise job."""
+        num_of_workload_nodes = executor.nodes
+        num_of_noise_nodes = 2 * self.num_of_noise_pairs
+        total_num_of_nodes = num_of_workload_nodes + num_of_noise_nodes + self.extra_nodes_to_allocate
+
+        # set total number of nodes for sbatch
+        executor.nodes = total_num_of_nodes
+
+        # initialize env vars for workload and noise
+        executor.env_vars["NUM_OF_WORKLOAD_NODES"] = str(num_of_workload_nodes)
+        executor.env_vars["NUM_OF_NOISE_NODES"] = str(num_of_noise_nodes)
+        executor.env_vars["MIN_VICTIMS_NODES"] = str(self.min_victims_nodes)
+        executor.env_vars["NOISE_BUILD_TIME"] = str(120)
+
+    def generate_noise_initialization_code(self, job_dir):
+        """Generate the noise preparation and execution code."""
+        current_file_path = os.path.abspath(__file__)
+        current_directory = os.path.dirname(current_file_path)
+        split_nodes_dir = os.path.join(current_directory, "scripts", "split_nodes")
+
+        noise_cmd = f'''
+
+# Step 1: Save temporary input files
+scontrol show hostnames $SLURM_NODELIST > {job_dir}/allocated_nodes.txt
+scontrol show topo > {job_dir}/topology.txt
+
+# Step 2: Call external Python script
+python {split_nodes_dir}/split_nodes_by_leaf.py --allocated-nodes-file {job_dir}/allocated_nodes.txt --topology-file {job_dir}/topology.txt --strategy compact --workload-a-nodes ${{NUM_OF_WORKLOAD_NODES}} --workload-b-nodes ${{NUM_OF_NOISE_NODES}} --victim-nodes ${{MIN_VICTIMS_NODES}} --output-file {job_dir}/split-nodes.txt
+
+readarray -t SPLIT_NODES_OUTPUT < <(cat {job_dir}/split-nodes.txt)
+WORKLOAD_NODES=${{SPLIT_NODES_OUTPUT[0]}}
+NOISE_NODES=${{SPLIT_NODES_OUTPUT[1]}}
+
+echo "WORKLOAD_NODES: ${{WORKLOAD_NODES}}"
+echo "NOISE_NODES: ${{NOISE_NODES}}"
+
+IFS=',' read -ra NUM_ALLOCATED_WORKLOAD_NODES <<< "$WORKLOAD_NODES"
+IFS=',' read -ra NUM_ALLOCATED_NOISE_NODES <<< "$NOISE_NODES"
+
+# Count the nodes actually assigned to each workload so the split can be sanity-checked
+NUM_ALLOCATED_WORKLOAD_NODES=${{#NUM_ALLOCATED_WORKLOAD_NODES[@]}}
+NUM_ALLOCATED_NOISE_NODES=${{#NUM_ALLOCATED_NOISE_NODES[@]}}
+
+# Verify that the number of workload nodes matches the expected value
+if [ "${{NUM_ALLOCATED_WORKLOAD_NODES}}" -ne ${{NUM_OF_WORKLOAD_NODES}} ]; then
+    echo "Error: Expected ${{NUM_OF_WORKLOAD_NODES}} for workload A, but got ${{NUM_ALLOCATED_WORKLOAD_NODES}}"
+    exit 1
+fi
+
+# Verify that the number of noisy-neighbor nodes matches the expected value
+if [ "${{NUM_ALLOCATED_NOISE_NODES}}" -ne ${{NUM_OF_NOISE_NODES}} ]; then
"${{NUM_ALLOCATED_NOISE_NODES}}" -ne ${{NUM_OF_NOISE_NODES}} ]; then + echo "Error: Expected ${{NUM_OF_NOISE_NODES}} for workload B, but got ${{NUM_ALLOCATED_NOISE_NODES}}" + exit 1 +fi + + +# Step 3: Run the noise job +mkdir -p {job_dir}/noise +echo "Launching noise in the background" +NUMBER_OF_PAIRS=$(($NUM_OF_NOISE_NODES * 4)) # we have 8 GPUS per node, and we want to split to pairs + srun --nodes=${{NUM_OF_NOISE_NODES}} --nodelist=${{NOISE_NODES}} --label --ntasks-per-node=8 --output={job_dir}/noise/out.log --error={job_dir}/noise/err.log --export=ALL --mpi=pmix --container-image={self.isolation_container_image} bash -c "NCCL_TESTS_SPLIT=%${{NUMBER_OF_PAIRS}} NCCL_IB_QPS_PER_CONNECTION=4 sendrecv_perf_mpi --nthreads 1 --ngpus 1 --minbytes 4G --maxbytes 4G --stepbytes 1M --op sum --datatype float --root 0 --iters 25 --warmup_iters 1 --agg_iters 1 --average 1 --parallel_init 0 --check 0 --blocking 0 --cudagraph 0 --stepfactor 2 --run_cycles 0 -R 1" & + +# wait for the noise to build up +echo "Waiting ${{NOISE_BUILD_TIME}} seconds for the noise to build up" +sleep ${{NOISE_BUILD_TIME}} + +# Step 4: run the workload with the noise +echo "Launching workload with noise" + + ''' + return noise_cmd + + def setup(self, task: run.Partial | run.Script, executor: run.Executor): + """Enable the performance environment settings""" + + if isinstance(executor, run.SlurmExecutor): + if not executor.additional_parameters: + executor.additional_parameters = {} + if not executor.srun_args: + executor.srun_args = [] + if not executor.env_vars: + executor.env_vars = {} + + # add node numbers to srun args and requirements for workload and noise + self.update_number_of_nodes_for_workload_and_noise(executor) + + # add number of nodes and nodelist to srun args + executor.srun_args.insert(0, "--nodelist=${WORKLOAD_NODES}") + executor.srun_args.insert(0, "--nodes=${NUM_OF_WORKLOAD_NODES}") + + # add noise command + noise_cmd = self.generate_noise_initialization_code(executor.tunnel.job_dir) + executor.setup_lines = ( + executor.setup_lines + noise_cmd + if (executor.setup_lines and len(executor.setup_lines) > 0) + else noise_cmd + ) + + @dataclass(kw_only=True) class NsysPlugin(run.Plugin): """ diff --git a/nemo/lightning/run/scripts/split_nodes/node_allocation/__init__.py b/nemo/lightning/run/scripts/split_nodes/node_allocation/__init__.py new file mode 100644 index 000000000000..ff723371fe47 --- /dev/null +++ b/nemo/lightning/run/scripts/split_nodes/node_allocation/__init__.py @@ -0,0 +1,7 @@ +"""Node allocation package for distributing compute nodes between workloads. + +This package provides utilities and strategies for splitting allocated nodes +between workloads based on their topology information. +""" + +__version__ = "1.0.0" diff --git a/nemo/lightning/run/scripts/split_nodes/node_allocation/parsers.py b/nemo/lightning/run/scripts/split_nodes/node_allocation/parsers.py new file mode 100644 index 000000000000..8e4db70da18b --- /dev/null +++ b/nemo/lightning/run/scripts/split_nodes/node_allocation/parsers.py @@ -0,0 +1,199 @@ +"""Parsers and utility functions for node allocation. + +This module contains functions for parsing topology files, node lists, +and other utility functions needed for node allocation. +""" + +import re +from typing import Dict, List, Optional, Set, Tuple + + +def expand_nodes(raw_string: str) -> List[str]: + """Expand a string containing node specifications into a list of node names. + + Args: + raw_string: String containing node specifications, e.g. 
"pool1-1195,pool1-[2110-2111]" + or "hgx-isr1-[001-008]" + + Returns: + List of expanded node names + """ + # Extract the nodes part after "Nodes=" + nodes_part = raw_string.split("Nodes=")[1].split()[0] + + expanded = [] + # Split on commas to handle each node specification + for spec in nodes_part.split(','): + # Handle range format: prefix-[start-end] + if '[' in spec and ']' in spec: + # Extract prefix (e.g., "pool1-" or "hgx-isr1-") + prefix = spec.split('[')[0] + range_part = spec.split('[')[1].split(']')[0] + + if '-' in range_part: + start_str, end_str = range_part.split('-') + start = int(start_str) + end = int(end_str) + # Determine padding based on start number's string representation + padding = len(start_str) + for num in range(start, end + 1): + expanded.append(f"{prefix}{num:0{padding}d}") + else: + # Single number in brackets + num = int(range_part) + # Determine padding based on the number's length + padding = len(range_part) + expanded.append(f"{prefix}{num:0{padding}d}") + else: + # Handle individual node format without ranges + expanded.append(spec) + + return expanded + + +def parse_topology_file(topology_file: str) -> Tuple[Dict[str, str], Dict[str, Dict[str, str]]]: + """Parse a topology file and return node-to-switch mapping and switch relationships. + + Args: + topology_file: Path to the topology file + + Returns: + Tuple containing: + - node_to_switch: Dictionary mapping node names to their switch + - switch_hierarchy: Dict containing switch parent-child relationships + """ + with open(topology_file) as f: + topo_output = f.read().strip().splitlines() + + # Parse topology to map nodes to switches + node_to_switch: Dict[str, str] = {} + switch_hierarchy: Dict[str, Dict[str, str]] = { + 'parents': {}, # Maps switches to their parents + 'children': {}, # Maps switches to their children + } + + current_switch = None + + for line in topo_output: + # Look for switch definitions - match any switch name after SwitchName= + m = re.search(r'SwitchName=([^\s]+) Level=(\d+)', line) + if m: + current_switch = m.group(1) + switch_level = int(m.group(2)) + + # For leaf switches (Level 0), find their parents + if switch_level == 0: + parent_match = re.search(r'Switches=(.*?)$', line) + if parent_match: + parents = parent_match.group(1).strip() + switch_hierarchy['parents'][current_switch] = parents + + # Add this switch as a child of its parents + for parent in parents.split(','): + if parent not in switch_hierarchy['children']: + switch_hierarchy['children'][parent] = [] + switch_hierarchy['children'][parent].append(current_switch) + + # Look for node definitions and map them to the current switch + if "Nodes=" in line and current_switch: + expanded = expand_nodes(line) + for node in expanded: + node_to_switch[node] = current_switch + + return node_to_switch, switch_hierarchy + + +def parse_allocated_nodes(allocated_nodes_file: str) -> List[str]: + """Parse a file containing allocated nodes. + + Args: + allocated_nodes_file: Path to the file containing the list of allocated nodes + + Returns: + List of node names + """ + with open(allocated_nodes_file) as f: + allocated_nodes = f.read().strip().split() + return allocated_nodes + + +def parse_node_input(node_input: str, is_file: bool = False) -> List[str]: + """Parse node input, either from a file or directly from a string. 
+
+    Args:
+        node_input: Either a file path or a direct node list string
+        is_file: Whether the input is a file path
+
+    Returns:
+        List of node names
+    """
+    if is_file:
+        with open(node_input) as f:
+            nodes = f.read().strip().split()
+        return nodes
+    else:
+        # Direct input string, could be compressed, so don't split
+        return [node_input]
+
+
+def group_nodes_by_switch(allocated_nodes: List[str], node_to_switch: Dict[str, str]) -> Dict[str, List[str]]:
+    """Group allocated nodes by their switch.
+
+    Args:
+        allocated_nodes: List of allocated node names
+        node_to_switch: Dictionary mapping nodes to switches
+
+    Returns:
+        Dictionary mapping switches to their list of allocated nodes
+    """
+    switch_to_nodes: Dict[str, List[str]] = {}
+    missing_nodes: List[str] = []
+
+    for node in allocated_nodes:
+        switch = node_to_switch.get(node)
+        if switch:
+            switch_to_nodes.setdefault(switch, []).append(node)
+        else:
+            missing_nodes.append(node)
+
+    if missing_nodes:
+        print(f"Warning: {len(missing_nodes)} node(s) not found in topology!")
+
+    return switch_to_nodes
+
+
+def calculate_switch_distance(switch1: str, switch2: str, switch_hierarchy: Dict[str, Dict[str, str]]) -> int:
+    """Calculate the 'distance' between two switches based on topology.
+
+    Distance is defined as:
+        - 0 if switches are the same
+        - 1 if they share a direct parent
+        - 2 if they only share the core switch
+
+    Args:
+        switch1: First switch name
+        switch2: Second switch name
+        switch_hierarchy: Dict containing switch parent-child relationships
+
+    Returns:
+        Distance between the switches (0, 1, or 2)
+    """
+    if switch1 == switch2:
+        return 0
+
+    # Get parents
+    parents = switch_hierarchy.get('parents', {})
+
+    # If we don't have parent info, assume maximum distance
+    if switch1 not in parents or switch2 not in parents:
+        return 2
+
+    parent1 = parents.get(switch1)
+    parent2 = parents.get(switch2)
+
+    # If they share a direct parent, they're close
+    if parent1 and parent2 and parent1 == parent2:
+        return 1
+
+    # Otherwise, they meet at the core
+    return 2
diff --git a/nemo/lightning/run/scripts/split_nodes/node_allocation/strategies.py b/nemo/lightning/run/scripts/split_nodes/node_allocation/strategies.py
new file mode 100644
index 000000000000..b6c96c0ccec9
--- /dev/null
+++ b/nemo/lightning/run/scripts/split_nodes/node_allocation/strategies.py
@@ -0,0 +1,284 @@
+"""Node allocation strategies.
+
+This module contains different strategies for allocating nodes between workloads.
+"""
+
+import sys
+from typing import Dict, List, Optional, Set, Tuple
+
+from .parsers import calculate_switch_distance
+
+
+def _handle_single_switch_case(
+    switch_to_nodes: Dict[str, List[str]], workload_a_nodes: int
+) -> Tuple[List[str], List[str]]:
+    """Handle the special case where all nodes are under a single leaf switch.
+
+    In this case, we simply allocate the first N nodes to workload A and the rest to workload B.
+
+    Args:
+        switch_to_nodes: Dictionary mapping switches to their allocated nodes
+        workload_a_nodes: Required number of nodes for workload A
+
+    Returns:
+        tuple: (workload_a, workload_b) lists of nodes
+    """
+    # Get all nodes and sort them for deterministic behavior
+    all_nodes = []
+    for nodes in switch_to_nodes.values():
+        all_nodes.extend(sorted(nodes))
+
+    # Allocate first N nodes to workload A, rest to workload B
+    workload_a = all_nodes[:workload_a_nodes]
+    workload_b = all_nodes[workload_a_nodes:]
+
+    return workload_a, workload_b
+
+
+def evenly_split_nodes_between_workloads(
+    switch_to_nodes: Dict[str, List[str]], workload_a_nodes: int, workload_b_nodes: int
+) -> Tuple[List[str], List[str]]:
+    """Split nodes evenly between workloads, balancing across switches.
+
+    This strategy distributes nodes between workloads A and B while trying to maintain
+    balance across switches. For each switch, it allocates nodes proportionally based
+    on the overall workload requirements.
+
+    Args:
+        switch_to_nodes: Dictionary mapping switches to their allocated nodes
+        workload_a_nodes: Required number of nodes for workload A
+        workload_b_nodes: Required number of nodes for workload B
+
+    Returns:
+        tuple: (workload_a, workload_b) lists of nodes
+    """
+    # Special case: If all nodes are under a single leaf switch
+    if len(switch_to_nodes) == 1:
+        return _handle_single_switch_case(switch_to_nodes, workload_a_nodes)
+
+    # Prepare workload lists
+    workload_a: List[str] = []
+    workload_b: List[str] = []
+
+    # Track the balance of nodes between workloads
+    total_a: int = 0
+    total_b: int = 0
+
+    # Calculate total nodes to allocate
+    total_nodes: int = workload_a_nodes + workload_b_nodes
+
+    # Collect all nodes from all switches for allocation
+    all_nodes: List[str] = []
+    for switch in sorted(switch_to_nodes.keys()):
+        nodes = sorted(switch_to_nodes[switch])
+        all_nodes.extend(nodes)
+
+    # Track allocated nodes to identify unallocated ones later
+    allocated_nodes: Set[str] = set()
+
+    # Process switches in a deterministic order
+    for switch in sorted(switch_to_nodes.keys()):
+        nodes = sorted(switch_to_nodes[switch])  # Sort for deterministic behavior
+        num_nodes = len(nodes)
+
+        # Skip if we don't need any more nodes for either workload
+        if total_a >= workload_a_nodes and total_b >= workload_b_nodes:
+            break
+
+        # Calculate how many nodes should go to each workload
+        nodes_left_a = max(0, workload_a_nodes - total_a)
+        nodes_left_b = max(0, workload_b_nodes - total_b)
+
+        # This switch has more nodes than we still need, so divide only what is needed
+        if num_nodes > nodes_left_a + nodes_left_b:
+            # Figure out how to divide this switch's nodes
+            if nodes_left_a == 0:
+                # All to B
+                a_count = 0
+                b_count = min(num_nodes, nodes_left_b)
+            elif nodes_left_b == 0:
+                # All to A
+                a_count = min(num_nodes, nodes_left_a)
+                b_count = 0
+            else:
+                # Divide proportionally based on remaining needs
+                a_ratio = nodes_left_a / (nodes_left_a + nodes_left_b)
+                a_count = round(num_nodes * a_ratio)
+
+                # Adjust to ensure we don't exceed limits
+                a_count = min(a_count, nodes_left_a)
+                b_count = min(num_nodes - a_count, nodes_left_b)
+        else:
+            # This switch alone cannot cover both remaining needs, so allocate all of its nodes proportionally
+            if workload_a_nodes == 0:
+                a_count = 0
+                b_count = num_nodes
+            elif workload_b_nodes == 0:
+                a_count = num_nodes
+                b_count = 0
+            else:
+                a_ratio = workload_a_nodes / (workload_a_nodes + workload_b_nodes)
+                a_count = round(num_nodes * a_ratio)
+                a_count = min(a_count, nodes_left_a)
+                b_count = min(num_nodes - a_count, nodes_left_b)
+
+        # Assign nodes to workloads
+        workload_a.extend(nodes[:a_count])
+        workload_b.extend(nodes[a_count : a_count + b_count])
+
+        # Track which nodes have been allocated
+        allocated_nodes.update(nodes[: a_count + b_count])
+
+        # Update totals
+        total_a += a_count
+        total_b += b_count
+
+    # Find unallocated nodes and assign them to workload B
+    # unallocated_nodes = sorted(set(all_nodes) - allocated_nodes)
+    # if unallocated_nodes:
+    #     # print(f"Info: Assigning {len(unallocated_nodes)} unallocated nodes to workload B", file=sys.stderr)
+    #     workload_b.extend(unallocated_nodes)
+
+    return workload_a, workload_b
+
+
+def compact_split_nodes_between_workloads(
+    switch_to_nodes: Dict[str, List[str]],
+    node_to_switch: Dict[str, str],
+    switch_hierarchy: Dict[str, Dict[str, str]],
+    workload_a_nodes: int,
+    workload_b_nodes: int,
+    victim_nodes: int = 1,
+) -> Tuple[List[str], List[str]]:
+    """Split nodes between workloads using a compact allocation strategy.
+
+    This strategy:
+    1. Finds the leaf switch with the most allocated nodes
+    2. Places the victim node(s) from this switch in workload A
+    3. Places nodes from the largest switch in workload B up to the number of nodes needed
+    4. Fills workload A with nodes from nearby switches (based on topology)
+    5. If needed, uses more nodes from the largest switch to meet workload A requirements
+    6. Places remaining nodes in workload B up to the number of nodes needed
+
+    Args:
+        switch_to_nodes: Dictionary mapping switches to their allocated nodes
+        node_to_switch: Dictionary mapping nodes to their switches
+        switch_hierarchy: Dict containing switch hierarchy information
+        workload_a_nodes: Number of nodes required for workload A
+        workload_b_nodes: Number of nodes required for workload B
+        victim_nodes: Number of victim nodes to place in workload A from the largest switch
+
+    Returns:
+        tuple: (workload_a, workload_b) lists of nodes
+    """
+    # Special case: If all nodes are under a single leaf switch
+    if len(switch_to_nodes) == 1:
+        return _handle_single_switch_case(switch_to_nodes, workload_a_nodes)
+
+    workload_a: List[str] = []
+    workload_b: List[str] = []
+
+    # Collect all nodes for tracking unallocated ones
+    all_nodes: List[str] = []
+    for nodes in switch_to_nodes.values():
+        all_nodes.extend(nodes)
+
+    # Find the leaf switch with the most allocated nodes
+    largest_switch: Optional[str] = None
+    most_nodes: int = 0
+    for switch, nodes in switch_to_nodes.items():
+        if len(nodes) > most_nodes:
+            most_nodes = len(nodes)
+            largest_switch = switch
+
+    # If we have no nodes, return empty workloads
+    if not largest_switch:
+        print("Info: Unable to find the largest switch, returning empty workloads", file=sys.stderr)
+        return workload_a, workload_b
+
+    # if len(switch_to_nodes[largest_switch]) < 27:
+    #     print(f"Info: Got only {len(switch_to_nodes[largest_switch])} nodes from the largest switch, returning empty workloads", file=sys.stderr)
+    #     return workload_a, workload_b
+
+    # Sort nodes for deterministic behavior
+    sorted_nodes = sorted(switch_to_nodes[largest_switch])
+    print(f"Info: Largest switch: {largest_switch}, node count: {len(sorted_nodes)}")
+
+    # Verify that the largest switch has enough nodes for the victim nodes plus half of the workload B nodes
+    if len(sorted_nodes) < victim_nodes + workload_b_nodes // 2:
+        print("Info: Not enough nodes in the largest switch, returning empty workloads", file=sys.stderr)
+        return workload_a, workload_b
+
+    # Place the victim node(s) from the largest switch in workload A
+    nodes_needed_for_a = workload_a_nodes - len(workload_a)
+    required_victim_nodes = min(victim_nodes, nodes_needed_for_a, len(sorted_nodes))
+    if required_victim_nodes > 0:
+        workload_a.extend(sorted_nodes[:required_victim_nodes])
+        largest_switch_remaining_nodes = sorted_nodes[required_victim_nodes:]
+    else:
+        largest_switch_remaining_nodes = sorted_nodes
+
+    # largest_switch_remaining_nodes keeps the leftover nodes from the largest switch for potential later use
+
+    # Take nodes from the largest switch for workload B up to the number of nodes needed
+    nodes_needed_for_b_from_largest_switch = min(workload_b_nodes // 2, len(largest_switch_remaining_nodes))
+    print(f"Info: Taking {nodes_needed_for_b_from_largest_switch} nodes from the largest switch for workload B")
+    workload_b.extend(largest_switch_remaining_nodes[:nodes_needed_for_b_from_largest_switch])
+    largest_switch_remaining_nodes = largest_switch_remaining_nodes[nodes_needed_for_b_from_largest_switch:]
+
+    # Sort remaining switches by "distance" from the largest switch
+    remaining_switches = [
+        (switch, calculate_switch_distance(largest_switch, switch, switch_hierarchy))
+        for switch in switch_to_nodes.keys()
+        if switch != largest_switch
+    ]
+    remaining_switches.sort(key=lambda x: x[1])  # Sort by distance
+    print(f"Info: Remaining switches: {remaining_switches}")
+
+    # Fill the rest of workload B, then workload A, from the remaining switches (closest first)
+    nodes_needed_for_a = workload_a_nodes - len(workload_a)
+    nodes_needed_for_b = workload_b_nodes - len(workload_b)
+
+    for switch, _ in remaining_switches:
+        # Sort nodes in this switch for deterministic behavior
+        switch_nodes = sorted(switch_to_nodes[switch])
+        print(f"Info: Switch: {switch}, node count: {len(switch_nodes)}")
+
+        if nodes_needed_for_b > 0:
+            if len(switch_nodes) <= nodes_needed_for_b:
+                workload_b.extend(switch_nodes)
+                nodes_needed_for_b -= len(switch_nodes)
+                continue
+            else:
+                # Take only as many as we need
+                workload_b.extend(switch_nodes[:nodes_needed_for_b])
+                switch_nodes = switch_nodes[nodes_needed_for_b:]
+                nodes_needed_for_b = 0
+
+        # Then fill workload A from whatever is left of this switch
+        if nodes_needed_for_a > 0:
+            if len(switch_nodes) <= nodes_needed_for_a:
+                workload_a.extend(switch_nodes)
+                nodes_needed_for_a -= len(switch_nodes)
+                continue
+            else:
+                # Take only as many as we need
+                workload_a.extend(switch_nodes[:nodes_needed_for_a])
+                switch_nodes = switch_nodes[nodes_needed_for_a:]
+                nodes_needed_for_a = 0
+
+        # If we have enough nodes, stop
+        if nodes_needed_for_a <= 0 and nodes_needed_for_b <= 0:
+            break
+
+    # If we still need more nodes for workload A, use nodes from the largest switch
+    if nodes_needed_for_a > 0:
+        # Take what we need from the largest switch's remaining nodes
+        workload_a.extend(largest_switch_remaining_nodes[:nodes_needed_for_a])
+
+    if nodes_needed_for_b > 0:
+        # Find unallocated nodes (nodes not yet assigned to either workload)
+        allocated_nodes = set(workload_a + workload_b)
+        unallocated_nodes = sorted(set(all_nodes) - allocated_nodes)
+        # print(f"Info: Assigning {len(unallocated_nodes)} unallocated nodes to workload B", file=sys.stderr)
+        workload_b.extend(unallocated_nodes[:nodes_needed_for_b])
+
+    return workload_a, workload_b
diff --git a/nemo/lightning/run/scripts/split_nodes/split_nodes_by_leaf.py b/nemo/lightning/run/scripts/split_nodes/split_nodes_by_leaf.py
new file mode 100644
index 000000000000..0d237155a07e
--- /dev/null
+++ b/nemo/lightning/run/scripts/split_nodes/split_nodes_by_leaf.py
@@ -0,0 +1,198 @@
+#!/usr/bin/env python3
+"""Split nodes by leaf switch for workload distribution.
+
+This script takes a list of allocated nodes and a cluster topology file,
+and distributes the nodes between two workloads based on their position
+in the network topology.
+
+Example usage:
+    python split_nodes_by_leaf.py --allocated-nodes-file allocated_nodes.txt --topology-file topology.txt --workload-a-nodes 3 --workload-b-nodes 5 --output-file split-nodes.txt
+"""
+
+import os
+import sys
+from contextlib import redirect_stdout
+from io import StringIO
+from pathlib import Path
+from typing import Dict, List, Optional
+
+import click
+from node_allocation.parsers import group_nodes_by_switch, parse_allocated_nodes, parse_topology_file
+from node_allocation.strategies import compact_split_nodes_between_workloads, evenly_split_nodes_between_workloads
+
+
+def process_files(
+    allocated_nodes_file: str,
+    topology_file: str,
+    output_file: str,
+    strategy: str = 'compact',
+    workload_a_nodes: Optional[int] = None,
+    workload_b_nodes: Optional[int] = None,
+    victim_nodes: int = 1,
+) -> None:
+    """Core processing logic that orchestrates node allocation.
+
+    This function reads input files, validates arguments, and applies
+    the appropriate allocation strategy.
+
+    Args:
+        allocated_nodes_file: Path to file containing list of allocated nodes
+        topology_file: Path to file containing cluster topology information
+        output_file: Path to the file where the two resulting nodelists are written
+        strategy: 'even' for balanced allocation, 'compact' for compact allocation
+        workload_a_nodes: Required number of nodes for workload A
+        workload_b_nodes: Required number of nodes for workload B
+        victim_nodes: Number of victim nodes required in workload A for the compact strategy
+
+    Returns:
+        None: The resulting nodelists are written to output_file
+    """
+    # Parse input files
+    allocated_nodes = parse_allocated_nodes(allocated_nodes_file)
+    node_to_switch, switch_hierarchy = parse_topology_file(topology_file)
+
+    # Group allocated nodes by switch
+    switch_to_nodes = group_nodes_by_switch(allocated_nodes, node_to_switch)
+    print(switch_to_nodes)
+
+    total_nodes = len(allocated_nodes)
+
+    # Validate node count requirements
+    if workload_a_nodes is None or workload_b_nodes is None:
+        print("Error: Both workload_a_nodes and workload_b_nodes must be specified", file=sys.stderr)
+        sys.exit(1)
+
+    # Validate the specified node counts
+    if workload_a_nodes + workload_b_nodes > total_nodes:
+        print(
+            f"Error: Requested nodes ({workload_a_nodes + workload_b_nodes}) exceed available nodes ({total_nodes})",
+            file=sys.stderr,
+        )
+        sys.exit(1)
+
+    # Apply the selected allocation strategy
+    if strategy == 'even':
+        # Use the even split method
+        workload_a, workload_b = evenly_split_nodes_between_workloads(
+            switch_to_nodes, workload_a_nodes, workload_b_nodes
+        )
+    else:
+        # Use the compact allocation strategy
+        workload_a, workload_b = compact_split_nodes_between_workloads(
+            switch_to_nodes, node_to_switch, switch_hierarchy, workload_a_nodes, workload_b_nodes, victim_nodes
+        )
+
+    if len(workload_a) != workload_a_nodes or len(workload_b) != workload_b_nodes:
+        print(
+            f"Error: Requested {workload_a_nodes} nodes for workload A and {workload_b_nodes} nodes for workload B, but got {len(workload_a)} and {len(workload_b)} nodes respectively",
+            file=sys.stderr,
+        )
+        sys.exit(1)
+
+    from collections import Counter
+
+    # Get unique switches for each workload
+    workload_a_switches = Counter(node_to_switch[node] for node in workload_a)
+    workload_b_switches = Counter(node_to_switch[node] for node in workload_b)
+
+    print("Switch Distribution:")
+    print("Workload A switches:")
+    for switch, count in workload_a_switches.items():
+        print(f"{switch}: {count}")
+    print("Workload B switches:")
+    for switch, count in workload_b_switches.items():
print(f"{switch}: {count}") + + # Output comma-separated nodelists + with open(output_file, 'w') as f: + print(",".join(workload_a), file=f) + print(",".join(workload_b), file=f) + + +@click.command() +@click.option('--allocated-nodes-file', type=click.Path(exists=True), required=True) +@click.option('--topology-file', type=click.Path(exists=True), required=True) +@click.option( + '--strategy', + type=click.Choice(['even', 'compact']), + default='compact', + help='Node allocation strategy: "even" balances nodes across switches, ' + '"compact" places most of workload A nodes on same/nearby switches', + required=True, +) +@click.option('--workload-a-nodes', type=int, required=True, help='Number of nodes required for workload A') +@click.option('--workload-b-nodes', type=int, required=True, help='Number of nodes required for workload B') +@click.option( + '--victim-nodes', + type=int, + required=True, + default=1, + help='Number of victim nodes required in workload A in compact strategy', +) +@click.option('--output-file', type=click.Path(exists=False), required=False, help='File to write the output to') +def main( + allocated_nodes_file: str, + topology_file: str, + strategy: str, + workload_a_nodes: int, + workload_b_nodes: int, + output_file: str, + victim_nodes: int, +) -> None: + """Split nodes by leaf switch for workload distribution. + + ALLOCATED_NODES_FILE: File containing list of allocated nodes + TOPOLOGY_FILE: File containing cluster topology information + """ + process_files( + allocated_nodes_file, topology_file, output_file, strategy, workload_a_nodes, workload_b_nodes, victim_nodes + ) + + +def test_main() -> None: + """Test the main function with sample inputs.""" + # Print debug info first + print(f"Current working directory: {os.getcwd()}") + nodes_path = Path('allocated_nodes.txt') + topo_path = Path('topology.txt') + print(f"Nodes file exists: {nodes_path.exists()}") + print(f"Topology file exists: {topo_path.exists()}") + + a_nodes = 16 + b_nodes = 60 + victim_nodes = 1 + # Test even strategy with specific node counts + print(f"\n=== Testing EVEN strategy with {a_nodes} nodes for A, {b_nodes} for B ===") + split_nodes_path = Path('split-nodes-even.txt') + process_files(str(nodes_path), str(topo_path), str(split_nodes_path), 'even', a_nodes, b_nodes) + + with open(split_nodes_path, 'r') as f: + split_nodes = f.read().split('\n') + print(f"Split nodes: {split_nodes}") + + print(f"Workload A ({a_nodes} nodes):", split_nodes[0]) + print(f"Workload B ({b_nodes} nodes):", split_nodes[1]) + + node_to_switch, _ = parse_topology_file(str(topo_path)) + + # Print switch distribution for each workload + workload_a_nodes = split_nodes[0].split(',') if split_nodes[0] else [] + workload_b_nodes = split_nodes[1].split(',') if split_nodes[1] else [] + print(f"Workload A nodes: '{workload_a_nodes}'") + print(f"Workload B nodes: '{workload_b_nodes}'") + + # Test compact strategy with specific node counts + print(f"\n=== Testing COMPACT strategy with {a_nodes} nodes for A, {b_nodes} for B ===") + split_nodes_path = Path('split-nodes-compact.txt') + process_files(str(nodes_path), str(topo_path), str(split_nodes_path), 'compact', a_nodes, b_nodes, victim_nodes) + + with open(split_nodes_path, 'r') as f: + split_nodes = f.read().split('\n') + + print(f"Workload A ({a_nodes} nodes):", split_nodes[0]) + print(f"Workload B ({b_nodes} nodes):", split_nodes[1]) + + +if __name__ == "__main__": + if len(sys.argv) > 1 and sys.argv[1] == "--test": + test_main() + else: + main() diff --git 
diff --git a/scripts/performance/argument_parser.py b/scripts/performance/argument_parser.py
index 701e6cd35b0f..75e8261beb87 100644
--- a/scripts/performance/argument_parser.py
+++ b/scripts/performance/argument_parser.py
@@ -534,6 +534,45 @@ def list_of_strings(arg):
         default=0,
     )
 
+    parser.add_argument(
+        "--enable_isolation",
+        help="Activate the isolation test with noisy neighbors. Disabled by default",
+        action="store_true",
+        required=False,
+    )
+
+    parser.add_argument(
+        "--isolation_container_image",
+        type=str,
+        help="Container image to use for the isolation test",
+        required=False,
+        default="nvcr.io/nvidia/pytorch:25.02-py3",
+    )
+
+    parser.add_argument(
+        "--isolation_noise_pairs",
+        type=int,
+        help="Number of noisy-neighbor pairs to run the isolation test with",
+        required=False,
+        default=1,
+    )
+
+    parser.add_argument(
+        "--isolation_min_victims_nodes",
+        type=int,
+        help="Minimum number of victim nodes for the isolation test",
+        required=False,
+        default=1,
+    )
+
+    parser.add_argument(
+        "--isolation_extra_nodes_to_allocate",
+        type=int,
+        help="Extra nodes to allocate for the job to increase the chance of having enough nodes under a single leaf switch",
+        required=False,
+        default=0,
+    )
+
     parser.add_argument(
         "-vb",
         "--enable_vboost",
diff --git a/scripts/performance/llm/pretrain_nemotronh_56b.py b/scripts/performance/llm/pretrain_nemotronh_56b.py
index df54f9827487..1a7bdfc7a6bd 100644
--- a/scripts/performance/llm/pretrain_nemotronh_56b.py
+++ b/scripts/performance/llm/pretrain_nemotronh_56b.py
@@ -19,7 +19,7 @@
 from nemo.collections.llm.recipes.nemotronh_56b import pretrain_recipe
 from nemo.collections.nlp.modules.common.tokenizer_utils import get_nmt_tokenizer
-from nemo.lightning.run.plugins import NsysPlugin
+from nemo.lightning.run.plugins import IsolationTestPlugin, NsysPlugin
 
 from ..argument_parser import parse_additional_slurm_params, parse_cli_args
 from ..executors import slurm_executor
@@ -156,6 +156,17 @@ def override_recipe_configs(
     if args.gpu.lower() == 'gb200':
         custom_env_vars |= {"NCCL_NET_GDR_LEVEL": "PHB"}
 
+    if args.enable_isolation:
+        plugins.append(
+            IsolationTestPlugin(
+                isolation_container_image=args.isolation_container_image,
+                num_of_noise_pairs=args.isolation_noise_pairs,
+                min_victims_nodes=args.isolation_min_victims_nodes,
+                extra_nodes_to_allocate=args.isolation_extra_nodes_to_allocate,
+            )
+        )
+        exp_name = exp_name + "_isolation"
+
     if args.enable_nsys:
         plugins.append(
             NsysPlugin(