From 99dbe7be64dc502fc5624a969a2be0f8e82c33f8 Mon Sep 17 00:00:00 2001 From: Shantanu Tripathi Date: Thu, 20 Nov 2025 23:24:51 -0800 Subject: [PATCH 1/5] Upgrade Inference Operator Version (#327) --- helm_chart/HyperPodHelmChart/Chart.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helm_chart/HyperPodHelmChart/Chart.yaml b/helm_chart/HyperPodHelmChart/Chart.yaml index 4ef3687f..366bff97 100644 --- a/helm_chart/HyperPodHelmChart/Chart.yaml +++ b/helm_chart/HyperPodHelmChart/Chart.yaml @@ -81,7 +81,7 @@ dependencies: repository: "file://charts/team-role-and-bindings" condition: team-role-and-bindings.enabled - name: hyperpod-inference-operator - version: "1.0.0" + version: "1.1.0" repository: "file://charts/inference-operator" condition: inferenceOperators.enabled - name: hyperpod-patching From 4db9168667238cb2e81070d863c1cfbf5305d3b0 Mon Sep 17 00:00:00 2001 From: Mohamed Zeidan <81834882+mohamedzeidan2021@users.noreply.github.com> Date: Fri, 21 Nov 2025 01:03:40 -0800 Subject: [PATCH 2/5] pyproj version update (#328) Co-authored-by: Mohamed Zeidan --- hyperpod-custom-inference-template/pyproject.toml | 2 +- hyperpod-jumpstart-inference-template/pyproject.toml | 2 +- hyperpod-pytorch-job-template/pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hyperpod-custom-inference-template/pyproject.toml b/hyperpod-custom-inference-template/pyproject.toml index 002ca0d0..88a983d3 100644 --- a/hyperpod-custom-inference-template/pyproject.toml +++ b/hyperpod-custom-inference-template/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "hyperpod-custom-inference-template" -version = "1.1.0" +version = "1.2.0" readme = "README.md" authors = [{name = "Amazon Web Services"}] license = {text = "Apache-2.0"} diff --git a/hyperpod-jumpstart-inference-template/pyproject.toml b/hyperpod-jumpstart-inference-template/pyproject.toml index dd967719..800531fc 100644 --- a/hyperpod-jumpstart-inference-template/pyproject.toml +++ b/hyperpod-jumpstart-inference-template/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "hyperpod-jumpstart-inference-template" -version = "1.0.3" +version = "1.1.0" readme = "README.md" authors = [{name = "Amazon Web Services"}] license = {text = "Apache-2.0"} diff --git a/hyperpod-pytorch-job-template/pyproject.toml b/hyperpod-pytorch-job-template/pyproject.toml index 5ba9acc4..9e8b45fc 100644 --- a/hyperpod-pytorch-job-template/pyproject.toml +++ b/hyperpod-pytorch-job-template/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "hyperpod-pytorch-job-template" -version = "1.1.4" +version = "1.2.0" readme = "README.md" authors = [{name = "Amazon Web Services"}] license = {text = "Apache-2.0"} From e37b011e9c729372f51da5b01b4363174035c40d Mon Sep 17 00:00:00 2001 From: Mohamed Zeidan <81834882+mohamedzeidan2021@users.noreply.github.com> Date: Fri, 21 Nov 2025 01:10:05 -0800 Subject: [PATCH 3/5] version change (#329) Co-authored-by: Mohamed Zeidan --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 90a68f0a..8e4308e0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] dynamic = ["dependencies"] name = "sagemaker-hyperpod" -version = "3.3.1" +version = "3.4.0" description = "Amazon SageMaker HyperPod SDK and CLI" readme = "README.md" requires-python = ">=3.8" From 
ef6b9a5ef487b14b0b0310e7e8ca3a66c183b06a Mon Sep 17 00:00:00 2001 From: Mohamed Zeidan <81834882+mohamedzeidan2021@users.noreply.github.com> Date: Tue, 2 Dec 2025 10:50:23 -0800 Subject: [PATCH 4/5] elastic training to keynote3 (#307) * feat: Implement elastic training cli arguments (#273) * feat: Implement elastic training cli arguments * Add elastic training unified config and unit test * Add graceful shutdown and scaling timeout to cli args * Revert "feat: Implement elastic training cli arguments (#273)" This reverts commit 18428ef2b1c0562bf51a9a4b4aa2914eed441259. * feat: Implement elastic training cli arguments (#295) * feat: implement elastic training cli args * Rename args name to match crd for elastic training * Add unit test for replcia discrete values * Add integ test for elastic training cli --------- Co-authored-by: Sophia Co-authored-by: Molly He Co-authored-by: Mohamed Zeidan --- .../v1_1/model.py | 124 +++++++++- .../v1_1/schema.json | 87 +++++++ src/sagemaker/hyperpod/cli/training_utils.py | 15 +- .../hyperpod_pytorch_job_unified_config.py | 150 ++++++++++++ test/conftest.py | 10 + .../training/cli/test_cli_elastic_training.py | 229 ++++++++++++++++++ test/unit_tests/cli/test_training.py | 116 +++++++++ 7 files changed, 726 insertions(+), 5 deletions(-) create mode 100644 test/integration_tests/training/cli/test_cli_elastic_training.py diff --git a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py index 9011c44e..714081b5 100644 --- a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py +++ b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/model.py @@ -11,7 +11,8 @@ Metadata, Volumes, HostPath, - PersistentVolumeClaim + PersistentVolumeClaim, + ElasticPolicy ) from sagemaker.hyperpod.training.hyperpod_pytorch_job import HyperPodPytorchJob import yaml @@ -239,6 +240,38 @@ class PyTorchJobConfig(BaseModel): alias="required_topology", description="Required topology annotation for scheduling", ) + elastic_replica_increment_step: Optional[int] = Field( + default=None, + alias="elastic_replica_increment_step", + description="Scaling step size for elastic training", + ge=1, + ) + max_node_count: Optional[int] = Field( + default=None, + alias="max_node_count", + description="Maximum number of nodes for elastic training", + ge=1, + ) + elastic_graceful_shutdown_timeout_in_seconds: Optional[int] = Field( + default=None, + alias="elastic_graceful_shutdown_timeout_in_seconds", + description="Graceful shutdown timeout in seconds for elastic scaling operations" + ) + elastic_scaling_timeout_in_seconds: Optional[int] = Field( + default=None, + alias="elastic_scaling_timeout_in_seconds", + description="Scaling timeout for elastic training" + ) + elastic_scale_up_snooze_time_in_seconds: Optional[int] = Field( + default=None, + alias="elastic_scale_up_snooze_time_in_seconds", + description="Timeout period after job restart during which no scale up/workload admission is allowed" + ) + elastic_replica_discrete_values: Optional[List[int]] = Field( + default=None, + alias="elastic_replica_discrete_values", + description="Alternative to replica increment step. 
Provides exact values for total replicas count" + ) @field_validator('tasks_per_node', mode='before') @classmethod @@ -363,6 +396,45 @@ def validate_accelerator_partition_options(self): ) if not valid: raise ValueError(error) + + return self + + @model_validator(mode='after') + def validate_elastic_replica_config(self): + """Validate elastic replica configuration.""" + has_increment_step = self.elastic_replica_increment_step is not None + has_discrete_values = self.elastic_replica_discrete_values is not None + + # Check mutual exclusivity + if has_increment_step and has_discrete_values: + raise ValueError( + "Only one of 'elastic_replica_increment_step' or 'elastic_replica_discrete_values' " + "can be specified, not both. Please use either:\n" + " - elastic_replica_increment_step for uniform scaling steps, or\n" + " - elastic_replica_discrete_values for specific replica counts" + ) + + # Validate discrete values are within valid range + if has_discrete_values: + discrete_values = self.elastic_replica_discrete_values + + # Check that all values are positive + if any(val <= 0 for val in discrete_values): + raise ValueError( + f"All values in 'elastic_replica_discrete_values' must be positive integers. " + f"Got: {discrete_values}" + ) + + # Check against max_node_count if specified + if self.max_node_count is not None: + invalid_values = [val for val in discrete_values if val > self.max_node_count] + if invalid_values: + raise ValueError( + f"All values in 'elastic_replica_discrete_values' must be ≤ max_node_count ({self.max_node_count}). " + f"Invalid values: {invalid_values}. " + f"Please either increase max_node_count or remove values exceeding it." + ) + return self def to_domain(self) -> Dict: @@ -467,15 +539,61 @@ def build_dict(**kwargs): replica_kwargs = build_dict( name="pod", template=Template(metadata=Metadata(**metadata_kwargs), spec=Spec(**spec_kwargs)), - replicas=self.node_count + replicas=self.node_count, + max_replicas=self.max_node_count ) + # Build elastic policy + elastic_policy = None + if any([ + self.elastic_replica_increment_step is not None, + self.max_node_count is not None, + self.elastic_graceful_shutdown_timeout_in_seconds is not None, + self.elastic_scaling_timeout_in_seconds is not None, + self.elastic_replica_discrete_values is not None + ]): + # Build base elastic policy kwargs + elastic_policy_kwargs = build_dict( + min_replicas=self.node_count, + max_replicas=self.max_node_count, + graceful_shutdown_timeout_in_seconds=self.elastic_graceful_shutdown_timeout_in_seconds, + scaling_timeout_in_seconds=self.elastic_scaling_timeout_in_seconds + ) + + if self.elastic_replica_discrete_values is not None: + elastic_policy_kwargs['replica_discrete_values'] = self.elastic_replica_discrete_values + elif self.elastic_replica_increment_step is not None: + elastic_policy_kwargs['replica_increment_step'] = self.elastic_replica_increment_step + + elastic_policy = ElasticPolicy(**elastic_policy_kwargs) + + # Build run policy + run_policy = None + if self.max_retry is not None or self.elastic_scale_up_snooze_time_in_seconds is not None: + from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import RestartPolicy + + run_policy_kwargs = build_dict( + clean_pod_policy="None", + job_max_retry_count=self.max_retry + ) + + # Add restart policy if scale_up_snooze_interval is provided + if self.elastic_scale_up_snooze_time_in_seconds is not None: + restart_policy = RestartPolicy( + eval_period_seconds=3600, + 
scale_up_snooze_time_in_seconds=self.elastic_scale_up_snooze_time_in_seconds + ) + run_policy_kwargs['restart_policy'] = restart_policy + + run_policy = RunPolicy(**run_policy_kwargs) + # Build job job_kwargs = build_dict( metadata=metadata_kwargs, replica_specs=[ReplicaSpec(**replica_kwargs)], nproc_per_node=str(self.tasks_per_node) if self.tasks_per_node else None, - run_policy=RunPolicy(clean_pod_policy="None", job_max_retry_count=self.max_retry) if self.max_retry else None + run_policy=run_policy, + elastic_policy=elastic_policy ) result = HyperPodPytorchJob(**job_kwargs) diff --git a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json index f6dc79ac..83bd6120 100644 --- a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json +++ b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json @@ -395,7 +395,94 @@ "type": "string", "description": "Required topology annotation for scheduling", "$ref": "#/$defs/topologyLabels" + }, + "elastic_replica_increment_step": { + "anyOf": [ + { + "minimum": 1, + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Scaling step size for elastic training", + "title": "Elastic Training Replica Increment Step" + }, + "max_node_count": { + "anyOf": [ + { + "minimum": 1, + "type": "integer" + }, + { + "type": "null" } + ], + "default": null, + "description": "Maximum number of nodes for elastic training", + "title": "Max Node Count" + }, + "elastic_graceful_shutdown_timeout_in_seconds": { + "anyOf": [ + { + "minimum": 0, + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Graceful shutdown timeout in seconds for elastic scaling operations", + "title": "Elastic Graceful Shutdown Timeout In Seconds" + }, + "elastic_scaling_timeout_in_seconds": { + "anyOf": [ + { + "minimum": 0, + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Scaling timeout for elastic training", + "title": "Elastic Scaling Timeout In Seconds" + }, + "elastic_scale_up_snooze_time_in_seconds": { + "anyOf": [ + { + "minimum": 0, + "type": "integer" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Timeout period after job restart during which no scale up/workload admission is allowed", + "title": "Elastic Scale Up Snooze Time In Seconds" + }, + "elastic_replica_discrete_values": { + "anyOf": [ + { + "items": { + "type": "integer" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "default": null, + "description": "Alternative to replica increment step. 
Provides exact values for total replicas count", + "title": "Elastic Replica Discrete Values" + } + }, "required": [ "job_name", diff --git a/src/sagemaker/hyperpod/cli/training_utils.py b/src/sagemaker/hyperpod/cli/training_utils.py index 290ab5f1..c5ba3667 100644 --- a/src/sagemaker/hyperpod/cli/training_utils.py +++ b/src/sagemaker/hyperpod/cli/training_utils.py @@ -42,7 +42,16 @@ def _parse_list_flag(ctx, param, value): return None # Remove brackets and split by comma value = value.strip("[]") - return [item.strip() for item in value.split(",") if item.strip()] + items = [item.strip() for item in value.split(",") if item.strip()] + + # Convert to integers for elastic_replica_discrete_values + if param and hasattr(param, 'name') and param.name == 'elastic_replica_discrete_values': + try: + return [int(item) for item in items] + except ValueError as e: + raise click.BadParameter(f"elastic-replica-discrete-values must contain only integers: {e}") + + return items def _parse_volume_param(ctx, param, value): """Parse volume parameters from command line format to dictionary format.""" @@ -134,11 +143,12 @@ def wrapped_func(*args, **kwargs): list_params = { "command": "List of command arguments", "args": "List of script arguments, e.g. '[--batch-size, 32, --learning-rate, 0.001]'", + "elastic_replica_discrete_values": "List of discrete replica values for elastic training, e.g. '[2, 4, 8, 16]'", } for param_name, help_text in list_params.items(): wrapped_func = click.option( - f"--{param_name}", + f"--{param_name.replace('_', '-')}", callback=_parse_list_flag, type=str, default=None, @@ -154,6 +164,7 @@ def wrapped_func(*args, **kwargs): "command", "args", "volume", + "elastic_replica_discrete_values" ] ) diff --git a/src/sagemaker/hyperpod/training/config/hyperpod_pytorch_job_unified_config.py b/src/sagemaker/hyperpod/training/config/hyperpod_pytorch_job_unified_config.py index a7855ef5..88b8d84b 100644 --- a/src/sagemaker/hyperpod/training/config/hyperpod_pytorch_job_unified_config.py +++ b/src/sagemaker/hyperpod/training/config/hyperpod_pytorch_job_unified_config.py @@ -2982,6 +2982,11 @@ class ReplicaSpec(BaseModel): default=0, description="Replicas is the desired number of replicas of the given template.", ) + maxReplicas: Optional[int] = Field( + default=None, + alias="max_replicas", + description="Maximum replicas for elastic training" + ) spares: Optional[int] = Field( default=0, description="Spares requests spare resources from Kueue. E.g. If a job is configured with 4 replicas and 2 spares, job requests resources required to run 6 pods such as cpu, gpu", @@ -2991,6 +2996,47 @@ class ReplicaSpec(BaseModel): description="Template is the object that describes the pod that will be created for this replica.", ) +class ElasticPolicy(BaseModel): + """ElasticPolicy defines the elastic training policy""" + + model_config = ConfigDict(extra="forbid") + + minReplicas: Optional[int] = Field( + default=None, + alias="min_replicas", + description="Minimum number of replicas" + ) + maxReplicas: Optional[int] = Field( + default=None, + alias="max_replicas", + description="Maximum number of replicas" + ) + replicaIncrementStep: Optional[int] = Field( + default=None, + alias="replica_increment_step", + description="Step size for elastic replica scaling" + ) + replicaDiscreteValues: Optional[List[int]] = Field( + default=None, + alias="replica_discrete_values", + description="Alternative to ReplicaIncrementStep. 
Provides exact values for total replicas count" + ) + scalingTimeoutInSeconds: Optional[int] = Field( + default=None, + alias="scaling_timeout_in_seconds", + description="Timeout for scaling operations" + ) + gracefulShutdownTimeoutInSeconds: Optional[int] = Field( + default=None, + alias="graceful_shutdown_timeout_in_seconds", + description="Graceful shutdown timeout in seconds for elastic scaling operations" + ) + faultyScaleDownTimeoutInSeconds: Optional[int] = Field( + default=None, + alias="faulty_scale_down_timeout_in_seconds", + description="Timeout in seconds after entering Faulted state before triggering faulty pod scale-down" + ) + class LogMonitoringConfiguration(BaseModel): """LogMonitoringRule defines the criteria used to detect a SLOW or HANGING job""" @@ -3052,6 +3098,16 @@ class RestartPolicy(BaseModel): alias="num_restart_before_full_job_restart", description="The number of standard restarts before a full job restart", ) + maxNumRepeatOffendersToAvoid: Optional[int] = Field( + default=None, + alias="max_num_repeat_offenders_to_avoid", + description="The max repeat offenders to exclude in next job level restart", + ) + scaleUpSnoozeTimeInSeconds: Optional[int] = Field( + default=0, + alias="scale_up_snooze_time_in_seconds", + description="Timeout period after job restart during which no scale up/workload admission is allowed", + ) class RunPolicy(BaseModel): @@ -3098,6 +3154,11 @@ class RunPolicy(BaseModel): alias="ttl_seconds_after_finished", description="TTLSecondsAfterFinished is the TTL to clean up jobs. Set to -1 for infinite", ) + workloadMode: Optional[str] = Field( + default=None, + alias="workload_mode", + description="Workload deployment mode for elastic training (e.g., 'Deployment')", + ) class PodSets(BaseModel): @@ -3153,6 +3214,48 @@ class Pods(BaseModel): ) +class ElasticScalingStatus(BaseModel): + """ElasticScalingStatus represents the current state of elastic scaling operations""" + + model_config = ConfigDict(extra="forbid") + + targetReplicas: Optional[Dict[str, int]] = Field( + default=None, + alias="target_replicas", + description="TargetReplicas contains the desired replica counts per ReplicaSpec name", + ) + lastUpdated: Optional[str] = Field( + default=None, + alias="last_updated", + description="LastUpdated is the timestamp when this status was last modified", + ) + lastScalingTime: Optional[str] = Field( + default=None, + alias="last_scaling_time", + description="LastScalingTime tracks when the last scaling operation completed", + ) + lastRestartTime: Optional[str] = Field( + default=None, + alias="last_restart_time", + description="LastRestartTime tracks when the job was last restarted for scaleUpRestartTimeout", + ) + podsScaled: Optional[bool] = Field( + default=None, + alias="pods_scaled", + description="PodsScaled indicates whether pods have already been scaled in this scaling round", + ) + isFaultyPodScaleDown: Optional[bool] = Field( + default=None, + alias="is_faulty_pod_scale_down", + description="IsFaultyPodScaleDown indicates this scaling operation is removing faulty pods", + ) + consecutiveScalingFailures: Optional[int] = Field( + default=None, + alias="consecutive_scaling_failures", + description="ConsecutiveScalingFailures tracks the number of consecutive elastic scaling failures", + ) + + class RestartStatus(BaseModel): """Additional restart limiting status""" @@ -3171,6 +3274,33 @@ class RestartStatus(BaseModel): ) +class FaultyPodInstanceList(BaseModel): + """FaultyPodInstanceRecord tracks faulty pod/instances for each restart""" + 
+ model_config = ConfigDict(extra="forbid") + + restartType: Optional[str] = Field( + default=None, + alias="restart_type", + description="RestartType indicates whether this was a PLR or JLR" + ) + faultyInstanceIdList: Optional[List[str]] = Field( + default_factory=list, + alias="faulty_instance_id_list", + description="FaultyInstanceIdList tracks faulty instance ids" + ) + faultyPodList: Optional[List[str]] = Field( + default_factory=list, + alias="faulty_pod_list", + description="FaultyPodList tracks faulty pod names" + ) + faultyRankList: Optional[List[str]] = Field( + default_factory=list, + alias="faulty_rank_list", + description="FaultyRankList tracks faulty pod ranks" + ) + + class HyperPodPytorchJobStatus(BaseModel): """HyperPodPytorchJobStatus defines the observed state of HyperPodPytorchJob""" @@ -3187,6 +3317,11 @@ class HyperPodPytorchJobStatus(BaseModel): alias="job_pods", description="The StatefulSet containing the training pods", ) + latestFaultyPodInstanceList: Optional[FaultyPodInstanceList] = Field( + default=None, + alias="latest_faulty_pod_instance_list", + description="LatestFaultyPodInstanceList tracks faulty pods/nodes of latest restart" + ) managerPods: Optional[ManagerPods] = Field( default=None, alias="manager_pods", description="Pod Manager pods" ) @@ -3221,6 +3356,16 @@ class HyperPodPytorchJobStatus(BaseModel): alias="restart_status", description="Additional restart limiting status", ) + elasticScalingStatus: Optional[ElasticScalingStatus] = Field( + default=None, + alias="elastic_scaling_status", + description="ElasticScalingStatus contains the current state of elastic scaling operations", + ) + elasticWorkloadRef: Optional[Dict[str, str]] = Field( + default=None, + alias="elastic_workload_ref", + description="Reference to associated ElasticWorkload (optional, only set when ElasticPolicy is present)", + ) startTime: Optional[str] = Field( default=None, alias="start_time", @@ -3245,4 +3390,9 @@ class _HyperPodPytorchJob(BaseModel): ) runPolicy: Optional[RunPolicy] = Field( default=None, alias="run_policy", description="RunPolicy" + ) + elasticPolicy: Optional[ElasticPolicy] = Field( + default=None, + alias="elastic_policy", + description="ElasticPolicy for elastic training" ) \ No newline at end of file diff --git a/test/conftest.py b/test/conftest.py index 8ec0a320..002fe3a5 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -46,6 +46,16 @@ def test_job_name(): """Generate a unique job name for testing.""" return f"test-pytorch-job-{str(uuid.uuid4())[:8]}" +@pytest.fixture(scope="class") +def test_elastic_job_name_increment(): + """Generate a unique job name for elastic training with increment step.""" + return f"test-increment-{str(uuid.uuid4())[:8]}" + +@pytest.fixture(scope="class") +def test_elastic_job_name_discrete(): + """Generate a unique job name for elastic training with discrete values.""" + return f"test-discrete-{str(uuid.uuid4())[:8]}" + @pytest.fixture(scope="class") def image_uri(): """Return a standard PyTorch image URI for testing.""" diff --git a/test/integration_tests/training/cli/test_cli_elastic_training.py b/test/integration_tests/training/cli/test_cli_elastic_training.py new file mode 100644 index 00000000..2943c630 --- /dev/null +++ b/test/integration_tests/training/cli/test_cli_elastic_training.py @@ -0,0 +1,229 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. 
A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +import pytest +import time + +from sagemaker.hyperpod.cli.utils import setup_logger +from test.integration_tests.utils import execute_command + +logger = setup_logger(__name__) + + +# Fixture to provide test parameters for both elastic training scenarios +@pytest.fixture(params=[ + { + "job_name_fixture": "test_elastic_job_name_increment", + "create_args": [ + "--elastic-replica-increment-step", "2", + "--max-node-count", "4", + "--elastic-graceful-shutdown-timeout-in-seconds", "180", + "--elastic-scaling-timeout-in-seconds", "90", + "--elastic-scale-up-snooze-time-in-seconds", "120" + ], + "scenario": "increment_step" + }, + { + "job_name_fixture": "test_elastic_job_name_discrete", + "create_args": [ + "--elastic-replica-discrete-values", "[2, 4, 8]", + "--max-node-count", "8", + "--elastic-graceful-shutdown-timeout-in-seconds", "180", + "--elastic-scaling-timeout-in-seconds", "90" + ], + "scenario": "discrete_values" + } +], ids=["increment_step", "discrete_values"]) +def elastic_job_params(request): + """Fixture providing parameters for both elastic training scenarios.""" + return request.param + + +class TestElasticTrainingCLI: + """Integration tests for HyperPod CLI elastic training using hyp commands.""" + + def test_list_clusters(self, cluster_name): + """Test listing clusters""" + assert cluster_name + + def test_get_cluster_context(self): + """Test getting current cluster context.""" + result = execute_command(["hyp", "get-cluster-context"]) + assert result.returncode == 0 + + context_output = result.stdout.strip() + assert "Cluster context:" in context_output + # Just verify we got a valid ARN without checking the specific name + current_arn = context_output.split("Cluster context:")[-1].strip() + assert "arn:aws:eks:" in current_arn + + def test_create_elastic_job(self, elastic_job_params, image_uri, request): + """Test creating an elastic PyTorch job with different configurations.""" + # Get the job name from the fixture + job_name = request.getfixturevalue(elastic_job_params["job_name_fixture"]) + + # Build the command with common and scenario-specific args + command = [ + "hyp", "create", "hyp-pytorch-job", + "--version", "1.1", + "--job-name", job_name, + "--image", image_uri, + "--pull-policy", "Always", + "--tasks-per-node", "1", + "--max-retry", "1" + ] + command.extend(elastic_job_params["create_args"]) + + result = execute_command(command) + assert result.returncode == 0 + logger.info(f"Created elastic job ({elastic_job_params['scenario']}): {job_name}") + + # Wait a moment for the job to be created + time.sleep(5) + + def test_list_jobs(self, elastic_job_params, request): + """Test listing jobs and verifying the elastic job is present with a valid status.""" + job_name = request.getfixturevalue(elastic_job_params["job_name_fixture"]) + + list_result = execute_command(["hyp", "list", "hyp-pytorch-job"]) + assert list_result.returncode == 0 + + # Check if the job name is in the output + assert job_name in list_result.stdout + + # Check that the job status is not Unknown + output_lines = list_result.stdout.strip().split('\n') + job_status = None + for line in output_lines: + if job_name in line: + # Extract the 
status from the line (assuming format: NAME NAMESPACE STATUS AGE) + parts = line.split() + if len(parts) >= 3: + job_status = parts[2].strip() + break + + # Verify job status is not Unknown + assert job_status is not None, f"Could not find status for job {job_name}" + assert job_status != "Unknown", f"Job {job_name} has Unknown status, which indicates a potential issue" + + logger.info(f"Successfully listed jobs. Job {job_name} ({elastic_job_params['scenario']}) has status: {job_status}") + + def test_wait_for_job_running(self, elastic_job_params, request): + """Test that the elastic job transitions to Running state.""" + job_name = request.getfixturevalue(elastic_job_params["job_name_fixture"]) + + max_attempts = 10 + for attempt in range(1, max_attempts + 1): + logger.info(f"Checking elastic job status (attempt {attempt}/{max_attempts})...") + + list_result = execute_command(["hyp", "list", "hyp-pytorch-job"]) + assert list_result.returncode == 0 + + output_lines = list_result.stdout.strip().split('\n') + job_status = None + for line in output_lines: + if job_name in line: + parts = line.split() + if len(parts) >= 3: + job_status = parts[2].strip() + break + + logger.info(f"Current elastic job status: {job_status}") + + if job_status == "Unknown": + pytest.fail(f"Job {job_name} has Unknown status") + + if job_status in ["Running", "Completed"]: + logger.info(f"Elastic job {job_name} ({elastic_job_params['scenario']}) is now in {job_status} state") + return + + logger.info(f"Elastic job {job_name} is in {job_status} state, waiting...") + time.sleep(30) + + pytest.fail(f"Elastic job {job_name} did not reach Running or Completed state within timeout") + + def test_wait_for_job_completion(self, elastic_job_params, request): + """Test that the elastic job reaches Completed status.""" + job_name = request.getfixturevalue(elastic_job_params["job_name_fixture"]) + + max_attempts = 20 + for attempt in range(1, max_attempts + 1): + logger.info(f"Checking elastic job completion status (attempt {attempt}/{max_attempts})...") + + list_result = execute_command(["hyp", "list", "hyp-pytorch-job"]) + assert list_result.returncode == 0 + + output_lines = list_result.stdout.strip().split('\n') + job_status = None + for line in output_lines: + if job_name in line: + parts = line.split() + if len(parts) >= 3: + job_status = parts[2].strip() + break + + logger.info(f"Current elastic job status: {job_status}") + + if job_status == "Completed": + logger.info(f"Elastic job {job_name} ({elastic_job_params['scenario']}) has successfully completed") + return + + if job_status not in ["Running", "Completed"]: + pytest.fail(f"Elastic job {job_name} is in {job_status} state") + + logger.info(f"Elastic job {job_name} is still running, waiting...") + time.sleep(30) + + pytest.fail(f"Elastic job {job_name} did not reach Completed state within timeout") + + def test_list_pods(self, elastic_job_params, request): + """Test listing pods for the elastic job.""" + job_name = request.getfixturevalue(elastic_job_params["job_name_fixture"]) + + # Wait a moment to ensure pods are created + time.sleep(10) + + list_pods_result = execute_command([ + "hyp", "list-pods", "hyp-pytorch-job", + "--job-name", job_name + ]) + assert list_pods_result.returncode == 0 + + # Verify the output contains expected headers and job name + output = list_pods_result.stdout.strip() + assert f"Pods for job: {job_name}" in output + assert "POD NAME" in output + assert "NAMESPACE" in output + + # Verify at least one pod is listed (should contain the job 
name in the pod name) + assert f"{job_name}-pod-" in output + + logger.info(f"Successfully listed pods for elastic job ({elastic_job_params['scenario']}): {job_name}") + + def test_delete_job(self, elastic_job_params, request): + """Test deleting the elastic job.""" + job_name = request.getfixturevalue(elastic_job_params["job_name_fixture"]) + + delete_result = execute_command(["hyp", "delete", "hyp-pytorch-job", "--job-name", job_name]) + assert delete_result.returncode == 0 + logger.info(f"Successfully deleted elastic job ({elastic_job_params['scenario']}): {job_name}") + + # Wait a moment for the job to be deleted + time.sleep(5) + + # Verify the job is no longer listed + list_result = execute_command(["hyp", "list", "hyp-pytorch-job"]) + assert list_result.returncode == 0 + + # The job name should no longer be in the output + assert job_name not in list_result.stdout diff --git a/test/unit_tests/cli/test_training.py b/test/unit_tests/cli/test_training.py index e3c4883d..07251cf2 100644 --- a/test/unit_tests/cli/test_training.py +++ b/test/unit_tests/cli/test_training.py @@ -157,6 +157,122 @@ def test_optional_params(self): self.assertEqual(call_args["metadata"]["labels"]["kueue.x-k8s.io/queue-name"], "localqueue") self.assertEqual(call_args["metadata"]["annotations"]["kueue.x-k8s.io/podset-required-topology"], "topology.k8s.aws/ultraserver-id") + @patch('sys.argv', ['pytest', '--version', '1.1']) + def test_elastic_training_params(self): + """Test job creation with elastic training parameters""" + # Reload the training module with mocked sys.argv + if 'sagemaker.hyperpod.cli.commands.training' in sys.modules: + importlib.reload(sys.modules['sagemaker.hyperpod.cli.commands.training']) + + from sagemaker.hyperpod.cli.commands.training import pytorch_create + + # Test case 1: providing elastic-replica-increment-step + with patch("hyperpod_pytorch_job_template.v1_1.model.HyperPodPytorchJob") as mock_hyperpod_job: + mock_instance = Mock() + mock_hyperpod_job.return_value = mock_instance + + result = self.runner.invoke( + pytorch_create, + [ + "--version", + "1.1", + "--job-name", + "elastic-test-job", + "--image", + "pytorch:latest", + "--elastic-replica-increment-step", + "2", + "--max-node-count", + "4", + "--elastic-graceful-shutdown-timeout-in-seconds", + "180", + "--elastic-scaling-timeout-in-seconds", + "90", + "--elastic-scale-up-snooze-time-in-seconds", + "120" + ], + ) + + print(f"Command output: {result.output}") + + # Verify command succeeded + self.assertEqual(result.exit_code, 0) + self.assertIn("Using version: 1.1", result.output) + + # Verify HyperPodPytorchJob was created with elastic parameters + mock_hyperpod_job.assert_called_once() + call_args = mock_hyperpod_job.call_args[1] + + # Validate basic job configuration + self.assertEqual(call_args["metadata"]["name"], "elastic-test-job") + + # Validate elastic policy configuration + self.assertIsNotNone(call_args.get("elastic_policy")) + elastic_policy = call_args["elastic_policy"] + self.assertEqual(elastic_policy.replicaIncrementStep, 2) + self.assertEqual(elastic_policy.maxReplicas, 4) + self.assertEqual(elastic_policy.gracefulShutdownTimeoutInSeconds, 180) + self.assertEqual(elastic_policy.scalingTimeoutInSeconds, 90) + + self.assertIsNotNone(call_args.get("run_policy")) + run_policy = call_args["run_policy"] + self.assertIsNotNone(run_policy.restartPolicy) + restart_policy = run_policy.restartPolicy + self.assertEqual(restart_policy.scaleUpSnoozeTimeInSeconds, 120) + + mock_instance.create.assert_called_once() + + # Test 
case 2: providing elastic-replica-discrete-values + with patch("hyperpod_pytorch_job_template.v1_1.model.HyperPodPytorchJob") as mock_hyperpod_job: + mock_instance = Mock() + mock_hyperpod_job.return_value = mock_instance + + result = self.runner.invoke( + pytorch_create, + [ + "--version", + "1.1", + "--job-name", + "elastic-discrete-test-job", + "--image", + "pytorch:latest", + "--elastic-replica-discrete-values", + "[2, 4, 8]", + "--max-node-count", + "8", + "--elastic-graceful-shutdown-timeout-in-seconds", + "180", + "--elastic-scaling-timeout-in-seconds", + "90", + ], + ) + + print(f"Command output: {result.output}") + + # Verify command succeeded + self.assertEqual(result.exit_code, 0) + self.assertIn("Using version: 1.1", result.output) + + # Verify HyperPodPytorchJob was created with elastic parameters + mock_hyperpod_job.assert_called_once() + call_args = mock_hyperpod_job.call_args[1] + + # Validate basic job configuration + self.assertEqual(call_args["metadata"]["name"], "elastic-discrete-test-job") + + # Validate elastic policy configuration + self.assertIsNotNone(call_args.get("elastic_policy")) + elastic_policy = call_args["elastic_policy"] + + self.assertEqual(elastic_policy.replicaDiscreteValues, [2, 4, 8]) + self.assertIsNone(elastic_policy.replicaIncrementStep) + + self.assertEqual(elastic_policy.maxReplicas, 8) + self.assertEqual(elastic_policy.gracefulShutdownTimeoutInSeconds, 180) + self.assertEqual(elastic_policy.scalingTimeoutInSeconds, 90) + + mock_instance.create.assert_called_once() + @patch('sagemaker.hyperpod.common.cli_decorators._namespace_exists') @patch("sagemaker.hyperpod.cli.commands.training.HyperPodPytorchJob") def test_list_jobs(self, mock_hyperpod_pytorch_job, mock_namespace_exists): From 682303b8a50db40fb497918f5da3316aeef9f28f Mon Sep 17 00:00:00 2001 From: Molly He Date: Wed, 3 Dec 2025 07:14:42 -0800 Subject: [PATCH 5/5] version update for v3.5.0 --- CHANGELOG.md | 6 ++++++ hyperpod-pytorch-job-template/CHANGELOG.md | 6 ++++++ hyperpod-pytorch-job-template/pyproject.toml | 2 +- pyproject.toml | 2 +- setup.py | 2 +- 5 files changed, 15 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e9377db3..af4acecb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## v.3.5.0 (2025-12-03) + +### Features + * Elastic training support for HyperPodTrainingOperator that is released in Reinvent 2025 keynote 3. This is a method that dynamically scales distributed machine learning operations. 
+ + ## v.3.4.0 (2025-11-20) ### Features diff --git a/hyperpod-pytorch-job-template/CHANGELOG.md b/hyperpod-pytorch-job-template/CHANGELOG.md index b872a9c4..54bbb327 100644 --- a/hyperpod-pytorch-job-template/CHANGELOG.md +++ b/hyperpod-pytorch-job-template/CHANGELOG.md @@ -1,3 +1,9 @@ +## v1.3.0 (2025-12-03) + +### Features + +* Support for elastic training + ## v1.2.0 (2025-11-20) ### Features diff --git a/hyperpod-pytorch-job-template/pyproject.toml b/hyperpod-pytorch-job-template/pyproject.toml index 9e8b45fc..5eeeac6d 100644 --- a/hyperpod-pytorch-job-template/pyproject.toml +++ b/hyperpod-pytorch-job-template/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "hyperpod-pytorch-job-template" -version = "1.2.0" +version = "1.3.0" readme = "README.md" authors = [{name = "Amazon Web Services"}] license = {text = "Apache-2.0"} diff --git a/pyproject.toml b/pyproject.toml index 8e4308e0..73e07995 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] dynamic = ["dependencies"] name = "sagemaker-hyperpod" -version = "3.4.0" +version = "3.5.0" description = "Amazon SageMaker HyperPod SDK and CLI" readme = "README.md" requires-python = ">=3.8" diff --git a/setup.py b/setup.py index bf52bb00..53036451 100644 --- a/setup.py +++ b/setup.py @@ -47,7 +47,7 @@ setup( data_files=sagemaker_hyperpod_recipes, name="sagemaker-hyperpod", - version="3.4.0", + version="3.5.0", description="Amazon SageMaker HyperPod SDK and CLI", long_description=open("README.md").read(), long_description_content_type="text/markdown",
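
Usage sketch (for reference, not part of the patches above): the elastic-training flags introduced in PATCH 4/5 attach to the existing hyp create hyp-pytorch-job command, mirroring the invocation exercised in test/integration_tests/training/cli/test_cli_elastic_training.py. The job name and image URI below are placeholders, not values from this change set.

    hyp create hyp-pytorch-job \
        --version 1.1 \
        --job-name elastic-demo \
        --image <training-image-uri> \
        --pull-policy Always \
        --tasks-per-node 1 \
        --max-retry 1 \
        --max-node-count 8 \
        --elastic-replica-discrete-values "[2, 4, 8]" \
        --elastic-graceful-shutdown-timeout-in-seconds 180 \
        --elastic-scaling-timeout-in-seconds 90

    # Alternative scenario from the same test: uniform scaling steps instead of
    # discrete values (the two options are mutually exclusive per the v1_1 model
    # validator):
    #     --elastic-replica-increment-step 2 \
    #     --elastic-scale-up-snooze-time-in-seconds 120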