From ed5b531750f272eeeb00ce56744f088e5d14560e Mon Sep 17 00:00:00 2001 From: ivanmkc Date: Mon, 24 Jan 2022 16:27:44 -0500 Subject: [PATCH 1/8] Added scheduling to customtrainingjob --- google/cloud/aiplatform/training_jobs.py | 70 ++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 38aafef4fa..5839edb2d3 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -1379,6 +1379,8 @@ def _prepare_training_task_inputs_and_output_dir( base_output_dir: Optional[str] = None, service_account: Optional[str] = None, network: Optional[str] = None, + timeout: Optional[int] = None, + restart_job_on_worker_restart: bool = False, enable_web_access: bool = False, tensorboard: Optional[str] = None, ) -> Tuple[Dict, str]: @@ -1398,6 +1400,13 @@ def _prepare_training_task_inputs_and_output_dir( should be peered. For example, projects/12345/global/networks/myVPC. Private services access must already be configured for the network. If left unspecified, the job is not peered with any network. + timeout (int): + The maximum job running time in seconds. The default is 7 days. + restart_job_on_worker_restart (bool): + Restarts the entire CustomJob if a worker + gets restarted. This feature can be used by + distributed training jobs that are not resilient + to workers leaving and joining a job. enable_web_access (bool): Whether you want Vertex AI to enable interactive shell access to training containers. 
@@ -1442,6 +1451,14 @@ def _prepare_training_task_inputs_and_output_dir( if enable_web_access: training_task_inputs["enable_web_access"] = enable_web_access + if timeout or restart_job_on_worker_restart: + timeout = f"{timeout}s" if timeout else None + scheduling = { + "timeout": timeout, + "restart_job_on_worker_restart": restart_job_on_worker_restart, + } + training_task_inputs["scheduling"] = scheduling + return training_task_inputs, base_output_dir @property @@ -1794,6 +1811,8 @@ def run( test_filter_split: Optional[str] = None, predefined_split_column_name: Optional[str] = None, timestamp_split_column_name: Optional[str] = None, + timeout: Optional[int] = None, + restart_job_on_worker_restart: bool = False, enable_web_access: bool = False, tensorboard: Optional[str] = None, sync=True, @@ -2014,6 +2033,13 @@ def run( that piece is ignored by the pipeline. Supported only for tabular and time series Datasets. + timeout (int): + The maximum job running time in seconds. The default is 7 days. + restart_job_on_worker_restart (bool): + Restarts the entire CustomJob if a worker + gets restarted. This feature can be used by + distributed training jobs that are not resilient + to workers leaving and joining a job. enable_web_access (bool): Whether you want Vertex AI to enable interactive shell access to training containers. @@ -2117,6 +2143,8 @@ def _run( test_filter_split: Optional[str] = None, predefined_split_column_name: Optional[str] = None, timestamp_split_column_name: Optional[str] = None, + timeout: Optional[int] = None, + restart_job_on_worker_restart: bool = False, enable_web_access: bool = False, tensorboard: Optional[str] = None, reduction_server_container_uri: Optional[str] = None, @@ -2237,6 +2265,13 @@ def _run( that piece is ignored by the pipeline. Supported only for tabular and time series Datasets. + timeout (int): + The maximum job running time in seconds. The default is 7 days. 
+ restart_job_on_worker_restart (bool): + Restarts the entire CustomJob if a worker + gets restarted. This feature can be used by + distributed training jobs that are not resilient + to workers leaving and joining a job. enable_web_access (bool): Whether you want Vertex AI to enable interactive shell access to training containers. @@ -2309,6 +2344,8 @@ def _run( base_output_dir=base_output_dir, service_account=service_account, network=network, + timeout=timeout, + restart_job_on_worker_restart=restart_job_on_worker_restart, enable_web_access=enable_web_access, tensorboard=tensorboard, ) @@ -2598,6 +2635,8 @@ def run( test_filter_split: Optional[str] = None, predefined_split_column_name: Optional[str] = None, timestamp_split_column_name: Optional[str] = None, + timeout: Optional[int] = None, + restart_job_on_worker_restart: bool = False, enable_web_access: bool = False, tensorboard: Optional[str] = None, sync=True, @@ -2811,6 +2850,13 @@ def run( that piece is ignored by the pipeline. Supported only for tabular and time series Datasets. + timeout (int): + The maximum job running time in seconds. The default is 7 days. + restart_job_on_worker_restart (bool): + Restarts the entire CustomJob if a worker + gets restarted. This feature can be used by + distributed training jobs that are not resilient + to workers leaving and joining a job. enable_web_access (bool): Whether you want Vertex AI to enable interactive shell access to training containers. 
@@ -2876,6 +2922,8 @@ def run( test_filter_split=test_filter_split, predefined_split_column_name=predefined_split_column_name, timestamp_split_column_name=timestamp_split_column_name, + timeout=timeout, + restart_job_on_worker_restart=restart_job_on_worker_restart, enable_web_access=enable_web_access, tensorboard=tensorboard, reduction_server_container_uri=reduction_server_container_uri @@ -2912,6 +2960,8 @@ def _run( test_filter_split: Optional[str] = None, predefined_split_column_name: Optional[str] = None, timestamp_split_column_name: Optional[str] = None, + timeout: Optional[int] = None, + restart_job_on_worker_restart: bool = False, enable_web_access: bool = False, tensorboard: Optional[str] = None, reduction_server_container_uri: Optional[str] = None, @@ -2965,6 +3015,13 @@ def _run( should be peered. For example, projects/12345/global/networks/myVPC. Private services access must already be configured for the network. If left unspecified, the job is not peered with any network. + timeout (int): + The maximum job running time in seconds. The default is 7 days. + restart_job_on_worker_restart (bool): + Restarts the entire CustomJob if a worker + gets restarted. This feature can be used by + distributed training jobs that are not resilient + to workers leaving and joining a job. bigquery_destination (str): The BigQuery project location where the training data is to be written to. 
In the given project a new dataset is created @@ -3094,6 +3151,8 @@ def _run( base_output_dir=base_output_dir, service_account=service_account, network=network, + timeout=timeout, + restart_job_on_worker_restart=restart_job_on_worker_restart, enable_web_access=enable_web_access, tensorboard=tensorboard, ) @@ -5682,6 +5741,8 @@ def _run( predefined_split_column_name: Optional[str] = None, timestamp_split_column_name: Optional[str] = None, bigquery_destination: Optional[str] = None, + timeout: Optional[int] = None, + restart_job_on_worker_restart: bool = False, enable_web_access: bool = False, tensorboard: Optional[str] = None, reduction_server_container_uri: Optional[str] = None, @@ -5785,6 +5846,13 @@ def _run( that piece is ignored by the pipeline. Supported only for tabular and time series Datasets. + timeout (int): + The maximum job running time in seconds. The default is 7 days. + restart_job_on_worker_restart (bool): + Restarts the entire CustomJob if a worker + gets restarted. This feature can be used by + distributed training jobs that are not resilient + to workers leaving and joining a job. enable_web_access (bool): Whether you want Vertex AI to enable interactive shell access to training containers. 
@@ -5851,6 +5919,8 @@ def _run( base_output_dir=base_output_dir, service_account=service_account, network=network, + timeout=timeout, + restart_job_on_worker_restart=restart_job_on_worker_restart, enable_web_access=enable_web_access, tensorboard=tensorboard, ) From f41937ffca768f5a7c2a0fdc75e9f498d9944a27 Mon Sep 17 00:00:00 2001 From: ivanmkc Date: Mon, 24 Jan 2022 17:30:44 -0500 Subject: [PATCH 2/8] Added unit tests Fixed tests Fixed test fix: Broken test --- google/cloud/aiplatform/training_jobs.py | 13 ++ tests/unit/aiplatform/test_training_jobs.py | 228 ++++++++++++++++++++ 2 files changed, 241 insertions(+) diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 5839edb2d3..8a2986fe65 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -2106,6 +2106,8 @@ def run( test_filter_split=test_filter_split, predefined_split_column_name=predefined_split_column_name, timestamp_split_column_name=timestamp_split_column_name, + timeout=timeout, + restart_job_on_worker_restart=restart_job_on_worker_restart, enable_web_access=enable_web_access, tensorboard=tensorboard, reduction_server_container_uri=reduction_server_container_uri @@ -5432,6 +5434,8 @@ def run( test_filter_split: Optional[str] = None, predefined_split_column_name: Optional[str] = None, timestamp_split_column_name: Optional[str] = None, + timeout: Optional[int] = None, + restart_job_on_worker_restart: bool = False, enable_web_access: bool = False, tensorboard: Optional[str] = None, sync=True, @@ -5645,6 +5649,13 @@ def run( that piece is ignored by the pipeline. Supported only for tabular and time series Datasets. + timeout (int): + The maximum job running time in seconds. The default is 7 days. + restart_job_on_worker_restart (bool): + Restarts the entire CustomJob if a worker + gets restarted. This feature can be used by + distributed training jobs that are not resilient + to workers leaving and joining a job. 
enable_web_access (bool): Whether you want Vertex AI to enable interactive shell access to training containers. @@ -5705,6 +5716,8 @@ def run( predefined_split_column_name=predefined_split_column_name, timestamp_split_column_name=timestamp_split_column_name, bigquery_destination=bigquery_destination, + timeout=timeout, + restart_job_on_worker_restart=restart_job_on_worker_restart, enable_web_access=enable_web_access, tensorboard=tensorboard, reduction_server_container_uri=reduction_server_container_uri diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index b89a69ce24..10446dcf9a 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -183,6 +183,10 @@ _TEST_MODEL_ENCRYPTION_SPEC = gca_encryption_spec.EncryptionSpec( kms_key_name=_TEST_MODEL_ENCRYPTION_KEY_NAME ) + +_TEST_TIMEOUT = 1000 +_TEST_RESTART_JOB_ON_WORKER_RESTART = True + _TEST_ENABLE_WEB_ACCESS = True _TEST_WEB_ACCESS_URIS = {"workerpool0-0": "uri"} @@ -202,6 +206,17 @@ def _get_custom_job_proto_with_enable_web_access(state=None, name=None, version= return custom_job_proto +def _get_custom_job_proto_with_scheduling(state=None, name=None, version="v1"): + custom_job_proto = copy.deepcopy(_TEST_BASE_CUSTOM_JOB_PROTO) + custom_job_proto.name = name + custom_job_proto.state = state + + custom_job_proto.job_spec.enable_web_access = _TEST_ENABLE_WEB_ACCESS + if state == gca_job_state.JobState.JOB_STATE_RUNNING: + custom_job_proto.web_access_uris = _TEST_WEB_ACCESS_URIS + return custom_job_proto + + def local_copy_method(path): shutil.copy(path, ".") return pathlib.Path(path).name @@ -306,6 +321,40 @@ def mock_get_backing_custom_job_with_enable_web_access(): yield get_custom_job_mock +@pytest.fixture +def mock_get_backing_custom_job_with_scheduling(): + with patch.object( + job_service_client.JobServiceClient, "get_custom_job" + ) as get_custom_job_mock: + get_custom_job_mock.side_effect = [ + 
_get_custom_job_proto_with_scheduling( + name=_TEST_CUSTOM_JOB_RESOURCE_NAME, + state=gca_job_state.JobState.JOB_STATE_PENDING, + ), + _get_custom_job_proto_with_scheduling( + name=_TEST_CUSTOM_JOB_RESOURCE_NAME, + state=gca_job_state.JobState.JOB_STATE_RUNNING, + ), + _get_custom_job_proto_with_scheduling( + name=_TEST_CUSTOM_JOB_RESOURCE_NAME, + state=gca_job_state.JobState.JOB_STATE_RUNNING, + ), + _get_custom_job_proto_with_scheduling( + name=_TEST_CUSTOM_JOB_RESOURCE_NAME, + state=gca_job_state.JobState.JOB_STATE_RUNNING, + ), + _get_custom_job_proto_with_scheduling( + name=_TEST_CUSTOM_JOB_RESOURCE_NAME, + state=gca_job_state.JobState.JOB_STATE_SUCCEEDED, + ), + _get_custom_job_proto_with_scheduling( + name=_TEST_CUSTOM_JOB_RESOURCE_NAME, + state=gca_job_state.JobState.JOB_STATE_SUCCEEDED, + ), + ] + yield get_custom_job_mock + + class TestTrainingScriptPythonPackagerHelpers: def setup_method(self): importlib.reload(initializer) @@ -548,6 +597,23 @@ def make_training_pipeline_with_enable_web_access(state): return training_pipeline +def make_training_pipeline_with_scheduling(state): + training_pipeline = gca_training_pipeline.TrainingPipeline( + name=_TEST_PIPELINE_RESOURCE_NAME, + state=state, + training_task_inputs={ + # "enable_web_access": _TEST_ENABLE_WEB_ACCESS, + "timeout": f"{_TEST_TIMEOUT}s", + "restart_job_on_worker_restart": _TEST_RESTART_JOB_ON_WORKER_RESTART, + }, + ) + if state == gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING: + training_pipeline.training_task_metadata = { + "backingCustomJob": _TEST_CUSTOM_JOB_RESOURCE_NAME + } + return training_pipeline + + @pytest.fixture def mock_pipeline_service_get(): with mock.patch.object( @@ -619,6 +685,35 @@ def mock_pipeline_service_get_with_enable_web_access(): yield mock_get_training_pipeline +@pytest.fixture +def mock_pipeline_service_get_with_scheduling(): + with mock.patch.object( + pipeline_service_client.PipelineServiceClient, "get_training_pipeline" + ) as mock_get_training_pipeline: 
+ mock_get_training_pipeline.side_effect = [ + make_training_pipeline_with_scheduling( + state=gca_pipeline_state.PipelineState.PIPELINE_STATE_PENDING, + ), + make_training_pipeline_with_scheduling( + state=gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING, + ), + make_training_pipeline_with_scheduling( + state=gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING, + ), + make_training_pipeline_with_scheduling( + state=gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING, + ), + make_training_pipeline_with_scheduling( + state=gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED, + ), + make_training_pipeline_with_scheduling( + state=gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED, + ), + ] + + yield mock_get_training_pipeline + + @pytest.fixture def mock_pipeline_service_cancel(): with mock.patch.object( @@ -650,6 +745,17 @@ def mock_pipeline_service_create_with_enable_web_access(): yield mock_create_training_pipeline +@pytest.fixture +def mock_pipeline_service_create_with_scheduling(): + with mock.patch.object( + pipeline_service_client.PipelineServiceClient, "create_training_pipeline" + ) as mock_create_training_pipeline: + mock_create_training_pipeline.return_value = make_training_pipeline_with_scheduling( + state=gca_pipeline_state.PipelineState.PIPELINE_STATE_PENDING, + ) + yield mock_create_training_pipeline + + @pytest.fixture def mock_pipeline_service_get_with_no_model_to_upload(): with mock.patch.object( @@ -1397,6 +1503,47 @@ def test_run_call_pipeline_service_create_with_enable_web_access( gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) + @pytest.mark.usefixtures( + "mock_pipeline_service_create_with_scheduling", + "mock_pipeline_service_get_with_scheduling", + "mock_get_backing_custom_job_with_scheduling", + "mock_python_package_to_gcs", + ) + @pytest.mark.parametrize("sync", [True, False]) + def test_run_call_pipeline_service_create_with_scheduling(self, sync, caplog): + + caplog.set_level(logging.INFO) + + 
aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_BUCKET_NAME, + credentials=_TEST_CREDENTIALS, + ) + + job = training_jobs.CustomTrainingJob( + display_name=_TEST_DISPLAY_NAME, + script_path=_TEST_LOCAL_SCRIPT_FILE_NAME, + container_uri=_TEST_TRAINING_CONTAINER_IMAGE, + ) + + job.run( + base_output_dir=_TEST_BASE_OUTPUT_DIR, + args=_TEST_RUN_ARGS, + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + timeout=_TEST_TIMEOUT, + restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART, + sync=sync, + ) + + if not sync: + job.wait() + + assert job._gca_resource == make_training_pipeline_with_scheduling( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + @pytest.mark.usefixtures( "mock_pipeline_service_create_with_no_model_to_upload", "mock_pipeline_service_get_with_no_model_to_upload", @@ -2803,6 +2950,46 @@ def test_run_call_pipeline_service_create_with_enable_web_access( gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) + @pytest.mark.usefixtures( + "mock_pipeline_service_create_with_scheduling", + "mock_pipeline_service_get_with_scheduling", + "mock_get_backing_custom_job_with_scheduling", + ) + @pytest.mark.parametrize("sync", [True, False]) + def test_run_call_pipeline_service_create_with_scheduling(self, sync, caplog): + + caplog.set_level(logging.INFO) + + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_BUCKET_NAME, + credentials=_TEST_CREDENTIALS, + ) + + job = training_jobs.CustomContainerTrainingJob( + display_name=_TEST_DISPLAY_NAME, + container_uri=_TEST_TRAINING_CONTAINER_IMAGE, + command=_TEST_TRAINING_CONTAINER_CMD, + ) + + job.run( + base_output_dir=_TEST_BASE_OUTPUT_DIR, + args=_TEST_RUN_ARGS, + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + timeout=_TEST_TIMEOUT, + restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART, + 
sync=sync, + ) + + if not sync: + job.wait() + + assert job._gca_resource == make_training_pipeline_with_scheduling( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + @pytest.mark.parametrize("sync", [True, False]) def test_run_returns_none_if_no_model_to_upload( self, @@ -4481,6 +4668,47 @@ def test_run_call_pipeline_service_create_with_enable_web_access( gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) + @pytest.mark.usefixtures( + "mock_pipeline_service_create_with_scheduling", + "mock_pipeline_service_get_with_scheduling", + "mock_get_backing_custom_job_with_scheduling", + ) + @pytest.mark.parametrize("sync", [True, False]) + def test_run_call_pipeline_service_create_with_scheduling(self, sync, caplog): + + caplog.set_level(logging.INFO) + + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_BUCKET_NAME, + credentials=_TEST_CREDENTIALS, + ) + + job = training_jobs.CustomPythonPackageTrainingJob( + display_name=_TEST_DISPLAY_NAME, + python_package_gcs_uri=_TEST_OUTPUT_PYTHON_PACKAGE_PATH, + python_module_name=_TEST_PYTHON_MODULE_NAME, + container_uri=_TEST_TRAINING_CONTAINER_IMAGE, + ) + + job.run( + base_output_dir=_TEST_BASE_OUTPUT_DIR, + args=_TEST_RUN_ARGS, + machine_type=_TEST_MACHINE_TYPE, + accelerator_type=_TEST_ACCELERATOR_TYPE, + accelerator_count=_TEST_ACCELERATOR_COUNT, + timeout=_TEST_TIMEOUT, + restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART, + sync=sync, + ) + + if not sync: + job.wait() + + assert job._gca_resource == make_training_pipeline_with_scheduling( + gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + @pytest.mark.usefixtures( "mock_pipeline_service_create_with_no_model_to_upload", "mock_pipeline_service_get_with_no_model_to_upload", From 490034ec08908469c522ee1a13057ef0c4e92b26 Mon Sep 17 00:00:00 2001 From: ivanmkc Date: Fri, 28 Jan 2022 18:17:33 -0500 Subject: [PATCH 3/8] Added integration test --- tests/system/aiplatform/test_e2e_tabular.py | 15 +++++++++++++++ 1 
file changed, 15 insertions(+) diff --git a/tests/system/aiplatform/test_e2e_tabular.py b/tests/system/aiplatform/test_e2e_tabular.py index 3fcae149c3..387851a840 100644 --- a/tests/system/aiplatform/test_e2e_tabular.py +++ b/tests/system/aiplatform/test_e2e_tabular.py @@ -109,6 +109,8 @@ def test_end_to_end_tabular(self, shared_state): ds, replica_count=1, model_display_name=self._make_display_name("custom-housing-model"), + timeout=1234, + restart_job_on_worker_restart=True, enable_web_access=True, sync=False, ) @@ -147,6 +149,19 @@ def test_end_to_end_tabular(self, shared_state): # Send online prediction with same instance to both deployed models # This sample is taken from an observation where median_house_value = 94600 custom_endpoint.wait() + + # Check scheduling is correctly set + assert ( + custom_job._gca_resource.training_task_inputs["scheduling"]["timeout"] + == "1234s" + ) + assert ( + custom_job._gca_resource.training_task_inputs["scheduling"][ + "restartJobOnWorkerRestart" + ] + == True + ) + custom_prediction = custom_endpoint.predict([_INSTANCE]) custom_batch_prediction_job.wait() From 0171ccc56b14d64c06cf6533ae31cd52606aedfe Mon Sep 17 00:00:00 2001 From: ivanmkc Date: Fri, 28 Jan 2022 19:05:07 -0500 Subject: [PATCH 4/8] Removed comment --- tests/unit/aiplatform/test_training_jobs.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index 10446dcf9a..e14c3b4684 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -602,7 +602,6 @@ def make_training_pipeline_with_scheduling(state): name=_TEST_PIPELINE_RESOURCE_NAME, state=state, training_task_inputs={ - # "enable_web_access": _TEST_ENABLE_WEB_ACCESS, "timeout": f"{_TEST_TIMEOUT}s", "restart_job_on_worker_restart": _TEST_RESTART_JOB_ON_WORKER_RESTART, }, From 0d4f9523e4ef952422c3b6af18fe25ddf765ac9c Mon Sep 17 00:00:00 2001 From: ivanmkc Date: Mon, 31 Jan 
2022 14:30:10 -0500 Subject: [PATCH 5/8] Updated e2e tabular test --- tests/system/aiplatform/test_e2e_tabular.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/aiplatform/test_e2e_tabular.py b/tests/system/aiplatform/test_e2e_tabular.py index 387851a840..df6c21f2c3 100644 --- a/tests/system/aiplatform/test_e2e_tabular.py +++ b/tests/system/aiplatform/test_e2e_tabular.py @@ -159,7 +159,7 @@ def test_end_to_end_tabular(self, shared_state): custom_job._gca_resource.training_task_inputs["scheduling"][ "restartJobOnWorkerRestart" ] - == True + is True ) custom_prediction = custom_endpoint.predict([_INSTANCE]) From 316c7b4d9cd9c230ea0d16f9bfd4d426e8a0ff2f Mon Sep 17 00:00:00 2001 From: ivanmkc Date: Mon, 31 Jan 2022 17:13:20 -0500 Subject: [PATCH 6/8] Fixed lint issue --- google/cloud/aiplatform/training_jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py index 8a2986fe65..a0e8ed8125 100644 --- a/google/cloud/aiplatform/training_jobs.py +++ b/google/cloud/aiplatform/training_jobs.py @@ -5655,7 +5655,7 @@ def run( Restarts the entire CustomJob if a worker gets restarted. This feature can be used by distributed training jobs that are not resilient - to workers leaving and joining a job. + to workers leaving and joining a job. enable_web_access (bool): Whether you want Vertex AI to enable interactive shell access to training containers. 
From 7b9db49a855fb6f3f356032afbab5ccc5dc3c3e8 Mon Sep 17 00:00:00 2001 From: ivanmkc Date: Mon, 14 Feb 2022 14:00:41 -0500 Subject: [PATCH 7/8] Simplified tests --- tests/unit/aiplatform/test_training_jobs.py | 55 +++------------------ 1 file changed, 8 insertions(+), 47 deletions(-) diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index e14c3b4684..6486e3b15e 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -69,7 +69,7 @@ from google.cloud import storage from google.protobuf import json_format from google.protobuf import struct_pb2 - +from google.protobuf import duration_pb2 # type: ignore _TEST_BUCKET_NAME = "test-bucket" _TEST_GCS_PATH_WITHOUT_BUCKET = "path/to/folder" @@ -211,9 +211,13 @@ def _get_custom_job_proto_with_scheduling(state=None, name=None, version="v1"): custom_job_proto.name = name custom_job_proto.state = state - custom_job_proto.job_spec.enable_web_access = _TEST_ENABLE_WEB_ACCESS - if state == gca_job_state.JobState.JOB_STATE_RUNNING: - custom_job_proto.web_access_uris = _TEST_WEB_ACCESS_URIS + custom_job_proto.job_spec.scheduling.timeout = duration_pb2.Duration( + seconds=_TEST_TIMEOUT + ) + custom_job_proto.job_spec.scheduling.restart_job_on_worker_restart = ( + _TEST_RESTART_JOB_ON_WORKER_RESTART + ) + return custom_job_proto @@ -321,40 +325,6 @@ def mock_get_backing_custom_job_with_enable_web_access(): yield get_custom_job_mock -@pytest.fixture -def mock_get_backing_custom_job_with_scheduling(): - with patch.object( - job_service_client.JobServiceClient, "get_custom_job" - ) as get_custom_job_mock: - get_custom_job_mock.side_effect = [ - _get_custom_job_proto_with_scheduling( - name=_TEST_CUSTOM_JOB_RESOURCE_NAME, - state=gca_job_state.JobState.JOB_STATE_PENDING, - ), - _get_custom_job_proto_with_scheduling( - name=_TEST_CUSTOM_JOB_RESOURCE_NAME, - state=gca_job_state.JobState.JOB_STATE_RUNNING, - ), - 
_get_custom_job_proto_with_scheduling( - name=_TEST_CUSTOM_JOB_RESOURCE_NAME, - state=gca_job_state.JobState.JOB_STATE_RUNNING, - ), - _get_custom_job_proto_with_scheduling( - name=_TEST_CUSTOM_JOB_RESOURCE_NAME, - state=gca_job_state.JobState.JOB_STATE_RUNNING, - ), - _get_custom_job_proto_with_scheduling( - name=_TEST_CUSTOM_JOB_RESOURCE_NAME, - state=gca_job_state.JobState.JOB_STATE_SUCCEEDED, - ), - _get_custom_job_proto_with_scheduling( - name=_TEST_CUSTOM_JOB_RESOURCE_NAME, - state=gca_job_state.JobState.JOB_STATE_SUCCEEDED, - ), - ] - yield get_custom_job_mock - - class TestTrainingScriptPythonPackagerHelpers: def setup_method(self): importlib.reload(initializer) @@ -1505,14 +1475,11 @@ def test_run_call_pipeline_service_create_with_enable_web_access( @pytest.mark.usefixtures( "mock_pipeline_service_create_with_scheduling", "mock_pipeline_service_get_with_scheduling", - "mock_get_backing_custom_job_with_scheduling", "mock_python_package_to_gcs", ) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_scheduling(self, sync, caplog): - caplog.set_level(logging.INFO) - aiplatform.init( project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME, @@ -2952,13 +2919,10 @@ def test_run_call_pipeline_service_create_with_enable_web_access( @pytest.mark.usefixtures( "mock_pipeline_service_create_with_scheduling", "mock_pipeline_service_get_with_scheduling", - "mock_get_backing_custom_job_with_scheduling", ) @pytest.mark.parametrize("sync", [True, False]) def test_run_call_pipeline_service_create_with_scheduling(self, sync, caplog): - caplog.set_level(logging.INFO) - aiplatform.init( project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME, @@ -4670,13 +4634,10 @@ def test_run_call_pipeline_service_create_with_enable_web_access( @pytest.mark.usefixtures( "mock_pipeline_service_create_with_scheduling", "mock_pipeline_service_get_with_scheduling", - "mock_get_backing_custom_job_with_scheduling", ) @pytest.mark.parametrize("sync", 
[True, False]) def test_run_call_pipeline_service_create_with_scheduling(self, sync, caplog): - caplog.set_level(logging.INFO) - aiplatform.init( project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME, From ca17bc5a92cebeb7702a98cf4038a2d8f38a3a42 Mon Sep 17 00:00:00 2001 From: ivanmkc Date: Mon, 14 Feb 2022 14:59:40 -0500 Subject: [PATCH 8/8] Added more assertions --- tests/unit/aiplatform/test_training_jobs.py | 30 +++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py index 6486e3b15e..6eb1a738cf 100644 --- a/tests/unit/aiplatform/test_training_jobs.py +++ b/tests/unit/aiplatform/test_training_jobs.py @@ -1510,6 +1510,16 @@ def test_run_call_pipeline_service_create_with_scheduling(self, sync, caplog): gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) + assert ( + job._gca_resource.state + == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + assert job._gca_resource.training_task_inputs["timeout"] == f"{_TEST_TIMEOUT}s" + assert ( + job._gca_resource.training_task_inputs["restart_job_on_worker_restart"] + == _TEST_RESTART_JOB_ON_WORKER_RESTART + ) + @pytest.mark.usefixtures( "mock_pipeline_service_create_with_no_model_to_upload", "mock_pipeline_service_get_with_no_model_to_upload", @@ -2953,6 +2963,16 @@ def test_run_call_pipeline_service_create_with_scheduling(self, sync, caplog): gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) + assert ( + job._gca_resource.state + == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + assert job._gca_resource.training_task_inputs["timeout"] == f"{_TEST_TIMEOUT}s" + assert ( + job._gca_resource.training_task_inputs["restart_job_on_worker_restart"] + == _TEST_RESTART_JOB_ON_WORKER_RESTART + ) + @pytest.mark.parametrize("sync", [True, False]) def test_run_returns_none_if_no_model_to_upload( self, @@ -4669,6 +4689,16 @@ def 
test_run_call_pipeline_service_create_with_scheduling(self, sync, caplog): gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED ) + assert ( + job._gca_resource.state + == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED + ) + assert job._gca_resource.training_task_inputs["timeout"] == f"{_TEST_TIMEOUT}s" + assert ( + job._gca_resource.training_task_inputs["restart_job_on_worker_restart"] + == _TEST_RESTART_JOB_ON_WORKER_RESTART + ) + @pytest.mark.usefixtures( "mock_pipeline_service_create_with_no_model_to_upload", "mock_pipeline_service_get_with_no_model_to_upload",