From 9371b4fd3f7529636fc323a7914960d0c6a70db4 Mon Sep 17 00:00:00 2001 From: A Vertex SDK engineer Date: Thu, 15 Jun 2023 11:38:26 -0700 Subject: [PATCH] feat: Add additional scheduled pipelines client getters and unit tests. PiperOrigin-RevId: 540643400 --- .../aiplatform/preview/schedule/schedules.py | 40 ++++ .../aiplatform/test_pipeline_job_schedules.py | 208 ++++++++++++++++++ 2 files changed, 248 insertions(+) diff --git a/google/cloud/aiplatform/preview/schedule/schedules.py b/google/cloud/aiplatform/preview/schedule/schedules.py index 4209fb741c..3b34626814 100644 --- a/google/cloud/aiplatform/preview/schedule/schedules.py +++ b/google/cloud/aiplatform/preview/schedule/schedules.py @@ -156,6 +156,46 @@ def state(self) -> Optional[gca_schedule.Schedule.State]: self._sync_gca_resource() return self._gca_resource.state + @property + def max_run_count(self) -> int: + """Current Schedule max_run_count. + + Returns: + Schedule max_run_count. + """ + self._sync_gca_resource() + return self._gca_resource.max_run_count + + @property + def cron_expression(self) -> str: + """Current Schedule cron expression. + + Returns: + Schedule cron expression. + """ + self._sync_gca_resource() + return self._gca_resource.cron + + @property + def max_concurrent_run_count(self) -> int: + """Current Schedule max_concurrent_run_count. + + Returns: + Schedule max_concurrent_run_count. + """ + self._sync_gca_resource() + return self._gca_resource.max_concurrent_run_count + + @property + def allow_queueing(self) -> bool: + """Whether current Schedule allows queueing. + + Returns: + Schedule allow_queueing. + """ + self._sync_gca_resource() + return self._gca_resource.allow_queueing + def _block_until_complete(self) -> None: """Helper method to block and check on Schedule until complete.""" # Used these numbers so failures surface fast diff --git a/tests/unit/aiplatform/test_pipeline_job_schedules.py b/tests/unit/aiplatform/test_pipeline_job_schedules.py index 019c6f69e9..85368aab1f 100644 --- a/tests/unit/aiplatform/test_pipeline_job_schedules.py +++ b/tests/unit/aiplatform/test_pipeline_job_schedules.py @@ -1377,3 +1377,211 @@ def test_call_schedule_service_update_before_create( assert e.match( regexp=r"Not updating PipelineJobSchedule: PipelineJobSchedule must be active or completed." ) + + @pytest.mark.parametrize( + "job_spec", + [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], + ) + def test_get_max_run_count_before_create( + self, + mock_schedule_service_create, + mock_schedule_service_get, + mock_schedule_bucket_exists, + job_spec, + mock_load_yaml_and_json, + ): + """Gets the PipelineJobSchedule max_run_count before creating. + + Raises error because PipelineJobSchedule should be created first. + """ + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, + input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS, + enable_caching=True, + ) + + pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule( + pipeline_job=job, + display_name=_TEST_PIPELINE_JOB_SCHEDULE_DISPLAY_NAME, + ) + + with pytest.raises(RuntimeError) as e: + pipeline_job_schedule.max_run_count + + assert e.match(regexp=r"PipelineJobSchedule resource has not been created.") + + pipeline_job_schedule.create( + cron_expression=_TEST_PIPELINE_JOB_SCHEDULE_CRON_EXPRESSION, + max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT, + max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + create_request_timeout=None, + ) + + pipeline_job_schedule.max_run_count + + @pytest.mark.parametrize( + "job_spec", + [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], + ) + def test_get_cron_expression_before_create( + self, + mock_schedule_service_create, + mock_schedule_service_get, + mock_schedule_bucket_exists, + job_spec, + mock_load_yaml_and_json, + ): + """Gets the PipelineJobSchedule cron before creating. + + Raises error because PipelineJobSchedule should be created first. + """ + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, + input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS, + enable_caching=True, + ) + + pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule( + pipeline_job=job, + display_name=_TEST_PIPELINE_JOB_SCHEDULE_DISPLAY_NAME, + ) + + with pytest.raises(RuntimeError) as e: + pipeline_job_schedule.cron_expression + + assert e.match(regexp=r"PipelineJobSchedule resource has not been created.") + + pipeline_job_schedule.create( + cron_expression=_TEST_PIPELINE_JOB_SCHEDULE_CRON_EXPRESSION, + max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT, + max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + create_request_timeout=None, + ) + + pipeline_job_schedule.cron_expression + + @pytest.mark.parametrize( + "job_spec", + [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], + ) + def test_get_max_concurrent_run_count_before_create( + self, + mock_schedule_service_create, + mock_schedule_service_get, + mock_schedule_bucket_exists, + job_spec, + mock_load_yaml_and_json, + ): + """Gets the PipelineJobSchedule max_concurrent_run_count before creating. + + Raises error because PipelineJobSchedule should be created first. + """ + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, + input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS, + enable_caching=True, + ) + + pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule( + pipeline_job=job, + display_name=_TEST_PIPELINE_JOB_SCHEDULE_DISPLAY_NAME, + ) + + with pytest.raises(RuntimeError) as e: + pipeline_job_schedule.max_concurrent_run_count + + assert e.match(regexp=r"PipelineJobSchedule resource has not been created.") + + pipeline_job_schedule.create( + cron_expression=_TEST_PIPELINE_JOB_SCHEDULE_CRON_EXPRESSION, + max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT, + max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + create_request_timeout=None, + ) + + pipeline_job_schedule.max_concurrent_run_count + + @pytest.mark.parametrize( + "job_spec", + [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], + ) + def test_get_allow_queueing_before_create( + self, + mock_schedule_service_create, + mock_schedule_service_get, + mock_schedule_bucket_exists, + job_spec, + mock_load_yaml_and_json, + ): + """Gets the PipelineJobSchedule allow_queueing before creating. + + Raises error because PipelineJobSchedule should be created first. + """ + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, + input_artifacts=_TEST_PIPELINE_INPUT_ARTIFACTS, + enable_caching=True, + ) + + pipeline_job_schedule = pipeline_job_schedules.PipelineJobSchedule( + pipeline_job=job, + display_name=_TEST_PIPELINE_JOB_SCHEDULE_DISPLAY_NAME, + ) + + with pytest.raises(RuntimeError) as e: + pipeline_job_schedule.allow_queueing + + assert e.match(regexp=r"PipelineJobSchedule resource has not been created.") + + pipeline_job_schedule.create( + cron_expression=_TEST_PIPELINE_JOB_SCHEDULE_CRON_EXPRESSION, + max_concurrent_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_CONCURRENT_RUN_COUNT, + max_run_count=_TEST_PIPELINE_JOB_SCHEDULE_MAX_RUN_COUNT, + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + create_request_timeout=None, + ) + + pipeline_job_schedule.allow_queueing