From 4e7d11a072c2b3bdb8e6233ff879ec2c31a626ea Mon Sep 17 00:00:00 2001 From: A Vertex SDK engineer Date: Tue, 20 Jun 2023 13:03:32 -0700 Subject: [PATCH] fix: Fix bug where scheduled pipeline jobs were not running. PiperOrigin-RevId: 542024449 --- .../pipeline_job_schedules.py | 2 +- .../aiplatform/test_pipeline_job_schedule.py | 21 +++++++++++++----- .../aiplatform/test_pipeline_job_schedules.py | 22 +++++++++++++------ 3 files changed, 32 insertions(+), 13 deletions(-) diff --git a/google/cloud/aiplatform/preview/pipelinejobschedule/pipeline_job_schedules.py b/google/cloud/aiplatform/preview/pipelinejobschedule/pipeline_job_schedules.py index 8c7defd3bf..008d32ab84 100644 --- a/google/cloud/aiplatform/preview/pipelinejobschedule/pipeline_job_schedules.py +++ b/google/cloud/aiplatform/preview/pipelinejobschedule/pipeline_job_schedules.py @@ -104,7 +104,7 @@ def __init__( "parent": self._parent, "pipeline_job": { "runtime_config": runtime_config, - "pipeline_spec": {"fields": pipeline_job.pipeline_spec}, + "pipeline_spec": pipeline_job.pipeline_spec, }, } pipeline_job_schedule_args = { diff --git a/tests/system/aiplatform/test_pipeline_job_schedule.py b/tests/system/aiplatform/test_pipeline_job_schedule.py index 27df27316f..400938060d 100644 --- a/tests/system/aiplatform/test_pipeline_job_schedule.py +++ b/tests/system/aiplatform/test_pipeline_job_schedule.py @@ -16,8 +16,12 @@ # from google.cloud import aiplatform -from google.cloud.aiplatform.compat.types import schedule_v1beta1 as gca_schedule -from google.cloud.aiplatform.preview.pipelinejobschedule import pipeline_job_schedules +from google.cloud.aiplatform.compat.types import ( + schedule_v1beta1 as gca_schedule, +) +from google.cloud.aiplatform.preview.pipelinejobschedule import ( + pipeline_job_schedules, +) from tests.system.aiplatform import e2e_base from kfp import components @@ -61,7 +65,7 @@ def training_pipeline(number_of_epochs: int = 2): compiler.Compiler().compile( pipeline_func=training_pipeline, package_path=ir_file, - pipeline_name="training-pipeline", + pipeline_name="system-test-training-pipeline", ) job = aiplatform.PipelineJob( template_path=ir_file, @@ -72,14 +76,19 @@ def training_pipeline(number_of_epochs: int = 2): pipeline_job=job, display_name="pipeline_job_schedule_display_name" ) - pipeline_job_schedule.create(cron_expression="*/2 * * * *", max_run_count=2) + max_run_count = 2 + pipeline_job_schedule.create( + cron_expression="*/5 * * * *", + max_run_count=max_run_count, + max_concurrent_run_count=2, + ) shared_state.setdefault("resources", []).append(pipeline_job_schedule) pipeline_job_schedule.pause() assert pipeline_job_schedule.state == gca_schedule.Schedule.State.PAUSED - pipeline_job_schedule.resume() + pipeline_job_schedule.resume(catch_up=True) assert pipeline_job_schedule.state == gca_schedule.Schedule.State.ACTIVE pipeline_job_schedule.wait() @@ -87,6 +96,8 @@ def training_pipeline(number_of_epochs: int = 2): list_jobs_with_read_mask = pipeline_job_schedule.list_jobs( enable_simple_view=True ) + assert len(list_jobs_with_read_mask) == max_run_count + list_jobs_without_read_mask = pipeline_job_schedule.list_jobs() # enable_simple_view=True should apply the `read_mask` filter to limit PipelineJob fields returned diff --git a/tests/unit/aiplatform/test_pipeline_job_schedules.py b/tests/unit/aiplatform/test_pipeline_job_schedules.py index 85368aab1f..95029bad79 100644 --- a/tests/unit/aiplatform/test_pipeline_job_schedules.py +++ b/tests/unit/aiplatform/test_pipeline_job_schedules.py @@ -18,6 +18,7 @@ from datetime import datetime from importlib import reload import json +from typing import Any, Dict from unittest import mock from unittest.mock import patch from urllib import request @@ -48,6 +49,7 @@ import pytest import yaml +from google.protobuf import struct_pb2 from google.protobuf import json_format _TEST_PROJECT = "test-project" @@ -405,6 +407,12 @@ def mock_request_urlopen(job_spec): yield mock_urlopen +def dict_to_struct(d: Dict[str, Any]) -> struct_pb2.Struct: + s = struct_pb2.Struct() + s.update(d) + return s + + @pytest.mark.usefixtures("google_auth_mock") class TestPipelineJobSchedule: def setup_method(self): @@ -481,7 +489,7 @@ def test_call_schedule_service_create( "parent": _TEST_PARENT, "pipeline_job": { "runtime_config": runtime_config, - "pipeline_spec": {"fields": pipeline_spec}, + "pipeline_spec": dict_to_struct(pipeline_spec), "service_account": _TEST_SERVICE_ACCOUNT, "network": _TEST_NETWORK, }, @@ -565,7 +573,7 @@ def test_call_schedule_service_create_with_different_timezone( "parent": _TEST_PARENT, "pipeline_job": { "runtime_config": runtime_config, - "pipeline_spec": {"fields": pipeline_spec}, + "pipeline_spec": dict_to_struct(pipeline_spec), "service_account": _TEST_SERVICE_ACCOUNT, "network": _TEST_NETWORK, }, @@ -647,7 +655,7 @@ def test_call_schedule_service_create_artifact_registry( "parent": _TEST_PARENT, "pipeline_job": { "runtime_config": runtime_config, - "pipeline_spec": {"fields": pipeline_spec}, + "pipeline_spec": dict_to_struct(pipeline_spec), "service_account": _TEST_SERVICE_ACCOUNT, "network": _TEST_NETWORK, }, @@ -729,7 +737,7 @@ def test_call_schedule_service_create_https( "parent": _TEST_PARENT, "pipeline_job": { "runtime_config": runtime_config, - "pipeline_spec": {"fields": pipeline_spec}, + "pipeline_spec": dict_to_struct(pipeline_spec), "service_account": _TEST_SERVICE_ACCOUNT, "network": _TEST_NETWORK, }, @@ -810,7 +818,7 @@ def test_call_schedule_service_create_with_timeout( "parent": _TEST_PARENT, "pipeline_job": { "runtime_config": runtime_config, - "pipeline_spec": {"fields": pipeline_spec}, + "pipeline_spec": dict_to_struct(pipeline_spec), "service_account": _TEST_SERVICE_ACCOUNT, "network": _TEST_NETWORK, }, @@ -890,7 +898,7 @@ def test_call_schedule_service_create_with_timeout_not_explicitly_set( "parent": _TEST_PARENT, "pipeline_job": { "runtime_config": runtime_config, - "pipeline_spec": {"fields": pipeline_spec}, + "pipeline_spec": dict_to_struct(pipeline_spec), "service_account": _TEST_SERVICE_ACCOUNT, "network": _TEST_NETWORK, }, @@ -958,7 +966,7 @@ def test_call_pipeline_job_create_schedule( "parent": _TEST_PARENT, "pipeline_job": { "runtime_config": runtime_config, - "pipeline_spec": {"fields": pipeline_spec}, + "pipeline_spec": dict_to_struct(pipeline_spec), "service_account": _TEST_SERVICE_ACCOUNT, "network": _TEST_NETWORK, },