120 changes: 120 additions & 0 deletions nemo/lightning/run/plugins.py
@@ -145,6 +145,126 @@ def setup(self, task: run.Partial | run.Script, executor: run.Executor):
_merge_callbacks(task, callbacks=callbacks)


@dataclass(kw_only=True)
class IsolationTestPlugin(run.Plugin):
"""
    A plugin that launches a noise job alongside the workload job, to measure the impact
    of noisy neighbors on the workload's performance.

    Args:
        num_of_noise_pairs: Number of noise pairs to run.
        min_victims_nodes: Minimum number of victim nodes for the isolation test.
        extra_nodes_to_allocate: Extra nodes to allocate for the job, to increase the chance
            of getting enough nodes under a single leaf switch.
        isolation_container_image: Container image to use for the isolation test.
"""

num_of_noise_pairs: int = 1
min_victims_nodes: int = 1
extra_nodes_to_allocate: int = 0
isolation_container_image: str = "nvcr.io/nvidia/pytorch:25.02-py3"

def update_number_of_nodes_for_workload_and_noise(self, executor: run.Executor):
"Update the number of nodes for the workload and noise"
num_of_workload_nodes = executor.nodes
num_of_noise_nodes = 2 * self.num_of_noise_pairs
total_num_of_nodes = num_of_workload_nodes + num_of_noise_nodes + self.extra_nodes_to_allocate
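        # e.g. 8 workload nodes + 2 noise pairs (4 noise nodes) + 1 extra node = 13 nodes requested from sbatch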

# set total number of nodes for sbatch
executor.nodes = total_num_of_nodes

# initialize env vars for workload and noise

executor.env_vars["NUM_OF_WORKLOAD_NODES"] = str(num_of_workload_nodes)
executor.env_vars["NUM_OF_NOISE_NODES"] = str(num_of_noise_nodes)
executor.env_vars["MIN_VICTIMS_NODES"] = str(self.min_victims_nodes)
executor.env_vars["NOISE_BUILD_TIME"] = str(120)

def generate_noise_initialization_code(self, job_dir):
"Generate the noise preparation and execution code"

current_file_path = os.path.abspath(__file__)
current_directory = os.path.dirname(current_file_path)
split_nodes_dir = os.path.join(current_directory, "scripts", "split_nodes")

noise_cmd = f'''

# Step 1: Save temporary input files
scontrol show hostnames $SLURM_NODELIST > {job_dir}/allocated_nodes.txt
scontrol show topo > {job_dir}/topology.txt

# Step 2: Call external Python script
python {split_nodes_dir}/split_nodes_by_leaf.py --allocated-nodes-file {job_dir}/allocated_nodes.txt --topology-file {job_dir}/topology.txt --strategy compact --workload-a-nodes ${{NUM_OF_WORKLOAD_NODES}} --workload-b-nodes ${{NUM_OF_NOISE_NODES}} --victim-nodes ${{MIN_VICTIMS_NODES}} --output-file {job_dir}/split-nodes.txt
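# split_nodes_by_leaf.py writes two comma-separated lists: workload nodes on line 1, noise nodes on line 2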

readarray -t SPLIT_NODES_OUTPUT < <(cat {job_dir}/split-nodes.txt)
WORKLOAD_NODES=${{SPLIT_NODES_OUTPUT[0]}}
NOISE_NODES=${{SPLIT_NODES_OUTPUT[1]}}

echo "WORKLOAD_NODES: ${{WORKLOAD_NODES}}"
echo "NOISE_NODES: ${{NOISE_NODES}}"

IFS=',' read -ra NUM_ALLOCATED_WORKLOAD_NODES <<< "$WORKLOAD_NODES"
IFS=',' read -ra NUM_ALLOCATED_NOISE_NODES <<< "$NOISE_NODES"

# Count how many nodes were actually allocated to each group, so we can verify the split below
NUM_ALLOCATED_WORKLOAD_NODES=${{#NUM_ALLOCATED_WORKLOAD_NODES[@]}}
NUM_ALLOCATED_NOISE_NODES=${{#NUM_ALLOCATED_NOISE_NODES[@]}}

# Verify that the number of workload nodes matches the expected value
if [ "${{NUM_ALLOCATED_WORKLOAD_NODES}}" -ne ${{NUM_OF_WORKLOAD_NODES}} ]; then
    echo "Error: Expected ${{NUM_OF_WORKLOAD_NODES}} workload (A) nodes, but got ${{NUM_ALLOCATED_WORKLOAD_NODES}}"
exit 1
fi

# Verify that the number of noisy neighbor nodes matches expected value
if [ "${{NUM_ALLOCATED_NOISE_NODES}}" -ne ${{NUM_OF_NOISE_NODES}} ]; then
echo "Error: Expected ${{NUM_OF_NOISE_NODES}} for workload B, but got ${{NUM_ALLOCATED_NOISE_NODES}}"
exit 1
fi


# Step 3: Run the noise job
mkdir -p {job_dir}/noise
echo "Launching noise in the background"
NUMBER_OF_PAIRS=$(($NUM_OF_NOISE_NODES * 4)) # 8 GPUs per node, split into pairs (4 pairs per node)
Collaborator: 8 GPUs per node is only true for H100 nodes; B200, GB200, and GB300 nodes usually have 4 GPUs per node.

Author: We need to understand how to define a similar test for other systems, since they will need to be different. I added a check for this on the llmb side.

srun --nodes=${{NUM_OF_NOISE_NODES}} --nodelist=${{NOISE_NODES}} --label --ntasks-per-node=8 --output={job_dir}/noise/out.log --error={job_dir}/noise/err.log --export=ALL --mpi=pmix --container-image={self.isolation_container_image} bash -c "NCCL_TESTS_SPLIT=%${{NUMBER_OF_PAIRS}} NCCL_IB_QPS_PER_CONNECTION=4 sendrecv_perf_mpi --nthreads 1 --ngpus 1 --minbytes 4G --maxbytes 4G --stepbytes 1M --op sum --datatype float --root 0 --iters 25 --warmup_iters 1 --agg_iters 1 --average 1 --parallel_init 0 --check 0 --blocking 0 --cudagraph 0 --stepfactor 2 --run_cycles 0 -R 1" &

# wait for the noise to build up
echo "Waiting ${{NOISE_BUILD_TIME}} seconds for the noise to build up"
sleep ${{NOISE_BUILD_TIME}}

# Step 4: run the workload with the noise
echo "Launching workload with noise"

'''
return noise_cmd

def setup(self, task: run.Partial | run.Script, executor: run.Executor):
"""Enable the performance environment settings"""

if isinstance(executor, run.SlurmExecutor):
if not executor.additional_parameters:
executor.additional_parameters = {}
if not executor.srun_args:
executor.srun_args = []
if not executor.env_vars:
executor.env_vars = {}

            # compute the total node count and export workload/noise env vars
self.update_number_of_nodes_for_workload_and_noise(executor)

# add number of nodes and nodelist to srun args
executor.srun_args.insert(0, "--nodelist=${WORKLOAD_NODES}")
executor.srun_args.insert(0, "--nodes=${NUM_OF_WORKLOAD_NODES}")

# add noise command
noise_cmd = self.generate_noise_initialization_code(executor.tunnel.job_dir)
            executor.setup_lines = (
                executor.setup_lines + noise_cmd if executor.setup_lines else noise_cmd
            )
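
For illustration, a minimal sketch of how this plugin might be attached to a run. The recipe, account, node counts, and job directory below are placeholders, and passing a plugins list to run.run follows the usual NeMo-Run pattern:

import nemo_run as run
from nemo.collections import llm
from nemo.lightning.run.plugins import IsolationTestPlugin

# Hypothetical 8-node pretraining recipe and cluster settings, for illustration only.
recipe = llm.llama3_8b.pretrain_recipe(num_nodes=8, name="isolation_test_run")
executor = run.SlurmExecutor(
    account="my_account",
    nodes=8,
    ntasks_per_node=8,
    tunnel=run.LocalTunnel(job_dir="/path/to/jobs"),
)

# One noise pair (2 noise nodes) runs next to the 8 workload nodes; two spare
# nodes improve the odds of a usable split under a single leaf switch.
plugin = IsolationTestPlugin(num_of_noise_pairs=1, min_victims_nodes=1, extra_nodes_to_allocate=2)
run.run(recipe, executor=executor, plugins=[plugin])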


@dataclass(kw_only=True)
class NsysPlugin(run.Plugin):
"""
7 changes: 7 additions & 0 deletions nemo/lightning/run/scripts/split_nodes/node_allocation/__init__.py
@@ -0,0 +1,7 @@
"""Node allocation package for distributing compute nodes between workloads.

This package provides utilities and strategies for splitting allocated nodes
between workloads based on their topology information.
"""

__version__ = "1.0.0"
199 changes: 199 additions & 0 deletions nemo/lightning/run/scripts/split_nodes/node_allocation/parsers.py
@@ -0,0 +1,199 @@
"""Parsers and utility functions for node allocation.

This module contains functions for parsing topology files, node lists,
and other utility functions needed for node allocation.
"""

import re
from typing import Dict, List, Tuple


def expand_nodes(raw_string: str) -> List[str]:
"""Expand a string containing node specifications into a list of node names.

Args:
        raw_string: A topology line containing a "Nodes=" specification, e.g.
            "SwitchName=leaf1 Level=0 Nodes=pool1-1195,pool1-[2110-2111]" or
            "... Nodes=hgx-isr1-[001-008]"

Returns:
List of expanded node names
"""
# Extract the nodes part after "Nodes="
nodes_part = raw_string.split("Nodes=")[1].split()[0]

expanded = []
# Split on commas to handle each node specification
for spec in nodes_part.split(','):
# Handle range format: prefix-[start-end]
if '[' in spec and ']' in spec:
# Extract prefix (e.g., "pool1-" or "hgx-isr1-")
prefix = spec.split('[')[0]
range_part = spec.split('[')[1].split(']')[0]

if '-' in range_part:
start_str, end_str = range_part.split('-')
start = int(start_str)
end = int(end_str)
# Determine padding based on start number's string representation
padding = len(start_str)
for num in range(start, end + 1):
expanded.append(f"{prefix}{num:0{padding}d}")
else:
# Single number in brackets
num = int(range_part)
# Determine padding based on the number's length
padding = len(range_part)
expanded.append(f"{prefix}{num:0{padding}d}")
else:
# Handle individual node format without ranges
expanded.append(spec)

return expanded
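
For illustration, a quick sanity check of the expansion (node names are made up); note that zero-padding follows the range's start string:

line = "SwitchName=leaf1 Level=0 Nodes=pool1-1195,pool1-[2110-2111]"
print(expand_nodes(line))
# ['pool1-1195', 'pool1-2110', 'pool1-2111']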


def parse_topology_file(topology_file: str) -> Tuple[Dict[str, str], Dict[str, Dict]]:
"""Parse a topology file and return node-to-switch mapping and switch relationships.

Args:
topology_file: Path to the topology file

Returns:
Tuple containing:
- node_to_switch: Dictionary mapping node names to their switch
- switch_hierarchy: Dict containing switch parent-child relationships
"""
with open(topology_file) as f:
topo_output = f.read().strip().splitlines()

# Parse topology to map nodes to switches
node_to_switch: Dict[str, str] = {}
    switch_hierarchy: Dict[str, Dict] = {
'parents': {}, # Maps switches to their parents
'children': {}, # Maps switches to their children
}

current_switch = None

for line in topo_output:
# Look for switch definitions - match any switch name after SwitchName=
m = re.search(r'SwitchName=([^\s]+) Level=(\d+)', line)
if m:
current_switch = m.group(1)
switch_level = int(m.group(2))

# For leaf switches (Level 0), find their parents
if switch_level == 0:
parent_match = re.search(r'Switches=(.*?)$', line)
if parent_match:
parents = parent_match.group(1).strip()
switch_hierarchy['parents'][current_switch] = parents

# Add this switch as a child of its parents
for parent in parents.split(','):
if parent not in switch_hierarchy['children']:
switch_hierarchy['children'][parent] = []
switch_hierarchy['children'][parent].append(current_switch)

# Look for node definitions and map them to the current switch
if "Nodes=" in line and current_switch:
expanded = expand_nodes(line)
for node in expanded:
node_to_switch[node] = current_switch

return node_to_switch, switch_hierarchy
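
For illustration, a minimal topology file this parser can digest (switch and node names are made up). Because the Switches= capture runs to the end of the line, the parent entry on Level-0 lines must come after Nodes=:

topo = (
    "SwitchName=leaf1 Level=0 Nodes=gpu-[001-002] Switches=spine1\n"
    "SwitchName=leaf2 Level=0 Nodes=gpu-[003-004] Switches=spine1\n"
)
with open("/tmp/topology.txt", "w") as f:
    f.write(topo)

node_to_switch, hierarchy = parse_topology_file("/tmp/topology.txt")
# node_to_switch == {'gpu-001': 'leaf1', 'gpu-002': 'leaf1',
#                    'gpu-003': 'leaf2', 'gpu-004': 'leaf2'}
# hierarchy['parents'] == {'leaf1': 'spine1', 'leaf2': 'spine1'}
# hierarchy['children'] == {'spine1': ['leaf1', 'leaf2']}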


def parse_allocated_nodes(allocated_nodes_file: str) -> List[str]:
"""Parse a file containing allocated nodes.

Args:
allocated_nodes_file: Path to the file containing the list of allocated nodes

Returns:
List of node names
"""
with open(allocated_nodes_file) as f:
allocated_nodes = f.read().strip().split()
return allocated_nodes


def parse_node_input(node_input: str, is_file: bool = False) -> List[str]:
"""Parse node input, either from a file or directly from a string.

Args:
node_input: Either a file path or a direct node list string
is_file: Whether the input is a file path

Returns:
List of node names
"""
if is_file:
with open(node_input) as f:
nodes = f.read().strip().split()
return nodes
else:
        # Direct input string; it may be a compressed Slurm nodelist, so don't split it
return [node_input]


def group_nodes_by_switch(allocated_nodes: List[str], node_to_switch: Dict[str, str]) -> Dict[str, List[str]]:
"""Group allocated nodes by their switch.

Args:
allocated_nodes: List of allocated node names
node_to_switch: Dictionary mapping nodes to switches

Returns:
Dictionary mapping switches to their list of allocated nodes
"""
switch_to_nodes: Dict[str, List[str]] = {}
missing_nodes: List[str] = []

for node in allocated_nodes:
switch = node_to_switch.get(node)
if switch:
switch_to_nodes.setdefault(switch, []).append(node)
else:
missing_nodes.append(node)

if missing_nodes:
print(f"Warning: {len(missing_nodes)} node(s) not found in topology!")

return switch_to_nodes
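
Continuing the toy topology above, grouping simply inverts the node-to-switch map:

groups = group_nodes_by_switch(["gpu-001", "gpu-003", "gpu-004"], node_to_switch)
# {'leaf1': ['gpu-001'], 'leaf2': ['gpu-003', 'gpu-004']}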


def calculate_switch_distance(switch1: str, switch2: str, switch_hierarchy: Dict[str, Dict]) -> int:
"""Calculate the 'distance' between two switches based on topology.

Distance is defined as:
- 0 if switches are the same
- 1 if they share a direct parent
- 2 if they only share the core switch

Args:
switch1: First switch name
switch2: Second switch name
switch_hierarchy: Dict containing switch parent-child relationships

Returns:
Distance between the switches (0, 1, or 2)
"""
if switch1 == switch2:
return 0

# Get parents
parents = switch_hierarchy.get('parents', {})

# If we don't have parent info, assume maximum distance
if switch1 not in parents or switch2 not in parents:
return 2

parent1 = parents.get(switch1)
parent2 = parents.get(switch2)

# If they share a direct parent, they're close
if parent1 and parent2 and parent1 == parent2:
return 1

# Otherwise, they meet at the core
return 2
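
For illustration, extending the toy hierarchy above with a third leaf under a different spine:

hierarchy["parents"]["leaf3"] = "spine2"
print(calculate_switch_distance("leaf1", "leaf1", hierarchy))  # 0 -- same switch
print(calculate_switch_distance("leaf1", "leaf2", hierarchy))  # 1 -- shared parent spine1
print(calculate_switch_distance("leaf1", "leaf3", hierarchy))  # 2 -- only meet at the core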