diff --git a/google/cloud/aiplatform/pipeline_jobs.py b/google/cloud/aiplatform/pipeline_jobs.py index 42c65ca6d4..3efa6c1e84 100644 --- a/google/cloud/aiplatform/pipeline_jobs.py +++ b/google/cloud/aiplatform/pipeline_jobs.py @@ -628,13 +628,6 @@ def done(self) -> bool: return self.state in _PIPELINE_COMPLETE_STATES - def _has_failed(self) -> bool: - """Return True if PipelineJob has Failed.""" - if not self._gca_resource: - return False - - return self.state in _PIPELINE_ERROR_STATES - def _get_context(self) -> context.Context: """Returns the PipelineRun Context for this PipelineJob in the MetadataStore. @@ -655,7 +648,7 @@ def _get_context(self) -> context.Context: time.sleep(1) if not pipeline_run_context: - if self._has_failed: + if self.has_failed: raise RuntimeError( f"Cannot associate PipelineJob to Experiment: {self.gca_resource.error}" ) diff --git a/tests/unit/aiplatform/test_pipeline_jobs.py b/tests/unit/aiplatform/test_pipeline_jobs.py index efa3bafc47..0011758085 100644 --- a/tests/unit/aiplatform/test_pipeline_jobs.py +++ b/tests/unit/aiplatform/test_pipeline_jobs.py @@ -1567,6 +1567,61 @@ def test_pipeline_failure_raises(self, mock_load_yaml_and_json, sync): if not sync: job.wait() + @pytest.mark.usefixtures( + "mock_pipeline_service_create", + "mock_pipeline_service_get_with_fail", + "mock_pipeline_bucket_exists", + ) + @pytest.mark.parametrize( + "job_spec", + [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], + ) + def test_pipeline_job_has_failed_property(self, mock_load_yaml_and_json): + aiplatform.init( + project=_TEST_PROJECT, + staging_bucket=_TEST_GCS_BUCKET_NAME, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + ) + + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + job_id=_TEST_PIPELINE_JOB_ID, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, + enable_caching=True, + ) + + job.submit( + service_account=_TEST_SERVICE_ACCOUNT, + network=_TEST_NETWORK, + ) + + assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING + assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING + assert job.has_failed + + @pytest.mark.parametrize( + "job_spec", + [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB], + ) + def test_pipeline_job_has_failed_property_with_no_submit( + self, mock_load_yaml_and_json + ): + job = pipeline_jobs.PipelineJob( + display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME, + template_path=_TEST_TEMPLATE_PATH, + job_id=_TEST_PIPELINE_JOB_ID, + parameter_values=_TEST_PIPELINE_PARAMETER_VALUES, + enable_caching=True, + ) + + with pytest.raises( + RuntimeError, + match=r"PipelineJob resource has not been created\.", + ): + assert job.has_failed + @pytest.mark.parametrize( "job_spec", [_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],