From 0b5b85fee88ab3cd996e0b59afc95821e33ac865 Mon Sep 17 00:00:00 2001 From: droctothorpe Date: Mon, 31 Jul 2023 14:04:45 -0400 Subject: [PATCH 1/2] [SDK] Add namespace parameter to KatibClient Co-authored-by: andreafehrman Co-authored-by: ryanrusson --- .../kubeflow/katib/api/katib_client.py | 94 ++++++++++++++----- 1 file changed, 69 insertions(+), 25 deletions(-) diff --git a/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py b/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py index 9728382d345..9e96e1d36fe 100644 --- a/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py +++ b/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py @@ -12,19 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -import multiprocessing -from typing import Callable, List, Dict, Any import inspect +import multiprocessing import textwrap -import grpc import time +from typing import Any, Callable, Dict, List, Optional -from kubernetes import client, config +import grpc +import kubeflow.katib.katib_api_pb2 as katib_api_pb2 from kubeflow.katib import models from kubeflow.katib.api_client import ApiClient from kubeflow.katib.constants import constants from kubeflow.katib.utils import utils -import kubeflow.katib.katib_api_pb2 as katib_api_pb2 +from kubernetes import client, config class KatibClient(object): @@ -33,6 +33,7 @@ def __init__( config_file: str = None, context: str = None, client_configuration: client.Configuration = None, + namespace: str = utils.get_default_target_namespace(), ): """KatibClient constructor. @@ -43,6 +44,7 @@ def __init__( You have to provide valid configuration with Bearer token or with username and password. You can find an example here: https://github.com/kubernetes-client/python/blob/67f9c7a97081b4526470cad53576bc3b71fa6fcc/examples/remote_cluster.py#L31 + namespace: Target namespace. Can be overridden during method invocations. """ self.in_cluster = False @@ -58,9 +60,11 @@ def __init__( k8s_client = client.ApiClient(client_configuration) self.custom_api = client.CustomObjectsApi(k8s_client) self.api_client = ApiClient() + self.namespace = namespace def _is_ipython(self): """Returns whether we are running in notebook.""" + try: import IPython @@ -74,7 +78,7 @@ def _is_ipython(self): def create_experiment( self, experiment: models.V1beta1Experiment, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, ): """Create the Katib Experiment. @@ -87,6 +91,8 @@ def create_experiment( RuntimeError: Failed to create Katib Experiment. """ + namespace = namespace or self.namespace + try: self.custom_api.create_namespaced_custom_object( constants.KUBEFLOW_GROUP, @@ -129,7 +135,7 @@ def tune( objective: Callable, parameters: Dict[str, Any], base_image: str = constants.BASE_IMAGE_TENSORFLOW, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, algorithm_name: str = "random", objective_metric_name: str = None, additional_metric_names: List[str] = [], @@ -183,6 +189,8 @@ def tune( RuntimeError: Failed to create Katib Experiment. """ + namespace = namespace or self.namespace + # Create Katib Experiment template. experiment = models.V1beta1Experiment( api_version=f"{constants.KUBEFLOW_GROUP}/{constants.KATIB_VERSION}", @@ -316,7 +324,7 @@ def tune( def get_experiment( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, timeout: int = constants.DEFAULT_TIMEOUT, ): """Get the Katib Experiment. @@ -335,6 +343,8 @@ def get_experiment( RuntimeError: Failed to get Katib Experiment. """ + namespace = namespace or self.namespace + try: thread = self.custom_api.get_namespaced_custom_object( constants.KUBEFLOW_GROUP, @@ -355,7 +365,7 @@ def get_experiment( def list_experiments( self, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, timeout: int = constants.DEFAULT_TIMEOUT, ): """List of all Katib Experiments in namespace. @@ -374,6 +384,8 @@ def list_experiments( RuntimeError: Failed to list Katib Experiments. """ + namespace = namespace or self.namespace + result = [] try: thread = self.custom_api.list_namespaced_custom_object( @@ -403,7 +415,7 @@ def list_experiments( def get_experiment_conditions( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, experiment: models.V1beta1Experiment = None, timeout: int = constants.DEFAULT_TIMEOUT, ): @@ -428,6 +440,8 @@ def get_experiment_conditions( RuntimeError: Failed to get Katib Experiment. """ + namespace = namespace or self.namespace + if experiment is None: experiment = self.get_experiment(name, namespace, timeout) @@ -443,7 +457,7 @@ def get_experiment_conditions( def is_experiment_created( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, experiment: models.V1beta1Experiment = None, timeout: int = constants.DEFAULT_TIMEOUT, ): @@ -464,6 +478,8 @@ def is_experiment_created( RuntimeError: Failed to get Katib Experiment. """ + namespace = namespace or self.namespace + return utils.has_condition( self.get_experiment_conditions(name, namespace, experiment, timeout), constants.EXPERIMENT_CONDITION_CREATED, @@ -472,7 +488,7 @@ def is_experiment_created( def is_experiment_running( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, experiment: models.V1beta1Experiment = None, timeout: int = constants.DEFAULT_TIMEOUT, ): @@ -493,6 +509,8 @@ def is_experiment_running( RuntimeError: Failed to get Katib Experiment. """ + namespace = namespace or self.namespace + return utils.has_condition( self.get_experiment_conditions(name, namespace, experiment, timeout), constants.EXPERIMENT_CONDITION_RUNNING, @@ -501,7 +519,7 @@ def is_experiment_running( def is_experiment_restarting( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, experiment: models.V1beta1Experiment = None, timeout: int = constants.DEFAULT_TIMEOUT, ): @@ -521,6 +539,8 @@ def is_experiment_restarting( RuntimeError: Failed to get Katib Experiment. """ + namespace = namespace or self.namespace + return utils.has_condition( self.get_experiment_conditions(name, namespace, experiment, timeout), constants.EXPERIMENT_CONDITION_RESTARTING, @@ -529,7 +549,7 @@ def is_experiment_restarting( def is_experiment_succeeded( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, experiment: models.V1beta1Experiment = None, timeout: int = constants.DEFAULT_TIMEOUT, ): @@ -549,6 +569,8 @@ def is_experiment_succeeded( RuntimeError: Failed to get Katib Experiment. """ + namespace = namespace or self.namespace + return utils.has_condition( self.get_experiment_conditions(name, namespace, experiment, timeout), constants.EXPERIMENT_CONDITION_SUCCEEDED, @@ -557,7 +579,7 @@ def is_experiment_succeeded( def is_experiment_failed( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, experiment: models.V1beta1Experiment = None, timeout: int = constants.DEFAULT_TIMEOUT, ): @@ -577,6 +599,8 @@ def is_experiment_failed( RuntimeError: Failed to get Katib Experiment. """ + namespace = namespace or self.namespace + return utils.has_condition( self.get_experiment_conditions(name, namespace, experiment, timeout), constants.EXPERIMENT_CONDITION_FAILED, @@ -585,7 +609,7 @@ def is_experiment_failed( def wait_for_experiment_condition( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, expected_condition: str = constants.EXPERIMENT_CONDITION_SUCCEEDED, timeout: int = 600, polling_interval: int = 15, @@ -613,6 +637,8 @@ def wait_for_experiment_condition( or timeout to get Katib Experiment. """ + namespace = namespace or self.namespace + for _ in range(round(timeout / polling_interval)): # We should get Experiment only once per cycle and check the statuses. @@ -696,7 +722,7 @@ def wait_for_experiment_condition( def edit_experiment_budget( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, max_trial_count: int = None, parallel_trial_count: int = None, max_failed_trial_count: int = None, @@ -725,6 +751,8 @@ def edit_experiment_budget( reaches Failed condition. """ + namespace = namespace or self.namespace + # The new Trial budget must be set. if ( max_trial_count is None @@ -766,7 +794,7 @@ def edit_experiment_budget( def delete_experiment( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, delete_options: client.V1DeleteOptions = None, ): """Delete the Katib Experiment. @@ -782,6 +810,8 @@ def delete_experiment( RuntimeError: Failed to delete Katib Experiment. """ + namespace = namespace or self.namespace + try: self.custom_api.delete_namespaced_custom_object( constants.KUBEFLOW_GROUP, @@ -804,7 +834,7 @@ def delete_experiment( def get_suggestion( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, timeout: int = constants.DEFAULT_TIMEOUT, ): """Get the Katib Suggestion. @@ -823,6 +853,8 @@ def get_suggestion( RuntimeError: Failed to get Katib Suggestion. """ + namespace = namespace or self.namespace + try: thread = self.custom_api.get_namespaced_custom_object( constants.KUBEFLOW_GROUP, @@ -843,7 +875,7 @@ def get_suggestion( def list_suggestions( self, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, timeout: int = constants.DEFAULT_TIMEOUT, ): """List of all Katib Suggestion in namespace. @@ -862,6 +894,8 @@ def list_suggestions( RuntimeError: Failed to list Katib Suggestions. """ + namespace = namespace or self.namespace + result = [] try: thread = self.custom_api.list_namespaced_custom_object( @@ -891,7 +925,7 @@ def list_suggestions( def get_trial( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, timeout: int = constants.DEFAULT_TIMEOUT, ): """Get the Katib Trial. @@ -910,6 +944,8 @@ def get_trial( RuntimeError: Failed to get Katib Trial. """ + namespace = namespace or self.namespace + try: thread = self.custom_api.get_namespaced_custom_object( constants.KUBEFLOW_GROUP, @@ -931,7 +967,7 @@ def get_trial( def list_trials( self, experiment_name: str = None, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, timeout: int = constants.DEFAULT_TIMEOUT, ): """List of all Trials in namespace. If Experiment name is set, @@ -952,6 +988,8 @@ def list_trials( RuntimeError: Failed to list Katib Trials. """ + namespace = namespace or self.namespace + result = [] try: if experiment_name is None: @@ -989,7 +1027,7 @@ def list_trials( def get_success_trial_details( self, experiment_name: str = None, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, timeout: int = constants.DEFAULT_TIMEOUT, ): """Get the Succeeded Trial details. If Experiment name is set, @@ -1010,6 +1048,8 @@ def get_success_trial_details( RuntimeError: Failed to list Katib Trials. """ + namespace = namespace or self.namespace + result = [] try: if experiment_name is None: @@ -1060,7 +1100,7 @@ def get_success_trial_details( def get_optimal_hyperparameters( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, timeout: int = constants.DEFAULT_TIMEOUT, ): """Get the current optimal Trial from the Experiment. @@ -1080,6 +1120,8 @@ def get_optimal_hyperparameters( RuntimeError: Failed to get Katib Experiment. """ + namespace = namespace or self.namespace + experiment = self.get_experiment(name, namespace, timeout) if ( experiment.status @@ -1093,7 +1135,7 @@ def get_optimal_hyperparameters( def get_trial_metrics( self, name: str, - namespace: str = utils.get_default_target_namespace(), + namespace: Optional[str] = None, db_manager_address: str = constants.DEFAULT_DB_MANAGER_ADDRESS, timeout: str = constants.DEFAULT_TIMEOUT, ): @@ -1125,6 +1167,8 @@ def get_trial_metrics( RuntimeError: Unable to get Trial metrics. """ + namespace = namespace or self.namespace + db_manager_address = db_manager_address.split(":") channel = grpc.beta.implementations.insecure_channel( db_manager_address[0], int(db_manager_address[1]) From 4e845429b5669691f2d8c2092bf47ddeb2578694 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 31 Jul 2023 19:48:04 -0400 Subject: [PATCH 2/2] Update sdk/python/v1beta1/kubeflow/katib/api/katib_client.py Co-authored-by: Andrey Velichkevich --- sdk/python/v1beta1/kubeflow/katib/api/katib_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py b/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py index 9e96e1d36fe..0929303cff9 100644 --- a/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py +++ b/sdk/python/v1beta1/kubeflow/katib/api/katib_client.py @@ -44,7 +44,7 @@ def __init__( You have to provide valid configuration with Bearer token or with username and password. You can find an example here: https://github.com/kubernetes-client/python/blob/67f9c7a97081b4526470cad53576bc3b71fa6fcc/examples/remote_cluster.py#L31 - namespace: Target namespace. Can be overridden during method invocations. + namespace: Target Kubernetes namespace. Can be overridden during method invocations. """ self.in_cluster = False