diff --git a/src/hyperpod_cli/commands/cluster.py b/src/hyperpod_cli/commands/cluster.py index bab4ad2b..14ffb391 100644 --- a/src/hyperpod_cli/commands/cluster.py +++ b/src/hyperpod_cli/commands/cluster.py @@ -43,6 +43,8 @@ TEMP_KUBE_CONFIG_FILE, OutputFormat, ) +from hyperpod_cli.telemetry import _hyperpod_telemetry_emitter +from hyperpod_cli.telemetry.constants import Feature from hyperpod_cli.telemetry.user_agent import ( get_user_agent_extra_suffix, ) @@ -107,6 +109,7 @@ multiple=True, help="Optional. The namespace that you want to check the capacity for. Only SageMaker managed namespaces are supported.", ) +@_hyperpod_telemetry_emitter(Feature.HYPERPOD_V2, "hyperpod_v2.get_clusters_cli") def get_clusters( region: Optional[str], orchestrator: Optional[str], @@ -463,6 +466,7 @@ def _aggregate_nodes_info( is_flag=True, help="Enable debug mode", ) +@_hyperpod_telemetry_emitter(Feature.HYPERPOD_V2, "hyperpod_v2.connect_cluster_cli") def connect_cluster( cluster_name: str, region: Optional[str], diff --git a/src/hyperpod_cli/commands/job.py b/src/hyperpod_cli/commands/job.py index 23b95a75..9e85d0c8 100644 --- a/src/hyperpod_cli/commands/job.py +++ b/src/hyperpod_cli/commands/job.py @@ -50,6 +50,8 @@ Volume, USER_NAME_LABEL_KEY, ) +from hyperpod_cli.telemetry import _hyperpod_telemetry_emitter +from hyperpod_cli.telemetry.constants import Feature from hyperpod_cli.clients.kubernetes_client import ( KubernetesClient, ) @@ -124,6 +126,7 @@ is_flag=True, help="Enable debug mode", ) +@_hyperpod_telemetry_emitter(Feature.HYPERPOD_V2, "hyperpod_v2.get_job_cli") def get_job( job_name: str, namespace: Optional[str], @@ -144,9 +147,8 @@ def get_job( result = get_training_job_service.get_training_job(job_name, namespace, verbose) click.echo(result) except Exception as e: - sys.exit( - f"Unexpected error happens when trying to get training job {job_name} : {e}" - ) + logger.error(f"Unexpected error happens when trying to get training job {job_name} : {e}") + raise 
@click.command() @@ -186,6 +188,7 @@ def get_job( is_flag=True, help="Enable debug mode", ) +@_hyperpod_telemetry_emitter(Feature.HYPERPOD_V2, "hyperpod_v2.list_jobs_cli") def list_jobs( namespace: Optional[str], all_namespaces: Optional[bool], @@ -205,7 +208,8 @@ def list_jobs( ) click.echo(result) except Exception as e: - sys.exit(f"Unexpected error happens when trying to list training job : {e}") + logger.error(f"Unexpected error happens when trying to list training job : {e}") + raise @click.command() @@ -228,6 +232,7 @@ def list_jobs( is_flag=True, help="Enable debug mode", ) +@_hyperpod_telemetry_emitter(Feature.HYPERPOD_V2, "hyperpod_v2.list_pods_cli") def list_pods( job_name: str, namespace: Optional[str], @@ -246,9 +251,8 @@ def list_pods( result = list_pods_service.list_pods_for_training_job(job_name, namespace, True) click.echo(result) except Exception as e: - sys.exit( - f"Unexpected error happens when trying to list pods for training job {job_name} : {e}" - ) + logger.error(f"Unexpected error happens when trying to list pods for training job {job_name} : {e}") + raise @click.command() @@ -271,6 +275,7 @@ def list_pods( is_flag=True, help="Enable debug mode", ) +@_hyperpod_telemetry_emitter(Feature.HYPERPOD_V2, "hyperpod_v2.cancel_job_cli") def cancel_job( job_name: str, namespace: Optional[str], @@ -287,9 +292,8 @@ def cancel_job( result = cancel_training_job_service.cancel_training_job(job_name, namespace) click.echo(result) except Exception as e: - sys.exit( - f"Unexpected error happens when trying to cancel training job {job_name} : {e}" - ) + logger.error(f"Unexpected error happens when trying to cancel training job {job_name} : {e}") + raise @click.command() @@ -536,6 +540,7 @@ def cancel_job( is_flag=True, help="Enable debug mode", ) +@_hyperpod_telemetry_emitter(Feature.HYPERPOD_V2, "hyperpod_v2.start_job_cli") def start_job( config_file: Optional[str], job_name: Optional[str], @@ -876,6 +881,7 @@ def start_job( help="Optional. 
The namespace to use. If not specified, this command will first use the namespace wh connecting the cluster." "Otherwise if namespace is not configured when connecting to the cluster, a namespace that is managed by SageMaker will be auto discovered.", ) +@_hyperpod_telemetry_emitter(Feature.HYPERPOD_V2, "hyperpod_v2.patch_job_cli") def patch_job(patch_type: str, job_name: str, namespace: Optional[str]): if patch_type not in JobPatchType.get_values(): diff --git a/src/hyperpod_cli/commands/pod.py b/src/hyperpod_cli/commands/pod.py index 908e5195..c9f86048 100644 --- a/src/hyperpod_cli/commands/pod.py +++ b/src/hyperpod_cli/commands/pod.py @@ -24,6 +24,8 @@ setup_logger, set_logging_level, ) +from hyperpod_cli.telemetry import _hyperpod_telemetry_emitter +from hyperpod_cli.telemetry.constants import Feature logger = setup_logger(__name__) @@ -54,6 +56,7 @@ is_flag=True, help="Enable debug mode", ) +@_hyperpod_telemetry_emitter(Feature.HYPERPOD_V2, "hyperpod_v2.get_log_cli") def get_log( job_name: str, pod: str, @@ -73,9 +76,8 @@ def get_log( ) click.echo(result) except Exception as e: - sys.exit( - f"Unexpected error happens when trying to get logs for training job {job_name} : {e}" - ) + logger.error(f"Unexpected error happens when trying to get logs for training job {job_name} : {e}") + raise try: cloudwatch_link = get_logs_service.generate_cloudwatch_link(pod, namespace=namespace) @@ -148,6 +150,7 @@ def invoke(self, ctx): is_flag=True, help="Enable debug mode", ) +@_hyperpod_telemetry_emitter(Feature.HYPERPOD_V2, "hyperpod_v2.exec_cli") def exec( job_name: str, namespace: Optional[str], @@ -173,6 +176,5 @@ def exec( ) click.echo(result) except Exception as e: - sys.exit( - f"Unexpected error happens when trying to exec command for pod {pod} : {e}" - ) + logger.error(f"Unexpected error happens when trying to exec command for pod {pod} : {e}") + raise diff --git a/src/hyperpod_cli/telemetry/__init__.py b/src/hyperpod_cli/telemetry/__init__.py index 
65490521..3bb710cc 100644 --- a/src/hyperpod_cli/telemetry/__init__.py +++ b/src/hyperpod_cli/telemetry/__init__.py @@ -10,3 +10,5 @@ # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. +from __future__ import absolute_import +from .telemetry_logging import _hyperpod_telemetry_emitter diff --git a/src/hyperpod_cli/telemetry/constants.py b/src/hyperpod_cli/telemetry/constants.py new file mode 100644 index 00000000..50def5a1 --- /dev/null +++ b/src/hyperpod_cli/telemetry/constants.py @@ -0,0 +1,60 @@ +from __future__ import absolute_import +from enum import Enum + + +class Feature(Enum): + """Enumeration of feature names used in telemetry.""" + + HYPERPOD_V2 = 10 + + def __str__(self): # pylint: disable=E0307 + """Return the feature name.""" + return self.name + + +class Status(Enum): + """Enumeration of status values used in telemetry.""" + + SUCCESS = 1 + FAILURE = 0 + + def __str__(self): # pylint: disable=E0307 + """Return the status name.""" + return self.name + + +class Region(str, Enum): + """Telemetry: List of all supported AWS regions.""" + + # Classic + US_EAST_1 = "us-east-1" # IAD + US_EAST_2 = "us-east-2" # CMH + US_WEST_1 = "us-west-1" # SFO + US_WEST_2 = "us-west-2" # PDX + AP_NORTHEAST_1 = "ap-northeast-1" # NRT + AP_NORTHEAST_2 = "ap-northeast-2" # ICN + AP_NORTHEAST_3 = "ap-northeast-3" # KIX + AP_SOUTH_1 = "ap-south-1" # BOM + AP_SOUTHEAST_1 = "ap-southeast-1" # SIN + AP_SOUTHEAST_2 = "ap-southeast-2" # SYD + CA_CENTRAL_1 = "ca-central-1" # YUL + EU_CENTRAL_1 = "eu-central-1" # FRA + EU_NORTH_1 = "eu-north-1" # ARN + EU_WEST_1 = "eu-west-1" # DUB + EU_WEST_2 = "eu-west-2" # LHR + EU_WEST_3 = "eu-west-3" # CDG + SA_EAST_1 = "sa-east-1" # GRU + # Opt-in + AP_EAST_1 = "ap-east-1" # HKG + AP_SOUTHEAST_3 = "ap-southeast-3" # CGK + AF_SOUTH_1 = "af-south-1" # CPT + EU_SOUTH_1 = "eu-south-1" # MXP + 
ME_SOUTH_1 = "me-south-1" # BAH + MX_CENTRAL_1 = "mx-central-1" # QRO + AP_SOUTHEAST_7 = "ap-southeast-7" # BKK + AP_SOUTH_2 = "ap-south-2" # HYD + AP_SOUTHEAST_4 = "ap-southeast-4" # MEL + EU_CENTRAL_2 = "eu-central-2" # ZRH + EU_SOUTH_2 = "eu-south-2" # ZAZ + IL_CENTRAL_1 = "il-central-1" # TLV + ME_CENTRAL_1 = "me-central-1" # DXB diff --git a/src/hyperpod_cli/telemetry/telemetry_logging.py b/src/hyperpod_cli/telemetry/telemetry_logging.py new file mode 100644 index 00000000..47caa045 --- /dev/null +++ b/src/hyperpod_cli/telemetry/telemetry_logging.py @@ -0,0 +1,356 @@ +from __future__ import absolute_import +import logging +import platform +import sys +from time import perf_counter +from typing import List, Tuple +import functools +import requests +import subprocess +import re + +import boto3 +from hyperpod_cli.telemetry.constants import Feature, Status, Region +import importlib.metadata + +SDK_VERSION = importlib.metadata.version("hyperpod") +DEFAULT_AWS_REGION = "us-west-2" +OS_NAME = platform.system() or "UnresolvedOS" +OS_VERSION = platform.release() or "UnresolvedOSVersion" +OS_NAME_VERSION = "{}/{}".format(OS_NAME, OS_VERSION) +PYTHON_VERSION = "{}.{}.{}".format( + sys.version_info.major, sys.version_info.minor, sys.version_info.micro +) + +FEATURE_TO_CODE = { + # str(Feature.HYPERPOD): 6, # Added to support telemetry in sagemaker-hyperpod-cli + # str(Feature.HYPERPOD_CLI): 7, + str(Feature.HYPERPOD_V2): 10 +} + +STATUS_TO_CODE = { + str(Status.SUCCESS): 1, + str(Status.FAILURE): 0, +} + +logger = logging.getLogger(__name__) + + +def get_region_and_account_from_current_context() -> Tuple[str, str]: + """ + Get region and account ID from current kubernetes context + Returns: (region, account_id) + """ + try: + # Get current context + result = subprocess.run( + ["kubectl", "config", "current-context"], capture_output=True, text=True + ) + + if result.returncode == 0: + context = result.stdout.strip() + + # Extract region + region_pattern = 
r"([a-z]{2}-[a-z]+-\d{1})" + region = DEFAULT_AWS_REGION + if match := re.search(region_pattern, context): + region = match.group(1) + + # Extract account ID (12 digits) + account_pattern = r"(\d{12})" + account = "unknown" + if match := re.search(account_pattern, context): + account = match.group(1) + + return region, account + + except Exception as e: + logger.debug(f"Failed to get context info from kubectl: {e}") + + return DEFAULT_AWS_REGION, "unknown" + + +def _requests_helper(url, timeout): + """Make a GET request to the given URL""" + + response = None + try: + response = requests.get(url, timeout) + except requests.exceptions.RequestException as e: + logger.exception("Request exception: %s", str(e)) + return response + + +def _construct_url( + accountId: str, + region: str, + status: str, + feature: str, + failure_reason: str, + failure_type: str, + extra_info: str, +) -> str: + """Construct the URL for the telemetry request""" + + base_url = ( + f"https://sm-pysdk-t-{region}.s3.{region}.amazonaws.com/telemetry?" 
+ f"x-accountId={accountId}" + f"&x-status={status}" + f"&x-feature={feature}" + ) + logger.debug("Failure reason: %s", failure_reason) + if failure_reason: + base_url += f"&x-failureReason={failure_reason}" + base_url += f"&x-failureType={failure_type}" + if extra_info: + base_url += f"&x-extra={extra_info}" + return base_url + + +def _extract_telemetry_data(func_name: str, *args, **kwargs) -> str: + """Extract comprehensive telemetry data for all CLI functions""" + telemetry_data = [] + + # Recipe metrics for start_job_cli + if func_name == "hyperpod_v2.start_job_cli": + # Existing high-value parameters + recipe = kwargs.get('recipe') + override_parameters = kwargs.get('override_parameters') + instance_type = kwargs.get('instance_type') + node_count = kwargs.get('node_count') + scheduler_type = kwargs.get('scheduler_type') + auto_resume = kwargs.get('auto_resume') + + # New high-value parameters + config_file = kwargs.get('config_file') + job_kind = kwargs.get('job_kind') + pull_policy = kwargs.get('pull_policy') + restart_policy = kwargs.get('restart_policy') + queue_name = kwargs.get('queue_name') + priority = kwargs.get('priority') + max_retry = kwargs.get('max_retry') + deep_health_check_passed_nodes_only = kwargs.get('deep_health_check_passed_nodes_only') + tasks_per_node = kwargs.get('tasks_per_node') + persistent_volume_claims = kwargs.get('persistent_volume_claims') + volumes = kwargs.get('volumes') + pre_script = kwargs.get('pre_script') + post_script = kwargs.get('post_script') + + # Recipe analysis + if recipe: + parts = recipe.split('/') + if len(parts) >= 2: + telemetry_data.append(f"recipe_type={parts[0]}") + telemetry_data.append(f"model_family={parts[1]}") + telemetry_data.append(f"recipe_name={recipe}") + + # Extract sequence length, GPU type, model size from recipe + if seq_match := re.search(r'seq(\d+)k?', recipe): + telemetry_data.append(f"sequence_length={seq_match.group(1)}k") + if gpu_match := re.search(r'(p5x\d+|trn1x?\d*|p4)', recipe): + 
telemetry_data.append(f"gpu_type={gpu_match.group(1)}") + if model_match := re.search(r'(\d+)b', recipe): + telemetry_data.append(f"model_size={model_match.group(1)}b") + + # Configuration approach + if config_file: + telemetry_data.append("config_approach=yaml") + else: + telemetry_data.append("config_approach=cli") + + # Job configuration + if job_kind: + telemetry_data.append(f"job_kind={job_kind}") + if pull_policy: + telemetry_data.append(f"pull_policy={pull_policy}") + if restart_policy: + telemetry_data.append(f"restart_policy={restart_policy}") + if queue_name: + telemetry_data.append(f"queue_name_provided=true") + if priority: + telemetry_data.append(f"priority_provided=true") + if max_retry: + telemetry_data.append(f"max_retry={max_retry}") + if deep_health_check_passed_nodes_only: + telemetry_data.append("deep_health_check=true") + + # Resource configuration + if tasks_per_node: + telemetry_data.append(f"tasks_per_node={tasks_per_node}") + if persistent_volume_claims: + telemetry_data.append("pvc_used=true") + if volumes: + telemetry_data.append("volumes_used=true") + if pre_script: + telemetry_data.append("pre_script_used=true") + if post_script: + telemetry_data.append("post_script_used=true") + + # Existing parameters + if override_parameters: + telemetry_data.append("override_used=true") + if instance_type: + telemetry_data.append(f"instance_type={instance_type}") + if node_count: + telemetry_data.append(f"node_count={node_count}") + if scheduler_type: + telemetry_data.append(f"scheduler_type={scheduler_type}") + if auto_resume: + telemetry_data.append(f"auto_resume={auto_resume}") + + # Cluster metrics (get_clusters_cli only - no create/delete found) + elif func_name == "hyperpod_v2.get_clusters_cli": + clusters = kwargs.get('clusters') # Comma-separated cluster names filter + namespace = kwargs.get('namespace') # List of namespaces + + if clusters: + telemetry_data.append(f"clusters_filter_provided=true") + cluster_count = len(clusters.split(',')) 
if isinstance(clusters, str) else len(clusters) + telemetry_data.append(f"clusters_count={cluster_count}") + if namespace: + telemetry_data.append(f"namespace_provided=true") + ns_count = len(namespace) if isinstance(namespace, (list, tuple)) else 1 + telemetry_data.append(f"namespace_count={ns_count}") + + # Job metrics + elif func_name in ["hyperpod_v2.list_jobs_cli", "hyperpod_v2.get_job_cli", "hyperpod_v2.cancel_job_cli"]: + job_name = kwargs.get('job_name') + namespace = kwargs.get('namespace') + all_namespaces = kwargs.get('all_namespaces') # list_jobs specific + selector = kwargs.get('selector') # list_jobs specific + + if job_name: + telemetry_data.append(f"job_name_provided=true") + if namespace: + telemetry_data.append(f"namespace_provided=true") + if all_namespaces: + telemetry_data.append(f"all_namespaces=true") + if selector: + telemetry_data.append(f"label_selector_provided=true") + + # Pod metrics (list_pods_cli from job.py, get_log_cli and exec_cli from pod.py) + elif func_name in ["hyperpod_v2.list_pods_cli", "hyperpod_v2.get_log_cli", "hyperpod_v2.exec_cli"]: + job_name = kwargs.get('job_name') + namespace = kwargs.get('namespace') + pod = kwargs.get('pod') # get_log_cli and exec_cli specific + all_pods = kwargs.get('all_pods') # exec_cli specific + + if job_name: + telemetry_data.append(f"job_name_provided=true") + if namespace: + telemetry_data.append(f"namespace_provided=true") + if pod: + telemetry_data.append(f"pod_name_provided=true") + if all_pods: + telemetry_data.append(f"all_pods_mode=true") + + # Job patch metrics + elif func_name == "hyperpod_v2.patch_job_cli": + patch_type = kwargs.get('patch_type') # First positional arg + job_name = kwargs.get('job_name') + namespace = kwargs.get('namespace') + + if patch_type: + telemetry_data.append(f"patch_type={patch_type}") + if job_name: + telemetry_data.append(f"job_name_provided=true") + if namespace: + telemetry_data.append(f"namespace_provided=true") + + # Cluster connection metrics + elif 
func_name == "hyperpod_v2.connect_cluster_cli": + cluster_name = kwargs.get('cluster_name') + namespace = kwargs.get('namespace') + + if cluster_name: + telemetry_data.append(f"cluster_name_provided=true") + if namespace: + telemetry_data.append(f"namespace_provided=true") + + return "&" + "&".join(telemetry_data) if telemetry_data else "" + + + +def _send_telemetry_request( + status: int, + feature_list: List[int], + session, + failure_reason: str = None, + failure_type: str = None, + extra_info: str = None, +) -> None: + """Make GET request to an empty object in S3 bucket""" + try: + region, accountId = get_region_and_account_from_current_context() + + try: + Region(region) # Validate the region + except ValueError: + logger.warning( + "Region not found in supported regions. Telemetry request will not be emitted." + ) + return + + url = _construct_url( + accountId, + region, + str(status), + str( + ",".join(map(str, feature_list)) + ), # Remove brackets and quotes to cut down on length + failure_reason, + failure_type, + extra_info, + ) + # Send the telemetry request + logger.info("Sending telemetry request to [%s]", url) + _requests_helper(url, 2) + logger.info("SageMaker Python SDK telemetry successfully emitted.") + except Exception: # pylint: disable=W0703 + logger.warning("SageMaker Python SDK telemetry not emitted!") + + +def _hyperpod_telemetry_emitter(feature: str, func_name: str): + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + extra = ( + f"{func_name}" + f"&x-sdkVersion={SDK_VERSION}" + f"&x-env={PYTHON_VERSION}" + f"&x-sys={OS_NAME_VERSION}" + ) + + # Add comprehensive telemetry data for all functions + telemetry_data = _extract_telemetry_data(func_name, *args, **kwargs) + extra += telemetry_data + + start = perf_counter() + try: + result = func(*args, **kwargs) + duration = round(perf_counter() - start, 2) + extra += f"&x-latency={duration}" + _send_telemetry_request( + STATUS_TO_CODE[str(Status.SUCCESS)], + 
[FEATURE_TO_CODE[str(feature)]], + None, + None, + None, + extra, + ) + return result + except Exception as e: + duration = round(perf_counter() - start, 2) + extra += f"&x-latency={duration}" + _send_telemetry_request( + STATUS_TO_CODE[str(Status.FAILURE)], + [FEATURE_TO_CODE[str(feature)]], + None, + str(e), + type(e).__name__, + extra, + ) + raise + + return wrapper + return decorator diff --git a/test/unit_tests/common/telemetry/test_telemetry_logging.py b/test/unit_tests/common/telemetry/test_telemetry_logging.py new file mode 100644 index 00000000..99b0722b --- /dev/null +++ b/test/unit_tests/common/telemetry/test_telemetry_logging.py @@ -0,0 +1,286 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. 
# --- test/unit_tests/common/telemetry/test_telemetry_logging.py -------------
# Reconstructed from the collapsed diff with fixes: unused imports removed
# (`pytest`, `MagicMock`, `Status` were imported but never referenced) and the
# missing trailing newline restored ("\ No newline at end of file").

import unittest
from unittest.mock import patch

from hyperpod_cli.telemetry.telemetry_logging import (
    _extract_telemetry_data,
    _construct_url,
    get_region_and_account_from_current_context,
    _hyperpod_telemetry_emitter,
)
from hyperpod_cli.telemetry.constants import Feature


class TestTelemetryLogging(unittest.TestCase):
    """Unit tests for the telemetry extraction, URL building and emitter."""

    def test_extract_telemetry_data_start_job_cli(self):
        """Test telemetry data extraction for start_job_cli"""
        result = _extract_telemetry_data(
            "hyperpod_v2.start_job_cli",
            recipe="training/llama/hf_llama3_8b_seq8k_gpu_p5x16_pretrain",
            instance_type="ml.p5.48xlarge",
            node_count=2,
            config_file="/path/to/config.yaml",
            auto_resume=True,
            max_retry=3,
        )

        self.assertIn("recipe_type=training", result)
        self.assertIn("model_family=llama", result)
        self.assertIn("config_approach=yaml", result)
        self.assertIn("instance_type=ml.p5.48xlarge", result)
        self.assertIn("node_count=2", result)
        self.assertIn("auto_resume=True", result)
        self.assertIn("max_retry=3", result)

    def test_extract_telemetry_data_start_job_cli_no_config_file(self):
        """Test telemetry data extraction for start_job_cli without config file"""
        result = _extract_telemetry_data(
            "hyperpod_v2.start_job_cli",
            recipe="fine-tuning/llama/hf_llama3_70b_seq8k_gpu_lora",
            instance_type="ml.g5.2xlarge",
        )

        self.assertIn("recipe_type=fine-tuning", result)
        self.assertIn("model_family=llama", result)
        self.assertIn("config_approach=cli", result)

    def test_extract_telemetry_data_get_clusters_cli(self):
        """Test telemetry data extraction for get_clusters_cli"""
        result = _extract_telemetry_data(
            "hyperpod_v2.get_clusters_cli",
            clusters="cluster1,cluster2,cluster3",
            namespace=["ns1", "ns2"],
        )

        self.assertIn("clusters_filter_provided=true", result)
        self.assertIn("clusters_count=3", result)
        self.assertIn("namespace_provided=true", result)
        self.assertIn("namespace_count=2", result)

    def test_extract_telemetry_data_list_jobs_cli(self):
        """Test telemetry data extraction for list_jobs_cli"""
        result = _extract_telemetry_data(
            "hyperpod_v2.list_jobs_cli",
            job_name="test-job",
            namespace="test-ns",
            all_namespaces=True,
            selector="app=test",
        )

        self.assertIn("job_name_provided=true", result)
        self.assertIn("namespace_provided=true", result)
        self.assertIn("all_namespaces=true", result)
        self.assertIn("label_selector_provided=true", result)

    def test_extract_telemetry_data_get_log_cli(self):
        """Test telemetry data extraction for get_log_cli"""
        result = _extract_telemetry_data(
            "hyperpod_v2.get_log_cli",
            job_name="test-job",
            pod="test-pod",
            namespace="test-ns",
        )

        self.assertIn("job_name_provided=true", result)
        self.assertIn("pod_name_provided=true", result)
        self.assertIn("namespace_provided=true", result)

    def test_extract_telemetry_data_patch_job_cli(self):
        """Test telemetry data extraction for patch_job_cli"""
        result = _extract_telemetry_data(
            "hyperpod_v2.patch_job_cli",
            patch_type="suspend",
            job_name="test-job",
            namespace="test-ns",
        )

        self.assertIn("patch_type=suspend", result)
        self.assertIn("job_name_provided=true", result)
        self.assertIn("namespace_provided=true", result)

    def test_extract_telemetry_data_connect_cluster_cli(self):
        """Test telemetry data extraction for connect_cluster_cli"""
        result = _extract_telemetry_data(
            "hyperpod_v2.connect_cluster_cli",
            cluster_name="test-cluster",
            namespace="test-ns",
        )

        self.assertIn("cluster_name_provided=true", result)
        self.assertIn("namespace_provided=true", result)

    def test_extract_telemetry_data_unknown_function(self):
        """Test telemetry data extraction for unknown function"""
        result = _extract_telemetry_data("unknown_function")
        self.assertEqual(result, "")

    def test_construct_url(self):
        """Test URL construction for telemetry"""
        url = _construct_url(
            accountId="123456789012",
            region="us-west-2",
            status="1",
            feature="10",
            failure_reason=None,
            failure_type=None,
            extra_info="test=value",
        )

        expected_base = "https://sm-pysdk-t-us-west-2.s3.us-west-2.amazonaws.com/telemetry?"
        self.assertTrue(url.startswith(expected_base))
        self.assertIn("x-accountId=123456789012", url)
        self.assertIn("x-status=1", url)
        self.assertIn("x-feature=10", url)
        self.assertIn("x-extra=test=value", url)

    def test_construct_url_with_failure(self):
        """Test URL construction with failure information"""
        url = _construct_url(
            accountId="123456789012",
            region="us-east-1",
            status="0",
            feature="10",
            failure_reason="Test error",
            failure_type="ValueError",
            extra_info="test=value",
        )

        self.assertIn("x-failureReason=Test error", url)
        self.assertIn("x-failureType=ValueError", url)

    @patch('hyperpod_cli.telemetry.telemetry_logging.subprocess.run')
    def test_get_region_and_account_from_current_context_success(self, mock_run):
        """Test successful extraction of region and account from kubectl context"""
        mock_run.return_value.returncode = 0
        mock_run.return_value.stdout = "arn:aws:eks:us-west-2:123456789012:cluster/test-cluster"

        region, account = get_region_and_account_from_current_context()

        self.assertEqual(region, "us-west-2")
        self.assertEqual(account, "123456789012")

    @patch('hyperpod_cli.telemetry.telemetry_logging.subprocess.run')
    def test_get_region_and_account_from_current_context_failure(self, mock_run):
        """Test fallback when kubectl context extraction fails"""
        mock_run.return_value.returncode = 1

        region, account = get_region_and_account_from_current_context()

        self.assertEqual(region, "us-west-2")  # DEFAULT_AWS_REGION
        self.assertEqual(account, "unknown")

    @patch('hyperpod_cli.telemetry.telemetry_logging.subprocess.run')
    def test_get_region_and_account_exception(self, mock_run):
        """Test exception handling in context extraction"""
        mock_run.side_effect = Exception("Command failed")

        region, account = get_region_and_account_from_current_context()

        self.assertEqual(region, "us-west-2")  # DEFAULT_AWS_REGION
        self.assertEqual(account, "unknown")

    @patch('hyperpod_cli.telemetry.telemetry_logging._send_telemetry_request')
    def test_telemetry_decorator_success(self, mock_send):
        """Test telemetry decorator on successful function execution"""
        @_hyperpod_telemetry_emitter(Feature.HYPERPOD_V2, "test_function")
        def test_func(arg1, arg2=None):
            return "success"

        result = test_func("value1", arg2="value2")

        self.assertEqual(result, "success")
        mock_send.assert_called_once()
        # Verify success status was sent
        args, kwargs = mock_send.call_args
        self.assertEqual(args[0], 1)  # SUCCESS status

    @patch('hyperpod_cli.telemetry.telemetry_logging._send_telemetry_request')
    def test_telemetry_decorator_failure(self, mock_send):
        """Test telemetry decorator on function failure"""
        @_hyperpod_telemetry_emitter(Feature.HYPERPOD_V2, "test_function")
        def test_func():
            raise ValueError("Test error")

        with self.assertRaises(ValueError):
            test_func()

        mock_send.assert_called_once()
        # Verify failure status was sent
        args, kwargs = mock_send.call_args
        self.assertEqual(args[0], 0)  # FAILURE status
        self.assertEqual(args[3], "Test error")  # failure_reason
        self.assertEqual(args[4], "ValueError")  # failure_type

    def test_recipe_parsing_sequence_length(self):
        """Test recipe parsing for sequence length extraction"""
        result = _extract_telemetry_data(
            "hyperpod_v2.start_job_cli",
            recipe="training/llama/hf_llama3_8b_seq16k_gpu_p5x16_pretrain",
        )

        self.assertIn("sequence_length=16k", result)

    def test_recipe_parsing_gpu_type(self):
        """Test recipe parsing for GPU type extraction"""
        result = _extract_telemetry_data(
            "hyperpod_v2.start_job_cli",
            recipe="training/llama/hf_llama3_8b_seq8k_trn1x4_pretrain",
        )

        self.assertIn("gpu_type=trn1x4", result)

    def test_recipe_parsing_model_size(self):
        """Test recipe parsing for model size extraction"""
        result = _extract_telemetry_data(
            "hyperpod_v2.start_job_cli",
            recipe="training/llama/hf_llama3_70b_seq8k_gpu_p5x32_pretrain",
        )

        self.assertIn("model_size=70b", result)

    def test_advanced_job_parameters(self):
        """Test extraction of advanced job parameters"""
        result = _extract_telemetry_data(
            "hyperpod_v2.start_job_cli",
            job_kind="kubeflow/PyTorchJob",
            pull_policy="Always",
            restart_policy="OnFailure",
            queue_name="test-queue",
            priority="high",
            deep_health_check_passed_nodes_only=True,
            tasks_per_node=8,
            persistent_volume_claims="pvc1:/data",
            volumes="vol1:/host:/container",
            pre_script="echo start",
            post_script="echo end",
        )

        self.assertIn("job_kind=kubeflow/PyTorchJob", result)
        self.assertIn("pull_policy=Always", result)
        self.assertIn("restart_policy=OnFailure", result)
        self.assertIn("queue_name_provided=true", result)
        self.assertIn("priority_provided=true", result)
        self.assertIn("deep_health_check=true", result)
        self.assertIn("tasks_per_node=8", result)
        self.assertIn("pvc_used=true", result)
        self.assertIn("volumes_used=true", result)
        self.assertIn("pre_script_used=true", result)
        self.assertIn("post_script_used=true", result)


if __name__ == '__main__':
    unittest.main()