@@ -333,19 +333,37 @@ def build_dict(**kwargs):
return {k: v for k, v in kwargs.items() if v is not None}

# Build resources
if self.instance_type is None:
requests_value = limits_value = {"nvidia.com/gpu": "0"}
else:
requests_value = build_dict(
accelerators=str(self.accelerators) if self.accelerators else None,
vcpu=str(self.vcpu) if self.vcpu else None,
memory=str(self.memory) if self.memory else None
)
limits_value = build_dict(
accelerators=str(self.accelerators_limit) if self.accelerators_limit else None,
vcpu=str(self.vcpu_limit) if self.vcpu_limit else None,
memory=str(self.memory_limit) if self.memory_limit else None
)
requests_value = {}
limits_value = {}

# Add GPU resources (respect accelerators regardless of instance_type)
if self.accelerators:
requests_value["nvidia.com/gpu"] = str(self.accelerators)
if self.accelerators_limit:
limits_value["nvidia.com/gpu"] = str(self.accelerators_limit)

# Add CPU resources
if self.vcpu:
requests_value["cpu"] = str(self.vcpu)
if self.vcpu_limit:
limits_value["cpu"] = str(self.vcpu_limit)

# Add memory resources
if self.memory:
requests_value["memory"] = f"{self.memory}Gi"
if self.memory_limit:
limits_value["memory"] = f"{self.memory_limit}Gi"

# Add EFA for multi-node jobs
if self.node_count and self.node_count > 1:
requests_value["vpc.amazonaws.com/efa"] = "1"
limits_value["vpc.amazonaws.com/efa"] = "1"

# Set default GPU to "0" only if no resources specified at all
if not requests_value:
requests_value = {"nvidia.com/gpu": "0"}
if not limits_value:
limits_value = {"nvidia.com/gpu": "0"}

# Build container
container_kwargs = build_dict(
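For context on the two blocks above: the removed branch (the instance_type check) forced both requests and limits to {"nvidia.com/gpu": "0"} whenever instance_type was unset, which ignored an explicit accelerators value, while the added block builds each dict field by field and only falls back to the GPU "0" default when nothing was requested at all. A minimal standalone sketch of the new logic, with the config fields passed as plain arguments instead of being read from self (the helper name and signature below are illustrative, not part of the PR):

# Standalone restatement of the resource-building logic in the diff above (illustrative).
def build_resources(accelerators=None, accelerators_limit=None,
                    vcpu=None, vcpu_limit=None,
                    memory=None, memory_limit=None,
                    node_count=None):
    requests, limits = {}, {}
    if accelerators:
        requests["nvidia.com/gpu"] = str(accelerators)
    if accelerators_limit:
        limits["nvidia.com/gpu"] = str(accelerators_limit)
    if vcpu:
        requests["cpu"] = str(vcpu)
    if vcpu_limit:
        limits["cpu"] = str(vcpu_limit)
    if memory:
        requests["memory"] = f"{memory}Gi"
    if memory_limit:
        limits["memory"] = f"{memory_limit}Gi"
    if node_count and node_count > 1:
        # Multi-node jobs get one EFA device in both requests and limits.
        requests["vpc.amazonaws.com/efa"] = "1"
        limits["vpc.amazonaws.com/efa"] = "1"
    # Default GPU to "0" only when nothing else was specified at all.
    return requests or {"nvidia.com/gpu": "0"}, limits or {"nvidia.com/gpu": "0"}

# A 2-node job requesting 8 GPUs and 64 Gi with no explicit limits:
# requests -> {'nvidia.com/gpu': '8', 'memory': '64Gi', 'vpc.amazonaws.com/efa': '1'}
# limits   -> {'vpc.amazonaws.com/efa': '1'}
print(build_resources(accelerators=8, memory=64, node_count=2))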
@@ -84,7 +84,7 @@
{%- endfor %}
{%- endif %}
resources:
{%- if accelerators or vcpu or memory %}
{%- if accelerators or vcpu or memory or (node_count and node_count > 1) %}
requests:
{%- if accelerators %}
nvidia.com/gpu: {{ accelerators }}
@@ -95,11 +95,14 @@
{%- if memory %}
memory: {{ memory }}Gi
{%- endif %}
{%- if (node_count and node_count > 1) %}
vpc.amazonaws.com/efa: 1
{%- endif %}
{%- else %}
requests:
nvidia.com/gpu: "0"
{%- endif %}
{%- if accelerators_limit or vcpu_limit or memory_limit %}
{%- if accelerators_limit or vcpu_limit or memory_limit or (node_count and node_count > 1) %}
limits:
{%- if accelerators_limit %}
nvidia.com/gpu: {{ accelerators_limit }}
@@ -110,6 +113,9 @@
{%- if memory_limit %}
memory: {{ memory_limit }}Gi
{%- endif %}
{%- if (node_count and node_count > 1) %}
vpc.amazonaws.com/efa: 1
{%- endif %}
{%- else %}
limits:
nvidia.com/gpu: "0"
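The Jinja template above gets the same EFA condition: the requests and limits blocks now also render when node_count > 1, so a multi-node job picks up vpc.amazonaws.com/efa: 1 even when no explicit limits are set. As a rough illustration, assuming a job with accelerators=8, memory=64, node_count=2 and no *_limit values, the rendered resources section would load to roughly the following (shown as Python dicts; the values are assumed, not taken from a real render):

# Expected shape of the rendered "resources" section for the assumed 2-node job.
expected_resources = {
    "requests": {
        "nvidia.com/gpu": 8,
        "memory": "64Gi",
        "vpc.amazonaws.com/efa": 1,
    },
    # The limits block renders solely because node_count > 1; without the new
    # condition it would have fallen back to {"nvidia.com/gpu": "0"}.
    "limits": {
        "vpc.amazonaws.com/efa": 1,
    },
}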
19 changes: 18 additions & 1 deletion src/sagemaker/hyperpod/cli/commands/cluster_stack.py
@@ -30,6 +30,19 @@
logger = logging.getLogger(__name__)


def get_newest_template_version() -> int:
"""Get the newest available template version.

Returns:
int: The newest template version number

TODO: Implement logic to fetch the actual newest template version
from the template registry or remote source.
"""
# Placeholder implementation - currently returns 1 as the latest version
return 1
Review comment (Collaborator):
Is 1 the latest version? I think you're right, but just confirming; I don't remember if that's how Pintao set up cluster versioning.

Reply (Collaborator, PR author):
This task needs follow-up when Pintao is back. Currently only 1 is supported as the latest version.

Reply (Collaborator, PR author, @mollyheamazon, Nov 20, 2025):
#285 is the previous PR for template versioning.
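A possible shape for that follow-up once more than one template version exists; this is purely a sketch, and the registry dict, its contents, and the derivation below are assumptions rather than part of this PR:

# Hypothetical follow-up: derive the newest version from a registry mapping instead of
# hard-coding it. The registry name, structure, and contents here are assumptions.
_TEMPLATE_VERSIONS = {
    1: "cluster-stack template v1",  # currently the only supported version
}

def get_newest_template_version() -> int:
    """Return the highest registered template version."""
    return max(_TEMPLATE_VERSIONS)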



def parse_status_list(ctx, param, value):
"""Parse status list from string format like "['CREATE_COMPLETE', 'UPDATE_COMPLETE']" """
if not value:
@@ -79,7 +92,6 @@ def create_cluster_stack(config_file, region, template_version, debug):
return

# Load config to get template and version

config_dir = Path(config_file).parent
data, template, version = load_config(config_dir)

@@ -95,6 +107,11 @@
model_instance = model_class(**filtered_config)
config = model_instance.to_config(region=region)

# Use newest template version if not provided
if template_version is None:
template_version = get_newest_template_version()
logger.info(f"No template version specified, using newest version: {template_version}")

# Create the cluster stack
stack_id = HpClusterStack(**config).create(region, template_version)

5 changes: 5 additions & 0 deletions src/sagemaker/hyperpod/cli/commands/init.py
@@ -11,6 +11,7 @@
CFN
)
from sagemaker.hyperpod.cluster_management.hp_cluster_stack import HpClusterStack
from sagemaker.hyperpod.cli.commands.cluster_stack import get_newest_template_version
from sagemaker.hyperpod.cli.init_utils import (
generate_click_command,
save_config_yaml,
@@ -375,6 +376,10 @@ def _default_create(region, template_version):
# Pass region to to_domain for cluster stack template
if template == "cluster-stack":
config = template_model.to_config(region=region)
# Use newest template version if not provided
if template_version is None:
template_version = get_newest_template_version()
click.secho(f"No template version specified, using newest version: {template_version}", fg="yellow")
HpClusterStack(**config).create(region, template_version)
else:
# Create from k8s.yaml
104 changes: 104 additions & 0 deletions test/unit_tests/training/test_pytorch_job_template_model.py
@@ -0,0 +1,104 @@
import unittest
from hyperpod_pytorch_job_template.v1_1.model import PyTorchJobConfig


class TestPyTorchJobConfigEFA(unittest.TestCase):
"""Test EFA resource allocation in PyTorchJobConfig"""

def test_single_node_no_efa(self):
"""Test that single-node jobs don't get EFA resources"""
config = PyTorchJobConfig(
job_name="test-single-node",
image="pytorch:latest",
node_count=1,
accelerators=2,
instance_type="ml.p4d.24xlarge"
)

job = config.to_domain()
container = job.replicaSpecs[0].template.spec.containers[0]

# Should not have EFA resources
self.assertNotIn("vpc.amazonaws.com/efa", container.resources.requests)
self.assertNotIn("vpc.amazonaws.com/efa", container.resources.limits)

# Should have GPU resources
self.assertEqual(container.resources.requests["nvidia.com/gpu"], "2")

def test_multi_node_with_efa(self):
"""Test that multi-node jobs automatically get EFA resources"""
config = PyTorchJobConfig(
job_name="test-multi-node",
image="pytorch:latest",
node_count=4,
accelerators=8,
instance_type="ml.p4d.24xlarge"
)

job = config.to_domain()
container = job.replicaSpecs[0].template.spec.containers[0]

# Should have EFA resources
self.assertEqual(container.resources.requests["vpc.amazonaws.com/efa"], "1")
self.assertEqual(container.resources.limits["vpc.amazonaws.com/efa"], "1")

# Should also have GPU resources
self.assertEqual(container.resources.requests["nvidia.com/gpu"], "8")

def test_no_node_count_no_efa(self):
"""Test that jobs without node_count don't get EFA resources"""
config = PyTorchJobConfig(
job_name="test-no-node-count",
image="pytorch:latest",
accelerators=1,
instance_type="ml.g5.xlarge"
)

job = config.to_domain()
container = job.replicaSpecs[0].template.spec.containers[0]

# Should not have EFA resources
self.assertNotIn("vpc.amazonaws.com/efa", container.resources.requests)
self.assertNotIn("vpc.amazonaws.com/efa", container.resources.limits)

def test_multi_node_with_memory_and_cpu(self):
"""Test EFA with other resource types"""
config = PyTorchJobConfig(
job_name="test-multi-resources",
image="pytorch:latest",
node_count=2,
accelerators=4,
vcpu=16.0,
memory=64.0,
instance_type="ml.p4d.24xlarge"
)

job = config.to_domain()
container = job.replicaSpecs[0].template.spec.containers[0]

# Should have all resources including EFA
self.assertEqual(container.resources.requests["vpc.amazonaws.com/efa"], "1")
self.assertEqual(container.resources.requests["nvidia.com/gpu"], "4")
self.assertEqual(container.resources.requests["cpu"], "16.0")
self.assertEqual(container.resources.requests["memory"], "64.0Gi")

def test_accelerators_without_instance_type(self):
"""Test that accelerators work without instance_type (fixes the main issue)"""
config = PyTorchJobConfig(
job_name="test-no-instance-type",
image="pytorch:latest",
accelerators=4
# No instance_type specified
)

job = config.to_domain()
container = job.replicaSpecs[0].template.spec.containers[0]

# Should respect accelerators value even without instance_type
self.assertEqual(container.resources.requests["nvidia.com/gpu"], "4")
# Limits should default to "0" since accelerators_limit not specified
self.assertEqual(container.resources.limits["nvidia.com/gpu"], "0")


if __name__ == '__main__':
unittest.main()