Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ doc/_build/
/sagemaker-hyperpod/.coverage
/sagemaker-hyperpod/.coverage.*

/hyperpod-cluster-stack-template/build
/hyperpod-pytorch-job-template/build
/hyperpod-custom-inference-template/build
/hyperpod-jumpstart-inference-template/build

# Ignore all contents of result and results directories
/result/
/results/

.idea/
.idea/

.venv*
venv

/hyperpod-cluster-stack-template/build
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,6 @@
"description": "Queue name for job scheduling",
"title": "Queue Name"
},
"accelerators": {
"type": "integer",
"minimum": 0,
"description": "Number of accelerators (GPUs/TPUs)"
},
"vcpu": {
"type": "float",
"minimum": 0,
Expand All @@ -293,6 +288,11 @@
"type": "float",
"minimum": 0,
"description": "Amount of memory in GiB"
},
"accelerators": {
"type": "integer",
"minimum": 0,
"description": "Number of accelerators (GPUs/TPUs)"
},
"accelerators-limit": {
"type": "integer",
Expand Down
77 changes: 64 additions & 13 deletions src/sagemaker/hyperpod/training/hyperpod_pytorch_job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pydantic import ConfigDict, Field

from sagemaker.hyperpod.cli.constants.command_constants import INSTANCE_TYPE_LABEL
from sagemaker.hyperpod.cli.constants.command_constants import INSTANCE_TYPE_LABEL, NEURON_RESOURCE_LIMIT_KEY, \
NVIDIA_GPU_RESOURCE_LIMIT_KEY
from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import (
_HyperPodPytorchJob, HyperPodPytorchJobStatus
)
Expand All @@ -20,15 +21,26 @@
import yaml
import logging

from sagemaker.hyperpod.training.quota_allocation_util import _is_valid, _get_resources_from_compute_quotas, _get_resources_from_instance, _get_limits
from sagemaker.hyperpod.training.quota_allocation_util import (
_is_valid,
_get_resources_from_compute_quotas,
_get_resources_from_instance,
_get_limits,
_resolve_default_memory_values,
_set_default_accelerators_val,
_validate_accelerators_inputs,
_resolve_default_cpu_values,
_trim_resource_requests
)

TRAINING_GROUP = "sagemaker.amazonaws.com"
API_VERSION = "v1"
PLURAL = "hyperpodpytorchjobs"
KIND = "HyperPodPyTorchJob"
TRAINING_OPERATOR_NAMESPACE = "aws-hyperpod"
TRAINING_OPERATOR_LABEL = "hp-training-control-plane"

NVIDIA_RESOURCE_KEY = NVIDIA_GPU_RESOURCE_LIMIT_KEY
NEURON_RESOURCE_KEY = NEURON_RESOURCE_LIMIT_KEY

class HyperPodPytorchJob(_HyperPodPytorchJob):
"""HyperPod PyTorch job for distributed training on Amazon SageMaker HyperPod clusters.
Expand Down Expand Up @@ -94,27 +106,64 @@ def _process_replica_resources(cls, data):
requests = resources.get('requests', {})
limits = resources.get('limits', {})

accelerators = None
if requests.get('accelerators'):
accelerators = int(requests.get('accelerators'))
elif requests.get(NVIDIA_RESOURCE_KEY):
accelerators = int(requests.get(NVIDIA_RESOURCE_KEY))
elif requests.get(NEURON_RESOURCE_KEY):
accelerators = int(requests.get(NEURON_RESOURCE_KEY))

# Extract resource values
vcpu = float(requests.get('vcpu')) if requests.get('vcpu') else None
vcpu = None
if requests.get('cpu'):
vcpu = float(requests.get('cpu'))
elif requests.get('vcpu'):
vcpu = float(requests.get('vcpu'))

vcpu_limit = None
if limits.get('cpu'):
vcpu_limit = float(limits.get('cpu'))
elif limits.get('vcpu'):
vcpu_limit = float(limits.get('vcpu'))

memory = cls._extract_numeric_value(requests.get('memory'))
accelerators = int(requests.get('accelerators')) if requests.get('accelerators') else None
memory_limit = cls._extract_numeric_value(limits.get('memory'))
vcpu_limit = float(limits.get('vcpu')) if limits.get('vcpu') else None
accelerators_limit = int(limits.get('accelerators')) if limits.get('accelerators') else None

accelerators_limit = None
if limits.get('accelerators'):
accelerators_limit = int(limits.get('accelerators'))
elif limits.get(NVIDIA_RESOURCE_KEY):
accelerators_limit = int(limits.get(NVIDIA_RESOURCE_KEY))
elif limits.get(NEURON_RESOURCE_KEY):
accelerators_limit = int(limits.get(NEURON_RESOURCE_KEY))

acc_req, acc_lim = _set_default_accelerators_val(instance_type, accelerators, accelerators_limit)
_validate_accelerators_inputs(instance_type, acc_req, acc_lim)

# Validate configuration
valid, error = _is_valid(vcpu, memory, accelerators, node_count, instance_type)
valid, error = _is_valid(vcpu, memory, acc_req, node_count, instance_type)
if not valid:
raise ValueError(error)

# Calculate resource values
requests_value = (_get_resources_from_compute_quotas(instance_type, vcpu, memory, accelerators)
or _get_resources_from_instance(instance_type, node_count=1))
limits_value = _get_limits(instance_type, vcpu_limit, memory_limit, accelerators_limit)
requests_values = _get_resources_from_compute_quotas(instance_type, vcpu, memory, acc_req)
if requests_values is None:
requests_values = _get_resources_from_instance(instance_type, node_count=1)
_trim_resource_requests(instance_type, requests_values)
if NVIDIA_RESOURCE_KEY in requests_values:
acc_lim = requests_values[NVIDIA_RESOURCE_KEY]
elif NEURON_RESOURCE_KEY in requests_values:
acc_lim = requests_values[NEURON_RESOURCE_KEY]

limits_values = _get_limits(instance_type, vcpu_limit, memory_limit, acc_lim)
_resolve_default_memory_values(instance_type, requests_values, limits_values)
_resolve_default_cpu_values(instance_type, requests_values)

# Update data with calculated values
data['template']['spec']['containers'][0]['resources']['requests'] = requests_value
data['template']['spec']['containers'][0]['resources']['limits'] = limits_value
data['template']['spec']['containers'][0]['resources']['requests'] = requests_values
data['template']['spec']['containers'][0]['resources']['limits'] = limits_values

return data
except KeyError as e:
raise ValueError(f"Missing required configuration key: {str(e)}")
Expand Down Expand Up @@ -197,6 +246,8 @@ def create(self, debug=False):
"spec": spec.model_dump(exclude_none=True),
}

# print(config['spec'])
# return
custom_api = client.CustomObjectsApi()
logger.debug(
"Deploying HyperPodPytorchJob with config:\n%s",
Expand Down
120 changes: 110 additions & 10 deletions src/sagemaker/hyperpod/training/quota_allocation_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import re
from sagemaker.hyperpod.cli.constants.command_constants import NVIDIA_GPU_RESOURCE_LIMIT_KEY, NEURON_RESOURCE_LIMIT_KEY
from sagemaker.hyperpod.cli.utils import (
setup_logger
Expand Down Expand Up @@ -135,6 +137,9 @@
"ml.i3en.24xlarge": {"cpu": 96, "gpu": 0, "trainium": 0, "memory": 768}
}

MAX_MEMORY_PROPORTION = 0.85
MAX_CPU_PROPORTION = 0.92

def _has_compute_resource_quota_allocation_resources(memory_in_gib: Optional[float], vcpu: Optional[float], accelerators: Optional[int]) -> bool:
return (
(memory_in_gib is not None) or
Expand Down Expand Up @@ -187,16 +192,15 @@ def _get_resources_from_compute_quotas(instance_type: str,

result["cpu"] = f"{result['cpu']}"
result["memory"] = f"{result['memory']}Gi"
_trim_resource_requests(instance_type, result)
return result


# Gets resources from instance type.
def _get_resources_from_instance(instance_type: str, node_count: int) -> dict:

instance = INSTANCE_RESOURCES.get(instance_type, {})
cpu = instance.get("cpu", 0)
memory = instance.get("memory", 0)

result = {
"cpu": cpu * node_count,
"memory": memory * node_count
Expand All @@ -210,8 +214,29 @@ def _get_resources_from_instance(instance_type: str, node_count: int) -> dict:
result["memory"] = f"{result['memory']}Gi"
return result


def _trim_resource_requests(instance_type: str, requests_values: dict):
    """Clamp CPU and memory requests in-place to the instance's allocatable share.

    Nodes keep a slice of raw capacity for system processes, so requests are
    capped at MAX_CPU_PROPORTION / MAX_MEMORY_PROPORTION of the instance totals.

    Args:
        instance_type: instance type key into INSTANCE_RESOURCES; unknown types
            are treated as having zero capacity.
        requests_values: dict with optional 'cpu' (numeric string) and 'memory'
            ("<n>Gi") entries; mutated in place.

    Returns:
        The same ``requests_values`` dict, for call-chaining convenience.

    Raises:
        ValueError: if a resource string does not start with a number.
    """
    instance = INSTANCE_RESOURCES.get(instance_type, {})
    allocatable_cpu = float(instance.get("cpu", 0) * MAX_CPU_PROPORTION)
    allocatable_memory = float(instance.get("memory", 0) * MAX_MEMORY_PROPORTION)

    def _leading_number(value: str) -> float:
        # Anchored regex parses only the leading numeric portion; the previous
        # digit-filter approach silently mangled values such as "1e2" or unit
        # suffixes, producing wrong magnitudes instead of an error.
        match = re.match(r'^([0-9]*\.?[0-9]+)', value)
        if match is None:
            raise ValueError(f"Invalid resource quantity: {value!r}")
        return float(match.group(1))

    cpu_request = _leading_number(str(requests_values.get('cpu', '0')))
    mem_request = _leading_number(str(requests_values.get('memory', '0Gi')))

    requests_values['cpu'] = str(min(allocatable_cpu, cpu_request))
    requests_values['memory'] = f"{min(allocatable_memory, mem_request)}Gi"

    return requests_values


def _get_limits(instance_type: str, vcpu_limit: Optional[float], memory_in_gib_limit: Optional[float], accelerators_limit: Optional[int]) -> dict:

result = {}
type_of_accelerator, _max_accelerator_per_instance = _get_accelerator_type_and_count(instance_type)

Expand All @@ -224,32 +249,107 @@ def _get_limits(instance_type: str, vcpu_limit: Optional[float], memory_in_gib_l
else:
# user specified accelerator limit but the instance type wasn't found, set limit to 0 as a precaution
result["nvidia.com/gpu"] = 0

if memory_in_gib_limit is not None:
result["memory"] = memory_in_gib_limit
result["memory"] = f"{result['memory']}Gi"
result["memory"] = str(memory_in_gib_limit) + "Gi"

return result


def _resolve_default_cpu_values(instance_type: str, requests_values: dict) -> None:
    """Validate and cap the CPU request in-place against instance capacity.

    Raises ValueError when the request exceeds the instance's total CPU;
    otherwise clamps it to MAX_CPU_PROPORTION of the total (headroom reserved
    for system processes). A missing CPU request is left untouched.

    Args:
        instance_type: instance type key into INSTANCE_RESOURCES.
        requests_values: dict whose optional 'cpu' entry is a numeric string;
            mutated in place.
    """
    instance = INSTANCE_RESOURCES.get(instance_type, {})
    # Default to 0 so an unknown instance type cannot inject None into the
    # comparison below.
    total_available_cpu = instance.get('cpu', 0)

    raw_request = requests_values.get('cpu')
    if raw_request is None:
        # Nothing to resolve; previously min(None, ...) raised TypeError here.
        return

    cpu_request = float(raw_request)
    if cpu_request > total_available_cpu:
        raise ValueError(
            f"Specified CPU request ({cpu_request}) exceeds instance capacity. "
            f"Maximum available CPU for {instance_type} is {total_available_cpu}."
        )

    max_allocatable_cpu = int(total_available_cpu * MAX_CPU_PROPORTION)
    requests_values["cpu"] = str(min(cpu_request, max_allocatable_cpu))


def _resolve_default_memory_values(instance_type: str, requests_values: dict, limits_values: dict) -> None:
    """Reconcile the memory request and limit in-place against instance capacity.

    Behavior:
      - The limit defaults to the request when the user did not set one.
      - A request beyond the instance's physical memory raises ValueError.
      - Unless the user set an explicit limit, both values are capped at
        MAX_MEMORY_PROPORTION of total memory (headroom for system processes).

    Args:
        instance_type: instance type key into INSTANCE_RESOURCES.
        requests_values: dict whose 'memory' entry is a "<n>Gi" string; mutated.
        limits_values: dict whose optional 'memory' entry is a "<n>Gi" string; mutated.

    Raises:
        ValueError: when a memory value is missing or does not start with a number.
    """
    instance = INSTANCE_RESOURCES.get(instance_type, {})
    total_available_memory = instance.get("memory", 0)
    mem_limit_str = limits_values.get("memory")
    mem_request_str = requests_values.get("memory")

    user_set_limit = mem_limit_str is not None
    if mem_limit_str is None and mem_request_str is not None:
        mem_limit_str = mem_request_str

    try:
        memory_limit = float(re.match(r'^([0-9]*\.?[0-9]+)', mem_limit_str).group(1))
        memory_request = float(re.match(r'^([0-9]*\.?[0-9]+)', mem_request_str).group(1))
    except (AttributeError, TypeError, ValueError):
        # AttributeError: string has no leading number. TypeError: value is
        # None (re.match rejects non-strings) — previously this escaped as a
        # raw TypeError instead of the intended ValueError.
        raise ValueError(f"Invalid memory format: {mem_limit_str or mem_request_str}")

    if memory_request > total_available_memory:
        raise ValueError(
            f"Specified memory request ({memory_request}Gi) exceeds instance capacity. "
            f"Maximum available memory for {instance_type} is {total_available_memory}Gi."
        )

    max_allocatable_memory = int(total_available_memory * MAX_MEMORY_PROPORTION)
    if not user_set_limit:
        # Only cap a limit we defaulted ourselves; never shrink a user-set limit.
        memory_limit = min(memory_limit, max_allocatable_memory)
    memory_request = min(memory_request, max_allocatable_memory)
    limits_values["memory"] = str(memory_limit) + "Gi"
    requests_values["memory"] = str(memory_request) + "Gi"


def _validate_accelerators_inputs(instance_type: str, accelerators_request: int, accelerators_limit: int) -> None:
    """Reject accelerator settings the given instance type cannot satisfy.

    Raises ValueError when accelerators are requested on a non-accelerator
    instance, when request and limit disagree, or when either exceeds the
    per-instance accelerator count. Missing (None) values are not checked.
    """
    accelerator_type, max_per_instance = _get_accelerator_type_and_count(instance_type)

    # Non-accelerator instance: any accelerator value at all is an error.
    if accelerator_type is None:
        if accelerators_request is not None or accelerators_limit is not None:
            raise ValueError(
                f"Instance type {instance_type} does not support accelerators, but accelerator values were provided.")
        return

    # Cross-checks only apply when both values were provided.
    if accelerators_request is None or accelerators_limit is None:
        return

    if accelerators_request != accelerators_limit:
        raise ValueError('Accelerator request must equal accelerator limit')
    if accelerators_limit > max_per_instance:
        raise ValueError('Requested accelerators exceeds capacity')
    if accelerators_request > max_per_instance:
        raise ValueError('Requested accelerators exceeds capacity')


def _set_default_accelerators_val(instance_type: Optional[str], accelerators_request: Optional[int], accelerators_limit: Optional[int]) -> Tuple[Optional[int], Optional[int]]:
    """Default accelerator request/limit to each other when only one is given.

    For accelerator-backed instance types, a single provided value is mirrored
    to both slots so downstream validation (which requires request == limit)
    passes. Returns (None, None) for non-accelerator instance types or when
    neither value was provided.

    Args:
        instance_type: instance type, or None.
        accelerators_request: requested accelerator count, or None.
        accelerators_limit: accelerator limit, or None.

    Returns:
        Tuple of (resolved request, resolved limit).
    """
    # Annotation fixed: `int|None` required Python >= 3.10 and was inconsistent
    # with the Optional[...] style used throughout this module.
    type_of_accelerator, _max_accelerator_per_instance = _get_accelerator_type_and_count(instance_type)
    if type_of_accelerator is None:
        return None, None
    if accelerators_request is None and accelerators_limit is None:
        return None, None
    if accelerators_limit is None:
        return accelerators_request, accelerators_request
    if accelerators_request is None:
        return accelerators_limit, accelerators_limit
    return accelerators_request, accelerators_limit


def _is_valid(vcpu: Optional[float], memory_in_gib: Optional[float], accelerators: Optional[int],
              node_count: Optional[int], instance_type: Optional[str]) -> tuple[bool, str]:
    """Check that quota-allocation arguments are mutually consistent.

    Returns:
        A (valid, error_message) pair; the message is empty when valid.
    """
    quota_resources_given = _has_compute_resource_quota_allocation_resources(memory_in_gib, vcpu, accelerators)

    # Per-resource quotas are meaningless without knowing the instance type.
    if instance_type is None:
        if quota_resources_given:
            return False, "Instance-type must be specified when accelerators, vcpu, or memory-in-gib specified"
        return True, ""

    if INSTANCE_RESOURCES.get(instance_type) is None:
        return False, f"Invalid instance-type {instance_type}. Please re-check the instance type and contact AWS for support."

    # The two sizing modes — node count vs. fine-grained resources — are
    # mutually exclusive.
    has_node_count = node_count is not None and node_count > 0
    if quota_resources_given and has_node_count:
        return False, f"Either node-count OR a combination of accelerators, vcpu, memory-in-gib must be specified for instance-type {instance_type}"

    return True, ""


Expand Down
Loading
Loading