From 60492128dc04f12adeba9efa691a5b1167128134 Mon Sep 17 00:00:00 2001
From: Molly He
Date: Wed, 19 Nov 2025 19:55:56 -0800
Subject: [PATCH] Make template-version flag optional for cluster create, add
 EFA support for PyTorch jobs, and remove default requests and limits when
 instance type is None

---
 .../v1_1/model.py                             |  44 +++++---
 .../v1_1/template.py                          |  10 +-
 .../hyperpod/cli/commands/cluster_stack.py    |  19 +++-
 src/sagemaker/hyperpod/cli/commands/init.py   |   5 +
 .../test_pytorch_job_template_model.py        | 104 ++++++++++++++++++
 5 files changed, 166 insertions(+), 16 deletions(-)
 create mode 100644 test/unit_tests/training/test_pytorch_job_template_model.py

diff --git a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py
index abfe0f53..01cf8075 100644
--- a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py
+++ b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py
@@ -333,19 +333,37 @@ def build_dict(**kwargs):
             return {k: v for k, v in kwargs.items() if v is not None}
 
         # Build resources
-        if self.instance_type is None:
-            requests_value = limits_value = {"nvidia.com/gpu": "0"}
-        else:
-            requests_value = build_dict(
-                accelerators=str(self.accelerators) if self.accelerators else None,
-                vcpu=str(self.vcpu) if self.vcpu else None,
-                memory=str(self.memory) if self.memory else None
-            )
-            limits_value = build_dict(
-                accelerators=str(self.accelerators_limit) if self.accelerators_limit else None,
-                vcpu=str(self.vcpu_limit) if self.vcpu_limit else None,
-                memory=str(self.memory_limit) if self.memory_limit else None
-            )
+        requests_value = {}
+        limits_value = {}
+
+        # Add GPU resources (respect accelerators regardless of instance_type)
+        if self.accelerators:
+            requests_value["nvidia.com/gpu"] = str(self.accelerators)
+        if self.accelerators_limit:
+            limits_value["nvidia.com/gpu"] = str(self.accelerators_limit)
+
+        # Add CPU resources
+        if self.vcpu:
+            requests_value["cpu"] = str(self.vcpu)
+        if self.vcpu_limit:
+            limits_value["cpu"] = str(self.vcpu_limit)
+
+        # Add memory resources
+        if self.memory:
+            requests_value["memory"] = f"{self.memory}Gi"
+        if self.memory_limit:
+            limits_value["memory"] = f"{self.memory_limit}Gi"
+
+        # Add EFA for multi-node jobs
+        if self.node_count and self.node_count > 1:
+            requests_value["vpc.amazonaws.com/efa"] = "1"
+            limits_value["vpc.amazonaws.com/efa"] = "1"
+
+        # Set default GPU to "0" only if no resources specified at all
+        if not requests_value:
+            requests_value = {"nvidia.com/gpu": "0"}
+        if not limits_value:
+            limits_value = {"nvidia.com/gpu": "0"}
 
         # Build container
         container_kwargs = build_dict(
diff --git a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/template.py b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/template.py
index 4348d6cc..98b55475 100644
--- a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/template.py
+++ b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/template.py
@@ -84,7 +84,7 @@
 {%- endfor %}
 {%- endif %}
           resources:
-{%- if accelerators or vcpu or memory %}
+{%- if accelerators or vcpu or memory or (node_count and node_count > 1) %}
             requests:
 {%- if accelerators %}
               nvidia.com/gpu: {{ accelerators }}
@@ -95,11 +95,14 @@
 {%- if memory %}
               memory: {{ memory }}Gi
 {%- endif %}
+{%- if (node_count and node_count > 1) %}
+              vpc.amazonaws.com/efa: 1
+{%- endif %}
 {%- else %}
             requests:
               nvidia.com/gpu: "0"
 {%- endif %}
-{%- if accelerators_limit or vcpu_limit or memory_limit %}
+{%- if accelerators_limit or vcpu_limit or memory_limit or (node_count and node_count > 1) %}
             limits:
 {%- if accelerators_limit %}
               nvidia.com/gpu: {{ accelerators_limit }}
@@ -110,6 +113,9 @@
 {%- if memory_limit %}
               memory: {{ memory_limit }}Gi
 {%- endif %}
+{%- if (node_count and node_count > 1) %}
+              vpc.amazonaws.com/efa: 1
+{%- endif %}
 {%- else %}
             limits:
               nvidia.com/gpu: "0"
diff --git a/src/sagemaker/hyperpod/cli/commands/cluster_stack.py b/src/sagemaker/hyperpod/cli/commands/cluster_stack.py
index 2a278086..f53d544e 100644
--- a/src/sagemaker/hyperpod/cli/commands/cluster_stack.py
+++ b/src/sagemaker/hyperpod/cli/commands/cluster_stack.py
@@ -30,6 +30,19 @@
 logger = logging.getLogger(__name__)
 
 
+def get_newest_template_version() -> int:
+    """Get the newest available template version.
+
+    Returns:
+        int: The newest template version number
+
+    TODO: Implement logic to fetch the actual newest template version
+    from the template registry or remote source.
+    """
+    # Placeholder implementation - currently returns 1 as the latest version
+    return 1
+
+
 def parse_status_list(ctx, param, value):
     """Parse status list from string format like "['CREATE_COMPLETE', 'UPDATE_COMPLETE']" """
     if not value:
@@ -79,7 +92,6 @@ def create_cluster_stack(config_file, region, template_version, debug):
         return
 
     # Load config to get template and version
-
     config_dir = Path(config_file).parent
     data, template, version = load_config(config_dir)
 
@@ -95,6 +107,11 @@ def create_cluster_stack(config_file, region, template_version, debug):
     model_instance = model_class(**filtered_config)
     config = model_instance.to_config(region=region)
 
+    # Use newest template version if not provided
+    if template_version is None:
+        template_version = get_newest_template_version()
+        logger.info(f"No template version specified, using newest version: {template_version}")
+
     # Create the cluster stack
     stack_id = HpClusterStack(**config).create(region, template_version)
 
diff --git a/src/sagemaker/hyperpod/cli/commands/init.py b/src/sagemaker/hyperpod/cli/commands/init.py
index 66ce7068..eb445d47 100644
--- a/src/sagemaker/hyperpod/cli/commands/init.py
+++ b/src/sagemaker/hyperpod/cli/commands/init.py
@@ -11,6 +11,7 @@
     CFN
 )
 from sagemaker.hyperpod.cluster_management.hp_cluster_stack import HpClusterStack
+from sagemaker.hyperpod.cli.commands.cluster_stack import get_newest_template_version
 from sagemaker.hyperpod.cli.init_utils import (
     generate_click_command,
     save_config_yaml,
@@ -375,6 +376,10 @@ def _default_create(region, template_version):
         # Pass region to to_domain for cluster stack template
         if template == "cluster-stack":
             config = template_model.to_config(region=region)
+            # Use newest template version if not provided
+            if template_version is None:
+                template_version = get_newest_template_version()
+                click.secho(f"No template version specified, using newest version: {template_version}", fg="yellow")
             HpClusterStack(**config).create(region, template_version)
         else:
             # Create from k8s.yaml
diff --git a/test/unit_tests/training/test_pytorch_job_template_model.py b/test/unit_tests/training/test_pytorch_job_template_model.py
new file mode 100644
index 00000000..b7a3490e
--- /dev/null
+++ b/test/unit_tests/training/test_pytorch_job_template_model.py
@@ -0,0 +1,104 @@
+import unittest
+from hyperpod_pytorch_job_template.v1_1.model import PyTorchJobConfig
+
+
+class TestPyTorchJobConfigEFA(unittest.TestCase):
+    """Test EFA resource allocation in PyTorchJobConfig"""
+
+    def test_single_node_no_efa(self):
+        """Test that single-node jobs don't get EFA resources"""
+        config = PyTorchJobConfig(
+            job_name="test-single-node",
+            image="pytorch:latest",
+            node_count=1,
+            accelerators=2,
+            instance_type="ml.p4d.24xlarge"
+        )
+
+        job = config.to_domain()
+        container = job.replicaSpecs[0].template.spec.containers[0]
+
+        # Should not have EFA resources
+        self.assertNotIn("vpc.amazonaws.com/efa", container.resources.requests)
+        self.assertNotIn("vpc.amazonaws.com/efa", container.resources.limits)
+
+        # Should have GPU resources
+        self.assertEqual(container.resources.requests["nvidia.com/gpu"], "2")
+
+    def test_multi_node_with_efa(self):
+        """Test that multi-node jobs automatically get EFA resources"""
+        config = PyTorchJobConfig(
+            job_name="test-multi-node",
+            image="pytorch:latest",
+            node_count=4,
+            accelerators=8,
+            instance_type="ml.p4d.24xlarge"
+        )
+
+        job = config.to_domain()
+        container = job.replicaSpecs[0].template.spec.containers[0]
+
+        # Should have EFA resources
+        self.assertEqual(container.resources.requests["vpc.amazonaws.com/efa"], "1")
+        self.assertEqual(container.resources.limits["vpc.amazonaws.com/efa"], "1")
+
+        # Should also have GPU resources
+        self.assertEqual(container.resources.requests["nvidia.com/gpu"], "8")
+
+    def test_no_node_count_no_efa(self):
+        """Test that jobs without node_count don't get EFA resources"""
+        config = PyTorchJobConfig(
+            job_name="test-no-node-count",
+            image="pytorch:latest",
+            accelerators=1,
+            instance_type="ml.g5.xlarge"
+        )
+
+        job = config.to_domain()
+        container = job.replicaSpecs[0].template.spec.containers[0]
+
+        # Should not have EFA resources
+        self.assertNotIn("vpc.amazonaws.com/efa", container.resources.requests)
+        self.assertNotIn("vpc.amazonaws.com/efa", container.resources.limits)
+
+    def test_multi_node_with_memory_and_cpu(self):
+        """Test EFA with other resource types"""
+        config = PyTorchJobConfig(
+            job_name="test-multi-resources",
+            image="pytorch:latest",
+            node_count=2,
+            accelerators=4,
+            vcpu=16.0,
+            memory=64.0,
+            instance_type="ml.p4d.24xlarge"
+        )
+
+        job = config.to_domain()
+        container = job.replicaSpecs[0].template.spec.containers[0]
+
+        # Should have all resources including EFA
+        self.assertEqual(container.resources.requests["vpc.amazonaws.com/efa"], "1")
+        self.assertEqual(container.resources.requests["nvidia.com/gpu"], "4")
+        self.assertEqual(container.resources.requests["cpu"], "16.0")
+        self.assertEqual(container.resources.requests["memory"], "64.0Gi")
+
+    def test_accelerators_without_instance_type(self):
+        """Test that accelerators work without instance_type (fixes the main issue)"""
+        config = PyTorchJobConfig(
+            job_name="test-no-instance-type",
+            image="pytorch:latest",
+            accelerators=4
+            # No instance_type specified
+        )
+
+        job = config.to_domain()
+        container = job.replicaSpecs[0].template.spec.containers[0]
+
+        # Should respect accelerators value even without instance_type
+        self.assertEqual(container.resources.requests["nvidia.com/gpu"], "4")
+        # Limits should default to "0" since accelerators_limit not specified
+        self.assertEqual(container.resources.limits["nvidia.com/gpu"], "0")
+
+
+if __name__ == '__main__':
+    unittest.main()
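
Reviewer note (not part of the patch): the request/limit rules added to to_domain() in
model.py are easier to review with the expected outputs in front of you. The sketch below
is a minimal, standalone mirror of those rules; the helper name build_resources and the
example values are assumptions for illustration only, and the authoritative behaviour is
the code in the diff above.

from typing import Dict, Optional, Tuple


def build_resources(
    accelerators: Optional[int] = None,
    accelerators_limit: Optional[int] = None,
    vcpu: Optional[float] = None,
    vcpu_limit: Optional[float] = None,
    memory: Optional[float] = None,
    memory_limit: Optional[float] = None,
    node_count: Optional[int] = None,
) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Standalone mirror of the resource mapping introduced in to_domain()."""
    requests: Dict[str, str] = {}
    limits: Dict[str, str] = {}

    # GPUs are honoured whenever accelerators is set, even without an instance_type.
    if accelerators:
        requests["nvidia.com/gpu"] = str(accelerators)
    if accelerators_limit:
        limits["nvidia.com/gpu"] = str(accelerators_limit)

    # CPU and memory are emitted only when explicitly provided.
    if vcpu:
        requests["cpu"] = str(vcpu)
    if vcpu_limit:
        limits["cpu"] = str(vcpu_limit)
    if memory:
        requests["memory"] = f"{memory}Gi"
    if memory_limit:
        limits["memory"] = f"{memory_limit}Gi"

    # Multi-node jobs get one EFA interface in both requests and limits.
    if node_count and node_count > 1:
        requests["vpc.amazonaws.com/efa"] = "1"
        limits["vpc.amazonaws.com/efa"] = "1"

    # Fall back to a "0" GPU entry only when nothing at all was specified.
    if not requests:
        requests = {"nvidia.com/gpu": "0"}
    if not limits:
        limits = {"nvidia.com/gpu": "0"}
    return requests, limits


if __name__ == "__main__":
    # 4-node, 8-GPU job: requests carry the GPUs plus EFA, limits carry only EFA.
    print(build_resources(accelerators=8, node_count=4))
    # -> ({'nvidia.com/gpu': '8', 'vpc.amazonaws.com/efa': '1'}, {'vpc.amazonaws.com/efa': '1'})

    # accelerators without instance_type: GPUs are respected, limits fall back to "0".
    print(build_resources(accelerators=4))
    # -> ({'nvidia.com/gpu': '4'}, {'nvidia.com/gpu': '0'})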
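
Reviewer note (not part of the patch): both creation paths (create_cluster_stack in
cluster_stack.py and _default_create in init.py) now share the same fallback: a missing
template version resolves to get_newest_template_version(), which is still a placeholder
that returns 1. A minimal sketch of that flow, where resolve_template_version is an
assumed stand-in for the inline checks in the patch:

from typing import Optional


def get_newest_template_version() -> int:
    # Same placeholder behaviour as the patch: the newest known version is 1 until the
    # registry lookup is implemented.
    return 1


def resolve_template_version(template_version: Optional[int]) -> int:
    # None means the caller did not supply a version; fall back to the newest one.
    if template_version is None:
        return get_newest_template_version()
    return template_version


assert resolve_template_version(None) == 1
assert resolve_template_version(2) == 2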