diff --git a/.gitignore b/.gitignore index 2aeb3d4b..f03375cd 100644 --- a/.gitignore +++ b/.gitignore @@ -23,13 +23,13 @@ doc/_build/ /sagemaker-hyperpod/.coverage /sagemaker-hyperpod/.coverage.* -/hyperpod-cluster-stack-template/build -/hyperpod-pytorch-job-template/build -/hyperpod-custom-inference-template/build -/hyperpod-jumpstart-inference-template/build - # Ignore all contents of result and results directories /result/ /results/ -.idea/ \ No newline at end of file +.idea/ + +.venv* +venv + +/hyperpod-cluster-stack-template/build \ No newline at end of file diff --git a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json index 4b86c591..fc37ed0e 100644 --- a/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json +++ b/hyperpod-pytorch-job-template/hyperpod_pytorch_job_template/v1_1/schema.json @@ -279,11 +279,6 @@ "description": "Queue name for job scheduling", "title": "Queue Name" }, - "accelerators": { - "type": "integer", - "minimum": 0, - "description": "Number of accelerators (GPUs/TPUs)" - }, "vcpu": { "type": "float", "minimum": 0, @@ -293,6 +288,11 @@ "type": "float", "minimum": 0, "description": "Amount of memory in GiB" + }, + "accelerators": { + "type": "integer", + "minimum": 0, + "description": "Number of accelerators (GPUs/TPUs)" }, "accelerators-limit": { "type": "integer", diff --git a/src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py b/src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py index 98a4791c..87635746 100644 --- a/src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py +++ b/src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py @@ -1,6 +1,7 @@ from pydantic import ConfigDict, Field -from sagemaker.hyperpod.cli.constants.command_constants import INSTANCE_TYPE_LABEL +from sagemaker.hyperpod.cli.constants.command_constants import INSTANCE_TYPE_LABEL, NEURON_RESOURCE_LIMIT_KEY, \ + NVIDIA_GPU_RESOURCE_LIMIT_KEY from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import ( _HyperPodPytorchJob, HyperPodPytorchJobStatus ) @@ -20,7 +21,17 @@ import yaml import logging -from sagemaker.hyperpod.training.quota_allocation_util import _is_valid, _get_resources_from_compute_quotas, _get_resources_from_instance, _get_limits +from sagemaker.hyperpod.training.quota_allocation_util import ( + _is_valid, + _get_resources_from_compute_quotas, + _get_resources_from_instance, + _get_limits, + _resolve_default_memory_values, + _set_default_accelerators_val, + _validate_accelerators_inputs, + _resolve_default_cpu_values, + _trim_resource_requests +) TRAINING_GROUP = "sagemaker.amazonaws.com" API_VERSION = "v1" @@ -28,7 +39,8 @@ KIND = "HyperPodPyTorchJob" TRAINING_OPERATOR_NAMESPACE = "aws-hyperpod" TRAINING_OPERATOR_LABEL = "hp-training-control-plane" - +NVIDIA_RESOURCE_KEY = NVIDIA_GPU_RESOURCE_LIMIT_KEY +NEURON_RESOURCE_KEY = NEURON_RESOURCE_LIMIT_KEY class HyperPodPytorchJob(_HyperPodPytorchJob): """HyperPod PyTorch job for distributed training on Amazon SageMaker HyperPod clusters. 
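The NVIDIA_RESOURCE_KEY / NEURON_RESOURCE_KEY aliases above exist so that the request-normalization hunk that follows can accept either the generic quota fields from the v1_1 schema or the Kubernetes vendor resource names. A minimal sketch of the two equivalent spellings, using illustrative values only (the "nvidia.com/gpu" string appears later in this diff; the exact Neuron key is whatever NEURON_RESOURCE_LIMIT_KEY resolves to and is not reproduced here):

# Illustrative only, not part of the patch: two container "resources" blocks
# that the normalization code below treats the same way.
generic_style = {
    "requests": {"vcpu": "4", "memory": "16Gi", "accelerators": "1"},
    "limits": {"accelerators": "1"},
}
kubernetes_style = {
    "requests": {"cpu": "4", "memory": "16Gi", "nvidia.com/gpu": "1"},
    "limits": {"nvidia.com/gpu": "1"},
}
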
@@ -94,27 +106,64 @@ def _process_replica_resources(cls, data): requests = resources.get('requests', {}) limits = resources.get('limits', {}) + accelerators = None + if requests.get('accelerators'): + accelerators = int(requests.get('accelerators')) + elif requests.get(NVIDIA_RESOURCE_KEY): + accelerators = int(requests.get(NVIDIA_RESOURCE_KEY)) + elif requests.get(NEURON_RESOURCE_KEY): + accelerators = int(requests.get(NEURON_RESOURCE_KEY)) + # Extract resource values - vcpu = float(requests.get('vcpu')) if requests.get('vcpu') else None + vcpu = None + if requests.get('cpu'): + vcpu = float(requests.get('cpu')) + elif requests.get('vcpu'): + vcpu = float(requests.get('vcpu')) + + vcpu_limit = None + if limits.get('cpu'): + vcpu_limit = float(limits.get('cpu')) + elif limits.get('vcpu'): + vcpu_limit = float(limits.get('vcpu')) + memory = cls._extract_numeric_value(requests.get('memory')) - accelerators = int(requests.get('accelerators')) if requests.get('accelerators') else None memory_limit = cls._extract_numeric_value(limits.get('memory')) - vcpu_limit = float(limits.get('vcpu')) if limits.get('vcpu') else None - accelerators_limit = int(limits.get('accelerators')) if limits.get('accelerators') else None + + accelerators_limit = None + if limits.get('accelerators'): + accelerators_limit = int(limits.get('accelerators')) + elif limits.get(NVIDIA_RESOURCE_KEY): + accelerators_limit = int(limits.get(NVIDIA_RESOURCE_KEY)) + elif limits.get(NEURON_RESOURCE_KEY): + accelerators_limit = int(limits.get(NEURON_RESOURCE_KEY)) + + acc_req, acc_lim = _set_default_accelerators_val(instance_type, accelerators, accelerators_limit) + _validate_accelerators_inputs(instance_type, acc_req, acc_lim) # Validate configuration - valid, error = _is_valid(vcpu, memory, accelerators, node_count, instance_type) + valid, error = _is_valid(vcpu, memory, acc_req, node_count, instance_type) if not valid: raise ValueError(error) # Calculate resource values - requests_value = (_get_resources_from_compute_quotas(instance_type, vcpu, memory, accelerators) - or _get_resources_from_instance(instance_type, node_count=1)) - limits_value = _get_limits(instance_type, vcpu_limit, memory_limit, accelerators_limit) + requests_values = _get_resources_from_compute_quotas(instance_type, vcpu, memory, acc_req) + if requests_values is None: + requests_values = _get_resources_from_instance(instance_type, node_count=1) + _trim_resource_requests(instance_type, requests_values) + if NVIDIA_RESOURCE_KEY in requests_values: + acc_lim = requests_values[NVIDIA_RESOURCE_KEY] + elif NEURON_RESOURCE_KEY in requests_values: + acc_lim = requests_values[NEURON_RESOURCE_KEY] + + limits_values = _get_limits(instance_type, vcpu_limit, memory_limit, acc_lim) + _resolve_default_memory_values(instance_type, requests_values, limits_values) + _resolve_default_cpu_values(instance_type, requests_values) # Update data with calculated values - data['template']['spec']['containers'][0]['resources']['requests'] = requests_value - data['template']['spec']['containers'][0]['resources']['limits'] = limits_value + data['template']['spec']['containers'][0]['resources']['requests'] = requests_values + data['template']['spec']['containers'][0]['resources']['limits'] = limits_values + return data except KeyError as e: raise ValueError(f"Missing required configuration key: {str(e)}") @@ -197,6 +246,8 @@ def create(self, debug=False): "spec": spec.model_dump(exclude_none=True), } + # print(config['spec']) + # return custom_api = client.CustomObjectsApi() logger.debug( 
"Deploying HyperPodPytorchJob with config:\n%s", diff --git a/src/sagemaker/hyperpod/training/quota_allocation_util.py b/src/sagemaker/hyperpod/training/quota_allocation_util.py index 99aec20c..adb269db 100644 --- a/src/sagemaker/hyperpod/training/quota_allocation_util.py +++ b/src/sagemaker/hyperpod/training/quota_allocation_util.py @@ -10,6 +10,8 @@ # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. +import logging +import re from sagemaker.hyperpod.cli.constants.command_constants import NVIDIA_GPU_RESOURCE_LIMIT_KEY, NEURON_RESOURCE_LIMIT_KEY from sagemaker.hyperpod.cli.utils import ( setup_logger @@ -135,6 +137,9 @@ "ml.i3en.24xlarge": {"cpu": 96, "gpu": 0, "trainium": 0, "memory": 768} } +MAX_MEMORY_PROPORTION = 0.85 +MAX_CPU_PROPORTION = 0.92 + def _has_compute_resource_quota_allocation_resources(memory_in_gib: Optional[float], vcpu: Optional[float], accelerators: Optional[int]) -> bool: return ( (memory_in_gib is not None) or @@ -187,16 +192,15 @@ def _get_resources_from_compute_quotas(instance_type: str, result["cpu"] = f"{result['cpu']}" result["memory"] = f"{result['memory']}Gi" + _trim_resource_requests(instance_type, result) return result # Gets resources from instance type. def _get_resources_from_instance(instance_type: str, node_count: int) -> dict: - instance = INSTANCE_RESOURCES.get(instance_type, {}) cpu = instance.get("cpu", 0) memory = instance.get("memory", 0) - result = { "cpu": cpu * node_count, "memory": memory * node_count @@ -210,8 +214,29 @@ def _get_resources_from_instance(instance_type: str, node_count: int) -> dict: result["memory"] = f"{result['memory']}Gi" return result + +def _trim_resource_requests(instance_type: str, requests_values: dict): + instance = INSTANCE_RESOURCES.get(instance_type, {}) + allocatable_cpu = float(instance.get("cpu", 0) * MAX_CPU_PROPORTION) + allocatable_memory = float(instance.get("memory", 0) * MAX_MEMORY_PROPORTION) + + cpu_request_str = requests_values.get('cpu', '0') + cpu_request = float(''.join(filter(lambda x: x.isdigit() or x == '.', cpu_request_str))) + + mem_request_str = requests_values.get('memory', '0Gi') + mem_request = float(mem_request_str.replace('Gi', '')) + + final_cpu = min(allocatable_cpu, cpu_request) + final_memory = min(allocatable_memory, mem_request) + + requests_values['cpu'] = str(final_cpu) + requests_values['memory'] = f"{final_memory}Gi" + + return requests_values + + def _get_limits(instance_type: str, vcpu_limit: Optional[float], memory_in_gib_limit: Optional[float], accelerators_limit: Optional[int]) -> dict: - + result = {} type_of_accelerator, _max_accelerator_per_instance = _get_accelerator_type_and_count(instance_type) @@ -224,14 +249,90 @@ def _get_limits(instance_type: str, vcpu_limit: Optional[float], memory_in_gib_l else: # user specified accelerator limit but the instance type wasn't found, set limit to 0 as a precaution result["nvidia.com/gpu"] = 0 - if memory_in_gib_limit is not None: - result["memory"] = memory_in_gib_limit - result["memory"] = f"{result['memory']}Gi" + result["memory"] = str(memory_in_gib_limit) + "Gi" return result +def _resolve_default_cpu_values(instance_type: str, requests_values: dict) -> None: + instance = INSTANCE_RESOURCES.get(instance_type, {}) + total_available_cpu = instance.get('cpu') + + cpu_request = float(requests_values.get('cpu')) if requests_values.get('cpu') is not None else None + + if 
cpu_request is not None and cpu_request > total_available_cpu: + raise ValueError( + f"Specified CPU request ({cpu_request}) exceeds instance capacity. " + f"Maximum available CPU for {instance_type} is {total_available_cpu}." + ) + + max_allocatable_cpu = int(total_available_cpu * MAX_CPU_PROPORTION) + cpu_request = min(cpu_request, max_allocatable_cpu) + requests_values["cpu"] = str(cpu_request) + + +def _resolve_default_memory_values(instance_type: str, requests_values: dict, limits_values: dict) -> None: + + instance = INSTANCE_RESOURCES.get(instance_type, {}) + total_available_memory = instance.get("memory", 0) + mem_limit_str = limits_values.get("memory") + mem_request_str = requests_values.get("memory") + + user_set_limit = True if mem_limit_str is not None else False + if mem_limit_str is None and mem_request_str is not None: + mem_limit_str = mem_request_str + + try: + memory_limit = float(re.match(r'^([0-9]*\.?[0-9]+)', mem_limit_str).group(1)) + memory_request = float(re.match(r'^([0-9]*\.?[0-9]+)', mem_request_str).group(1)) + except (AttributeError, ValueError): + raise ValueError(f"Invalid memory format: {mem_limit_str or mem_request_str}") + + if memory_request > total_available_memory: + raise ValueError( + f"Specified memory request ({memory_request}Gi) exceeds instance capacity. " + f"Maximum available memory for {instance_type} is {total_available_memory}Gi." + ) + + max_allocatable_memory = int(total_available_memory * MAX_MEMORY_PROPORTION) + if not user_set_limit: + memory_limit = min(memory_limit, max_allocatable_memory) + memory_request = min(memory_request, max_allocatable_memory) + limits_values["memory"] = str(memory_limit) + "Gi" + requests_values["memory"] = str(memory_request) + "Gi" + + +def _validate_accelerators_inputs(instance_type: str, accelerators_request: int, accelerators_limit: int) -> None: + type_of_accelerator, _max_accelerator_per_instance = _get_accelerator_type_and_count(instance_type) + if type_of_accelerator is None and (accelerators_request is not None or accelerators_limit is not None): + raise ValueError( + f"Instance type {instance_type} does not support accelerators, but accelerator values were provided.") + + if type_of_accelerator is not None: + if accelerators_request is not None and accelerators_limit is not None: + if accelerators_request != accelerators_limit: + raise ValueError('Accelerator request must equal accelerator limit') + if accelerators_limit > _max_accelerator_per_instance: + raise ValueError('Requested accelerators exceeds capacity') + if accelerators_request > _max_accelerator_per_instance: + raise ValueError('Requested accelerators exceeds capacity') + + +def _set_default_accelerators_val(instance_type: Optional[str], accelerators_request: Optional[int], accelerators_limit: int|None) -> Tuple[Optional[int], Optional[int]]: + type_of_accelerator, _max_accelerator_per_instance = _get_accelerator_type_and_count(instance_type) + if type_of_accelerator is not None: + if accelerators_request is None and accelerators_limit is None: + return None, None + elif accelerators_request is not None and accelerators_limit is None: + return accelerators_request, accelerators_request + elif accelerators_request is None and accelerators_limit is not None: + return accelerators_limit, accelerators_limit + else: + return accelerators_request, accelerators_limit + return None, None + + def _is_valid(vcpu: Optional[float], memory_in_gib: Optional[float], accelerators: Optional[int], node_count: Optional[int], instance_type: Optional[str]) 
-> tuple[bool, str]: @@ -239,17 +340,16 @@ def _is_valid(vcpu: Optional[float], memory_in_gib: Optional[float], accelerator if instance_type is None and has_gpu_quota_allocation: return False, "Instance-type must be specified when accelerators, vcpu, or memory-in-gib specified" - + node_specified = node_count is not None and node_count > 0 - + # Check if instance_type is valid only when it's provided if instance_type is not None and (INSTANCE_RESOURCES.get(instance_type) is None): return False, f"Invalid instance-type {instance_type}. Please re-check the instance type and contact AWS for support." - if instance_type is not None: #both resources and node count specified if (has_gpu_quota_allocation and node_specified): - return False, f"Either node-count or a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type {instance_type}" + return False, f"Either node-count OR a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type {instance_type}" return True, "" diff --git a/test/integration_tests/training/sdk/test_sdk_quota_allocation.py b/test/integration_tests/training/sdk/test_sdk_quota_allocation.py index 46eb237e..f98f688b 100644 --- a/test/integration_tests/training/sdk/test_sdk_quota_allocation.py +++ b/test/integration_tests/training/sdk/test_sdk_quota_allocation.py @@ -75,7 +75,7 @@ def test_create_job_with_quota_parameters(self, test_job_name, image_uri): # Verify the job was created with correct resource allocation created_job = HyperPodPytorchJob.get(test_job_name, NAMESPACE) assert created_job is not None - + # Clean up pytorch_job.delete() logger.info(f"Successfully deleted job: {test_job_name}") @@ -123,7 +123,6 @@ def test_create_job_with_only_replicas_parameters(self, test_job_name, image_uri pytorch_job.delete() logger.info(f"Successfully deleted job: {test_job_name}") - def test_create_job_with_float_quota_parameters(self, test_job_name, image_uri): """Test creating a job with float quota parameters.""" replica_specs = [ @@ -297,4 +296,261 @@ def test_default_replicas_allocation(self, test_job_name, image_uri): # Clean up pytorch_job.delete() - logger.info(f"Successfully deleted job: {test_job_name}") \ No newline at end of file + logger.info(f"Successfully deleted job: {test_job_name}") + + def test_set_default_memory_limit_caps_at_85_percent(self, test_job_name, image_uri): + """Test that default memory requests are capped at 85% of instance capacity.""" + replica_specs = [ + ReplicaSpec( + name="pod", + template=Template( + spec=Spec( + containers=[ + Containers( + name="container-name", + image=image_uri, + image_pull_policy="Always", + resources=Resources( + requests={"memory": "128Gi"}, # Exceeds 85% of ml.g5.8xlarge + limits={"memory": "128Gi"} + ) + ) + ], + node_selector={"node.kubernetes.io/instance-type": "ml.g5.8xlarge"} + ) + ), + ) + ] + + pytorch_job = HyperPodPytorchJob( + metadata=Metadata(name=test_job_name, namespace=NAMESPACE), + nproc_per_node="1", + replica_specs=replica_specs, + run_policy=RunPolicy(clean_pod_policy="None"), + ) + + pytorch_job.create() + logger.info(f"Created job, memory should be capped at 85% of instance capacity: {test_job_name}") + + # Wait for job to be created + time.sleep(10) + + # Verify the job was created + created_job = HyperPodPytorchJob.get(test_job_name, NAMESPACE) + assert created_job is not None + + # Clean up + pytorch_job.delete() + logger.info(f"Successfully deleted job: {test_job_name}") + + def test_validate_accelerators_values_enforces_equality(self, test_job_name, 
image_uri): + """Test that _validate_accelerators_values enforces request/limit equality. + + This test verifies: + 1. Mismatched accelerator requests and limits raise ValueError + 2. Error message includes both values for debugging + """ + replica_specs = [ + ReplicaSpec( + name="pod", + replicas=0, + template=Template( + spec=Spec( + containers=[ + Containers( + name="container-name", + image=image_uri, + image_pull_policy="Always", + resources=Resources( + requests={"nvidia.com/gpu": "1"}, + limits={"nvidia.com/gpu": "2"} # Mismatch should cause error + ) + ) + ], + node_selector={"node.kubernetes.io/instance-type": "ml.g5.xlarge"} + ) + ), + ) + ] + + pytorch_job = HyperPodPytorchJob( + metadata=Metadata(name=test_job_name, namespace=NAMESPACE), + nproc_per_node="1", + replica_specs=replica_specs, + run_policy=RunPolicy(clean_pod_policy="None"), + ) + + # Should raise ValueError due to mismatched accelerator values + with pytest.raises(ValueError, match="Accelerator request must equal accelerator limit"): + pytorch_job.create() + + def test_set_default_accelerators_values_with_missing_values(self, test_job_name, image_uri): + """Test that _set_default_accelerators_values sets defaults when values are missing.""" + replica_specs = [ + ReplicaSpec( + name="pod", + replicas=0, + template=Template( + spec=Spec( + containers=[ + Containers( + name="container-name", + image=image_uri, + image_pull_policy="Always", + resources=Resources( + requests={"cpu": "4", "memory": "16Gi"}, + limits={"nvidia.com/gpu": "1"} # Only limit specified + ) + ) + ], + node_selector={"node.kubernetes.io/instance-type": "ml.g5.xlarge"} + ) + ), + ) + ] + + pytorch_job = HyperPodPytorchJob( + metadata=Metadata(name=test_job_name, namespace=NAMESPACE), + nproc_per_node="1", + replica_specs=replica_specs, + run_policy=RunPolicy(clean_pod_policy="None"), + ) + + pytorch_job.create() + logger.info(f"Created job accelerators should be set to (1): {test_job_name}") + + # Wait for job to be created + time.sleep(10) + + # Verify the job was created + created_job = HyperPodPytorchJob.get(test_job_name, NAMESPACE) + assert created_job is not None + + # Clean up + pytorch_job.delete() + logger.info(f"Successfully deleted job: {test_job_name}") + + def test_resolve_default_memory_values_request_exceeds_limit(self, test_job_name, image_uri): + """Test that _resolve_default_memory_values reduces request when it exceeds limit.""" + replica_specs = [ + ReplicaSpec( + name="pod", + template=Template( + spec=Spec( + containers=[ + Containers( + name="container-name", + image=image_uri, + image_pull_policy="Always", + resources=Resources( + requests={"memory": "10Gi"}, + limits={"memory": "8Gi"} # Request exceeds limit + ) + ) + ], + node_selector={"node.kubernetes.io/instance-type": "ml.g5.xlarge"} + ) + ), + ) + ] + + pytorch_job = HyperPodPytorchJob( + metadata=Metadata(name=test_job_name, namespace=NAMESPACE), + nproc_per_node="1", + replica_specs=replica_specs, + run_policy=RunPolicy(clean_pod_policy="None"), + ) + + pytorch_job.create() + logger.info(f"Created job memory should be set to '8Gi': {test_job_name}") + + # Wait for job to be created + time.sleep(10) + + # Verify the job was created + created_job = HyperPodPytorchJob.get(test_job_name, NAMESPACE) + assert created_job is not None + + # Clean up + pytorch_job.delete() + logger.info(f"Successfully deleted job: {test_job_name}") + + def test_validate_accelerators_inputs_exceeds_capacity(self, test_job_name, image_uri): + """Test that _validate_accelerators_inputs 
validates capacity limits.""" + replica_specs = [ + ReplicaSpec( + name="pod", + template=Template( + spec=Spec( + containers=[ + Containers( + name="container-name", + image=image_uri, + image_pull_policy="Always", + resources=Resources( + requests={"nvidia.com/gpu": "2"}, + limits={"nvidia.com/gpu": "2"} # Exceeds ml.g5.xlarge capacity (1 GPU) + ) + ) + ], + node_selector={"node.kubernetes.io/instance-type": "ml.g5.xlarge"} + ) + ), + ) + ] + + pytorch_job = HyperPodPytorchJob( + metadata=Metadata(name=test_job_name, namespace=NAMESPACE), + nproc_per_node="1", + replica_specs=replica_specs, + run_policy=RunPolicy(clean_pod_policy="None"), + ) + + with pytest.raises(ValueError, match="Requested accelerators exceeds capacity"): + pytorch_job.create() + + def test_set_default_accelerators_val_missing_request(self, test_job_name, image_uri): + """Test that _set_default_accelerators_val sets request when missing.""" + replica_specs = [ + ReplicaSpec( + name="pod", + template=Template( + spec=Spec( + containers=[ + Containers( + name="container-name", + image=image_uri, + image_pull_policy="Always", + resources=Resources( + limits={"nvidia.com/gpu": "1"}, # Only limit specified + requests={} + ) + ) + ], + node_selector={"node.kubernetes.io/instance-type": "ml.g5.xlarge"} + ) + ), + ) + ] + + pytorch_job = HyperPodPytorchJob( + metadata=Metadata(name=test_job_name, namespace=NAMESPACE), + nproc_per_node="1", + replica_specs=replica_specs, + run_policy=RunPolicy(clean_pod_policy="None"), + ) + + pytorch_job.create() + logger.info(f"Created job memory should be set to '8Gi': {test_job_name}") + + # Wait for job to be created + time.sleep(10) + + # Verify the job was created + created_job = HyperPodPytorchJob.get(test_job_name, NAMESPACE) + assert created_job is not None + + # Clean up + pytorch_job.delete() + logger.info(f"Successfully deleted job: {test_job_name}") + diff --git a/test/unit_tests/cli/test_quota_allocation_util.py b/test/unit_tests/cli/test_quota_allocation_util.py index e0fc4b36..dd2655de 100644 --- a/test/unit_tests/cli/test_quota_allocation_util.py +++ b/test/unit_tests/cli/test_quota_allocation_util.py @@ -10,6 +10,7 @@ # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. 
+import re import pytest from sagemaker.hyperpod.training.quota_allocation_util import ( @@ -19,6 +20,10 @@ _get_accelerator_type_and_count, _get_resources_from_compute_quotas, _has_compute_resource_quota_allocation_resources, + _resolve_default_memory_values, + _validate_accelerators_inputs, + _set_default_accelerators_val, + _resolve_default_cpu_values, INSTANCE_RESOURCES ) @@ -92,7 +97,7 @@ def test_get_resources_from_compute_quotas_memory_only(self): def test_get_resources_from_compute_quotas_gpu_instance_with_accelerators_ratio_1(self): result = _get_resources_from_compute_quotas("ml.g5.xlarge", None, None, 1) # ml.g5.xlarge has 1 GPU, 4 CPUs, 16GiB memory - assert result == {"cpu": "4.0", "memory": "16.0Gi", "nvidia.com/gpu": 1} + assert result == {"cpu": "3.68", "memory": "13.6Gi", "nvidia.com/gpu": 1} def test_get_resources_from_compute_quotas_gpu_instance_with_accelerators_ratio_half(self): result = _get_resources_from_compute_quotas("ml.g6e.48xlarge", None, None, 4) @@ -122,7 +127,7 @@ def test_get_resources_from_compute_quotas_vcpu_only(self): def test_get_resources_from_compute_quotas_accelerators_and_cpu_only(self): result = _get_resources_from_compute_quotas("ml.g5.xlarge", 2.0, None, 1) # ml.g5.xlarge has 1 gpu, 4 CPUs and 16GB memory, and memory calculated as accelerator ratio - assert result == {"cpu": "2.0", "memory": "16.0Gi", "nvidia.com/gpu": 1} + assert result == {"cpu": "2.0", "memory": "13.6Gi", "nvidia.com/gpu": 1} # Tests for _get_resources_from_instance method @pytest.mark.parametrize( @@ -204,7 +209,7 @@ def test_is_valid_invalid_instance_type(self): def test_is_valid_both_node_count_and_resources(self): valid, message = _is_valid(4.0, None, None, 2, "ml.g5.xlarge") assert not valid - assert message == "Either node-count or a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type ml.g5.xlarge" + assert message == "Either node-count OR a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type ml.g5.xlarge" def test_is_valid_both_node_count_and_limits(self): valid, message = _is_valid(None, None, None, 2, "ml.g5.xlarge") @@ -265,3 +270,112 @@ def test_get_resources_from_compute_quotas_float_values(self): def test_get_resources_from_instance_zero_nodes(self): result = _get_resources_from_instance("ml.g5.xlarge", 0) assert result == {"cpu": "0", "memory": "0Gi", "nvidia.com/gpu": 0} + + # Tests for _validate_memory_limit + def test_validate_memory_limit_within_bounds(self): + requests = {"memory": "8Gi"} + limits = {"memory": "12Gi"} + _resolve_default_memory_values("ml.g5.xlarge", requests, limits) + assert requests["memory"] == "8.0Gi" + assert limits["memory"] == "12.0Gi" + + def test_validate_memory_limit_missing_values(self): + requests = {} + limits = {"memory": "8Gi"} + with pytest.raises(TypeError, match="expected string or bytes-like object, got 'NoneType'"): + _resolve_default_memory_values("ml.g5.xlarge", requests, limits) + + def test_validate_memory_limit_invalid_format(self): + requests = {"memory": "invalid"} + limits = {"memory": "8Gi"} + with pytest.raises(ValueError, match="Invalid memory format"): + _resolve_default_memory_values("ml.g5.xlarge", requests, limits) + + def test_resolve_default_memory_values_set_to_request(self): + requests = {"memory": "10Gi"} + limits = {} + _resolve_default_memory_values("ml.g5.xlarge", requests, limits) + assert requests["memory"] == "10.0Gi" + assert limits["memory"] == "10.0Gi" + + def test_resolve_default_memory_values_set_to_allocatable(self): + 
requests = {"memory": "16Gi"} + limits = {} + _resolve_default_memory_values("ml.g5.xlarge", requests, limits) + assert requests["memory"] == "13Gi" + assert limits["memory"] == "13Gi" + + # Tests for _validate_accelerators_inputs + def test_validate_accelerators_inputs_valid_equal_values(self): + # Should not raise exception + _validate_accelerators_inputs("ml.g5.xlarge", 1, 1) + + def test_validate_accelerators_inputs_unequal_values(self): + with pytest.raises(ValueError, match="Accelerator request must equal accelerator limit"): + _validate_accelerators_inputs("ml.g5.xlarge", 1, 2) + + def test_validate_accelerators_inputs_exceeds_capacity_request(self): + with pytest.raises(ValueError, match="Requested accelerators exceeds capacity"): + _validate_accelerators_inputs("ml.g5.xlarge", 2, 2) + + def test_validate_accelerators_inputs_exceeds_capacity_limit(self): + with pytest.raises(ValueError, match="Accelerator request must equal accelerator limit"): + _validate_accelerators_inputs("ml.g5.xlarge", 1, 2) + + def test_validate_accelerators_inputs_cpu_only_instance(self): + with pytest.raises(ValueError, match="Instance type ml.c5.large does not support accelerators, but accelerator values were provided."): + _validate_accelerators_inputs("ml.c5.large", 1, 1) + + # Tests for _set_default_accelerators_val + def test_set_default_accelerators_val_both_none(self): + request, limit = _set_default_accelerators_val("ml.g5.xlarge", None, None) + assert request is None + assert limit is None + + def test_set_default_accelerators_val_request_only(self): + request, limit = _set_default_accelerators_val("ml.g5.xlarge", 1, None) + assert request == 1 + assert limit == 1 + + def test_set_default_accelerators_val_limit_only(self): + request, limit = _set_default_accelerators_val("ml.g5.xlarge", None, 1) + assert request == 1 + assert limit == 1 + + def test_set_default_accelerators_val_both_provided(self): + request, limit = _set_default_accelerators_val("ml.g5.xlarge", 1, 1) + assert request == 1 + assert limit == 1 + + def test_set_default_accelerators_val_cpu_only_instance(self): + request, limit = _set_default_accelerators_val("ml.c5.large", 1, 1) + assert request is None + assert limit is None + + def test_resolve_default_cpu_request_exceeds_capacity(self): + requests_values = {"cpu": "10.0"} + limits_values = {} + with pytest.raises(ValueError, match=re.escape("Specified CPU request (10.0) exceeds instance capacity. Maximum available CPU for ml.g5.2xlarge is 8.")): + _resolve_default_cpu_values("ml.g5.2xlarge", requests_values) + + # Tests for _resolve_default_cpu_values + def test_resolve_default_cpu_values_request_only(self): + requests_values = {"cpu": "2.0"} + limits_values = {} + _resolve_default_cpu_values("ml.c5.large", requests_values) + assert requests_values["cpu"] == "1" + assert "cpu" not in limits_values + + + def test_resolve_default_cpu_values_both_provided(self): + requests_values = {"cpu": "2.0"} + limits_values = {"cpu": "4.0"} + _resolve_default_cpu_values("ml.c5.large", requests_values) + assert requests_values["cpu"] == "1" + assert limits_values["cpu"] == "4.0" + + def test_resolve_default_cpu_values_exceeds_instance_capacity(self): + requests_values = {"cpu": "10.0"} + limits_values = {} + with pytest.raises(ValueError, match=re.escape("Specified CPU request (10.0) exceeds instance capacity. Maximum available CPU for ml.c5.large is 2.")): + _resolve_default_cpu_values("ml.c5.large", requests_values) \ No newline at end of file