From 60aca2ec8f8cdda2fc9688f969bb8e6e9ecb3445 Mon Sep 17 00:00:00 2001 From: zpoint Date: Wed, 29 Oct 2025 11:14:48 +0800 Subject: [PATCH 1/4] retry for gpu label --- sky/utils/kubernetes/gpu_labeler.py | 76 +++++++++++++++++++---------- 1 file changed, 50 insertions(+), 26 deletions(-) diff --git a/sky/utils/kubernetes/gpu_labeler.py b/sky/utils/kubernetes/gpu_labeler.py index 50938f9b1cd..e0506d88a83 100644 --- a/sky/utils/kubernetes/gpu_labeler.py +++ b/sky/utils/kubernetes/gpu_labeler.py @@ -181,38 +181,62 @@ def wait_for_jobs_completion(jobs_to_node_names: Dict[str, str], True if the Job completed successfully, False if it failed or timed out. """ batch_v1 = kubernetes.batch_api(context=context) - w = kubernetes.watch() completed_jobs = [] - for event in w.stream(func=batch_v1.list_namespaced_job, - namespace=namespace, - timeout_seconds=timeout): - job = event['object'] - job_name = job.metadata.name - if job_name in jobs_to_node_names: - node_name = jobs_to_node_names[job_name] - if job.status and job.status.completion_time: - print( - _format_string( - f'GPU labeler job for node {node_name} ' - 'completed successfully', colorama.Style.DIM)) - completed_jobs.append(job_name) - num_remaining_jobs = len(jobs_to_node_names) - len( - completed_jobs) - if num_remaining_jobs == 0: + + def _watch_jobs(): + """Helper function to watch jobs with error handling.""" + w = kubernetes.watch() + for event in w.stream(func=batch_v1.list_namespaced_job, + namespace=namespace, + timeout_seconds=timeout): + job = event['object'] + job_name = job.metadata.name + if job_name in jobs_to_node_names: + node_name = jobs_to_node_names[job_name] + if job.status and job.status.completion_time: + print( + _format_string( + f'GPU labeler job for node {node_name} ' + 'completed successfully', colorama.Style.DIM)) + completed_jobs.append(job_name) + num_remaining_jobs = len(jobs_to_node_names) - len( + completed_jobs) + if num_remaining_jobs == 0: + w.stop() + return True + elif job.status and job.status.failed: + print( + _format_string( + f'GPU labeler job for node {node_name} failed', + colorama.Style.DIM)) w.stop() - return True - elif job.status and job.status.failed: - print( - _format_string( - f'GPU labeler job for node {node_name} failed', - colorama.Style.DIM)) - w.stop() - return False + return False + return None # Timeout + + try: + result = _watch_jobs() + if result is not None: + return result + except kubernetes.api_exception() as e: + if e.status == 504 and 'Too large resource version' in str(e): + print( + _format_string( + 'Watch failed due to resource version mismatch. ' + 'Restarting watch...', colorama.Fore.YELLOW)) + # Restart watch without resource version - let Kubernetes choose + # starting point + result = _watch_jobs() + if result is not None: + return result + else: + # Re-raise other API exceptions + raise + print( _format_string( f'Timed out after waiting {timeout} seconds ' 'for job to complete', colorama.Style.DIM)) - return False #Timed out + return False # Timed out def main(): From 5df3410e554bd5b2c90c8a8015f5b96ffcb7e8ae Mon Sep 17 00:00:00 2001 From: zpoint Date: Wed, 29 Oct 2025 17:36:54 +0800 Subject: [PATCH 2/4] poll mechanism --- sky/utils/kubernetes/gpu_labeler.py | 105 +++++++++++++++++++++++++--- 1 file changed, 94 insertions(+), 11 deletions(-) diff --git a/sky/utils/kubernetes/gpu_labeler.py b/sky/utils/kubernetes/gpu_labeler.py index e0506d88a83..18392e89a6f 100644 --- a/sky/utils/kubernetes/gpu_labeler.py +++ b/sky/utils/kubernetes/gpu_labeler.py @@ -3,6 +3,7 @@ import hashlib import os import subprocess +import time from typing import Dict, Optional, Tuple import colorama @@ -13,6 +14,9 @@ from sky.utils import directory_utils from sky.utils import rich_utils +# Polling interval in seconds for job completion checks +JOB_COMPLETION_POLL_INTERVAL = 5 + def _format_string(str_to_format: str, colorama_format: str) -> str: return f'{colorama_format}{str_to_format}{colorama.Style.RESET_ALL}' @@ -166,6 +170,73 @@ def label(context: Optional[str] = None, wait_for_completion: bool = True): '`skypilot.co/accelerator: `. ') +def _poll_jobs_completion(jobs_to_node_names: Dict[str, str], + namespace: str, + context: Optional[str] = None, + timeout: int = 60 * 20) -> bool: + """Fallback polling method to check job completion status. + + This method polls the Kubernetes API to check job status instead of using + the watch API. It's used as a fallback when the watch API fails due to + resource version mismatches. + + Args: + jobs_to_node_names: A dictionary mapping job names to node names. + namespace: The namespace the jobs are in. + context: Optional Kubernetes context to use. + timeout: Timeout in seconds (default: 1200 seconds = 20 minutes). + + Returns: + True if all jobs completed successfully, False if any failed + or timed out. + """ + batch_v1 = kubernetes.batch_api(context=context) + start_time = time.time() + completed_jobs = [] + + print( + _format_string('Using polling method to check job completion...', + colorama.Style.DIM)) + + while time.time() - start_time < timeout: + try: + jobs = batch_v1.list_namespaced_job(namespace=namespace) + for job in jobs.items: + job_name = job.metadata.name + if job_name in jobs_to_node_names: + node_name = jobs_to_node_names[job_name] + if job.status and job.status.completion_time: + if job_name not in completed_jobs: + print( + _format_string( + f'GPU labeler job for node {node_name} ' + 'completed successfully', + colorama.Style.DIM)) + completed_jobs.append(job_name) + elif job.status and job.status.failed: + print( + _format_string( + f'GPU labeler job for node {node_name} failed', + colorama.Style.DIM)) + return False + + if len(completed_jobs) == len(jobs_to_node_names): + return True + + time.sleep(JOB_COMPLETION_POLL_INTERVAL) + except kubernetes.api_exception() as poll_error: + print( + _format_string(f'Polling error: {str(poll_error)}', + colorama.Fore.RED)) + time.sleep(JOB_COMPLETION_POLL_INTERVAL) + + print( + _format_string( + f'Timed out after waiting {timeout} seconds ' + 'for job to complete', colorama.Style.DIM)) + return False + + def wait_for_jobs_completion(jobs_to_node_names: Dict[str, str], namespace: str, context: Optional[str] = None, @@ -183,12 +254,23 @@ def wait_for_jobs_completion(jobs_to_node_names: Dict[str, str], batch_v1 = kubernetes.batch_api(context=context) completed_jobs = [] - def _watch_jobs(): - """Helper function to watch jobs with error handling.""" + def _watch_jobs(resource_version=None): + """Helper function to watch jobs with error handling. + + Args: + resource_version: Specific resource version to watch from. + If None, starts from the current state. + """ w = kubernetes.watch() - for event in w.stream(func=batch_v1.list_namespaced_job, - namespace=namespace, - timeout_seconds=timeout): + kwargs = { + 'namespace': namespace, + 'timeout_seconds': timeout, + } + # Only specify resource_version if explicitly provided + if resource_version is not None: + kwargs['resource_version'] = resource_version + + for event in w.stream(func=batch_v1.list_namespaced_job, **kwargs): job = event['object'] job_name = job.metadata.name if job_name in jobs_to_node_names: @@ -222,12 +304,13 @@ def _watch_jobs(): print( _format_string( 'Watch failed due to resource version mismatch. ' - 'Restarting watch...', colorama.Fore.YELLOW)) - # Restart watch without resource version - let Kubernetes choose - # starting point - result = _watch_jobs() - if result is not None: - return result + 'Falling back to polling method...', colorama.Fore.YELLOW)) + # Fall back to polling instead of watch API + # The watch API is unreliable when resource versions are changing + # rapidly or when there are multiple API server instances with + # different cache states + return _poll_jobs_completion(jobs_to_node_names, namespace, context, + timeout) else: # Re-raise other API exceptions raise From 656991cc58977090f6d5a9bcf2272a0832dc164d Mon Sep 17 00:00:00 2001 From: zpoint Date: Wed, 29 Oct 2025 17:45:18 +0800 Subject: [PATCH 3/4] revert --- sky/utils/kubernetes/gpu_labeler.py | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/sky/utils/kubernetes/gpu_labeler.py b/sky/utils/kubernetes/gpu_labeler.py index 18392e89a6f..1db837f4876 100644 --- a/sky/utils/kubernetes/gpu_labeler.py +++ b/sky/utils/kubernetes/gpu_labeler.py @@ -254,23 +254,12 @@ def wait_for_jobs_completion(jobs_to_node_names: Dict[str, str], batch_v1 = kubernetes.batch_api(context=context) completed_jobs = [] - def _watch_jobs(resource_version=None): - """Helper function to watch jobs with error handling. - - Args: - resource_version: Specific resource version to watch from. - If None, starts from the current state. - """ + def _watch_jobs(): + """Helper function to watch jobs with error handling.""" w = kubernetes.watch() - kwargs = { - 'namespace': namespace, - 'timeout_seconds': timeout, - } - # Only specify resource_version if explicitly provided - if resource_version is not None: - kwargs['resource_version'] = resource_version - - for event in w.stream(func=batch_v1.list_namespaced_job, **kwargs): + for event in w.stream(func=batch_v1.list_namespaced_job, + namespace=namespace, + timeout_seconds=timeout): job = event['object'] job_name = job.metadata.name if job_name in jobs_to_node_names: From 2814a88c1bf71904bc06d941e082f838ea7982b1 Mon Sep 17 00:00:00 2001 From: zpoint Date: Tue, 11 Nov 2025 16:33:12 +0800 Subject: [PATCH 4/4] set resource_version = 0 --- sky/utils/kubernetes/gpu_labeler.py | 154 ++++++---------------------- 1 file changed, 33 insertions(+), 121 deletions(-) diff --git a/sky/utils/kubernetes/gpu_labeler.py b/sky/utils/kubernetes/gpu_labeler.py index 1db837f4876..3a4ea946252 100644 --- a/sky/utils/kubernetes/gpu_labeler.py +++ b/sky/utils/kubernetes/gpu_labeler.py @@ -3,7 +3,6 @@ import hashlib import os import subprocess -import time from typing import Dict, Optional, Tuple import colorama @@ -14,9 +13,6 @@ from sky.utils import directory_utils from sky.utils import rich_utils -# Polling interval in seconds for job completion checks -JOB_COMPLETION_POLL_INTERVAL = 5 - def _format_string(str_to_format: str, colorama_format: str) -> str: return f'{colorama_format}{str_to_format}{colorama.Style.RESET_ALL}' @@ -170,73 +166,6 @@ def label(context: Optional[str] = None, wait_for_completion: bool = True): '`skypilot.co/accelerator: `. ') -def _poll_jobs_completion(jobs_to_node_names: Dict[str, str], - namespace: str, - context: Optional[str] = None, - timeout: int = 60 * 20) -> bool: - """Fallback polling method to check job completion status. - - This method polls the Kubernetes API to check job status instead of using - the watch API. It's used as a fallback when the watch API fails due to - resource version mismatches. - - Args: - jobs_to_node_names: A dictionary mapping job names to node names. - namespace: The namespace the jobs are in. - context: Optional Kubernetes context to use. - timeout: Timeout in seconds (default: 1200 seconds = 20 minutes). - - Returns: - True if all jobs completed successfully, False if any failed - or timed out. - """ - batch_v1 = kubernetes.batch_api(context=context) - start_time = time.time() - completed_jobs = [] - - print( - _format_string('Using polling method to check job completion...', - colorama.Style.DIM)) - - while time.time() - start_time < timeout: - try: - jobs = batch_v1.list_namespaced_job(namespace=namespace) - for job in jobs.items: - job_name = job.metadata.name - if job_name in jobs_to_node_names: - node_name = jobs_to_node_names[job_name] - if job.status and job.status.completion_time: - if job_name not in completed_jobs: - print( - _format_string( - f'GPU labeler job for node {node_name} ' - 'completed successfully', - colorama.Style.DIM)) - completed_jobs.append(job_name) - elif job.status and job.status.failed: - print( - _format_string( - f'GPU labeler job for node {node_name} failed', - colorama.Style.DIM)) - return False - - if len(completed_jobs) == len(jobs_to_node_names): - return True - - time.sleep(JOB_COMPLETION_POLL_INTERVAL) - except kubernetes.api_exception() as poll_error: - print( - _format_string(f'Polling error: {str(poll_error)}', - colorama.Fore.RED)) - time.sleep(JOB_COMPLETION_POLL_INTERVAL) - - print( - _format_string( - f'Timed out after waiting {timeout} seconds ' - 'for job to complete', colorama.Style.DIM)) - return False - - def wait_for_jobs_completion(jobs_to_node_names: Dict[str, str], namespace: str, context: Optional[str] = None, @@ -252,58 +181,41 @@ def wait_for_jobs_completion(jobs_to_node_names: Dict[str, str], True if the Job completed successfully, False if it failed or timed out. """ batch_v1 = kubernetes.batch_api(context=context) + w = kubernetes.watch() completed_jobs = [] - - def _watch_jobs(): - """Helper function to watch jobs with error handling.""" - w = kubernetes.watch() - for event in w.stream(func=batch_v1.list_namespaced_job, - namespace=namespace, - timeout_seconds=timeout): - job = event['object'] - job_name = job.metadata.name - if job_name in jobs_to_node_names: - node_name = jobs_to_node_names[job_name] - if job.status and job.status.completion_time: - print( - _format_string( - f'GPU labeler job for node {node_name} ' - 'completed successfully', colorama.Style.DIM)) - completed_jobs.append(job_name) - num_remaining_jobs = len(jobs_to_node_names) - len( - completed_jobs) - if num_remaining_jobs == 0: - w.stop() - return True - elif job.status and job.status.failed: - print( - _format_string( - f'GPU labeler job for node {node_name} failed', - colorama.Style.DIM)) + # Use resource_version="0" to start from the oldest available version. + # In multi-replica API server environments, replicas may be at different + # resource versions due to replication lag. Without specifying this, the + # watch may get version X from one replica but connect to another replica + # that only has up to version Y < X, causing "Too large resource version" + # errors. Using "0" ensures all replicas can serve the request from their + # oldest available version, avoiding version mismatches. + for event in w.stream(func=batch_v1.list_namespaced_job, + namespace=namespace, + timeout_seconds=timeout, + resource_version='0'): + job = event['object'] + job_name = job.metadata.name + if job_name in jobs_to_node_names: + node_name = jobs_to_node_names[job_name] + if job.status and job.status.completion_time: + print( + _format_string( + f'GPU labeler job for node {node_name} ' + 'completed successfully', colorama.Style.DIM)) + completed_jobs.append(job_name) + num_remaining_jobs = len(jobs_to_node_names) - len( + completed_jobs) + if num_remaining_jobs == 0: w.stop() - return False - return None # Timeout - - try: - result = _watch_jobs() - if result is not None: - return result - except kubernetes.api_exception() as e: - if e.status == 504 and 'Too large resource version' in str(e): - print( - _format_string( - 'Watch failed due to resource version mismatch. ' - 'Falling back to polling method...', colorama.Fore.YELLOW)) - # Fall back to polling instead of watch API - # The watch API is unreliable when resource versions are changing - # rapidly or when there are multiple API server instances with - # different cache states - return _poll_jobs_completion(jobs_to_node_names, namespace, context, - timeout) - else: - # Re-raise other API exceptions - raise - + return True + elif job.status and job.status.failed: + print( + _format_string( + f'GPU labeler job for node {node_name} failed', + colorama.Style.DIM)) + w.stop() + return False print( _format_string( f'Timed out after waiting {timeout} seconds '