Skip to content

Commit

Permalink
implement failure_policy
Browse files Browse the repository at this point in the history
  • Loading branch information
chongyouquan committed Jun 9, 2022
1 parent 7a54637 commit 72cdd9e
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 2 deletions.
2 changes: 2 additions & 0 deletions google/cloud/aiplatform/compat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
types.model_evaluation_slice = types.model_evaluation_slice_v1beta1
types.model_service = types.model_service_v1beta1
types.operation = types.operation_v1beta1
types.pipeline_failure_policy = types.pipeline_failure_policy_v1beta1
types.pipeline_job = types.pipeline_job_v1beta1
types.pipeline_service = types.pipeline_service_v1beta1
types.pipeline_state = types.pipeline_state_v1beta1
Expand Down Expand Up @@ -176,6 +177,7 @@
types.model_evaluation_slice = types.model_evaluation_slice_v1
types.model_service = types.model_service_v1
types.operation = types.operation_v1
types.pipeline_failure_policy = types.pipeline_failure_policy_v1
types.pipeline_job = types.pipeline_job_v1
types.pipeline_service = types.pipeline_service_v1
types.pipeline_state = types.pipeline_state_v1
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/aiplatform/compat/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
model_evaluation_slice as model_evaluation_slice_v1beta1,
model_service as model_service_v1beta1,
operation as operation_v1beta1,
pipeline_failure_policy as pipeline_failure_policy_v1beta1,
pipeline_job as pipeline_job_v1beta1,
pipeline_service as pipeline_service_v1beta1,
pipeline_state as pipeline_state_v1beta1,
Expand Down Expand Up @@ -122,6 +123,7 @@
model_evaluation_slice as model_evaluation_slice_v1,
model_service as model_service_v1,
operation as operation_v1,
pipeline_failure_policy as pipeline_failure_policy_v1,
pipeline_job as pipeline_job_v1,
pipeline_service as pipeline_service_v1,
pipeline_state as pipeline_state_v1,
Expand Down Expand Up @@ -185,6 +187,7 @@
model_evaluation_slice_v1,
model_service_v1,
operation_v1,
pipeline_failure_policy_v1beta1,
pipeline_job_v1,
pipeline_service_v1,
pipeline_state_v1,
Expand Down Expand Up @@ -245,6 +248,7 @@
model_evaluation_slice_v1beta1,
model_service_v1beta1,
operation_v1beta1,
pipeline_failure_policy_v1beta1,
pipeline_job_v1beta1,
pipeline_service_v1beta1,
pipeline_state_v1beta1,
Expand Down
11 changes: 11 additions & 0 deletions google/cloud/aiplatform/pipeline_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def __init__(
credentials: Optional[auth_credentials.Credentials] = None,
project: Optional[str] = None,
location: Optional[str] = None,
failure_policy: Optional[str] = None,
):
"""Retrieves a PipelineJob resource and instantiates its
representation.
Expand Down Expand Up @@ -155,6 +156,15 @@ def __init__(
location (str):
Optional. Location to create PipelineJob. If not set,
location set in aiplatform.init will be used.
failure_policy (str):
Optional. The failure policy - "slow" or "fast".
Currently, the default of a pipeline is that the pipeline will continue to
run until no more tasks can be executed, also known as
PIPELINE_FAILURE_POLICY_FAIL_SLOW (corresponds to "slow").
However, if a pipeline is set to
PIPELINE_FAILURE_POLICY_FAIL_FAST (corresponds to "fast"),
it will stop scheduling any new tasks when a task has failed. Any
scheduled tasks will continue to completion.
Raises:
ValueError: If job_id or labels have incorrect format.
Expand Down Expand Up @@ -201,6 +211,7 @@ def __init__(
)
builder.update_pipeline_root(pipeline_root)
builder.update_runtime_parameters(parameter_values)
builder.update_failure_policy(failure_policy)
runtime_config_dict = builder.build()

runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb
Expand Down
38 changes: 36 additions & 2 deletions google/cloud/aiplatform/utils/pipeline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import copy
import json
from typing import Any, Dict, Mapping, Optional, Union
from google.cloud.aiplatform.compat.types import pipeline_failure_policy
import packaging.version


Expand All @@ -32,6 +33,7 @@ def __init__(
schema_version: str,
parameter_types: Mapping[str, str],
parameter_values: Optional[Dict[str, Any]] = None,
failure_policy: Optional[str] = None,
):
"""Creates a PipelineRuntimeConfigBuilder object.
Expand All @@ -44,11 +46,20 @@ def __init__(
Required. The mapping from pipeline parameter name to its type.
parameter_values (Dict[str, Any]):
Optional. The mapping from runtime parameter name to its value.
failure_policy (pipeline_failure_policy.PipelineFailurePolicy):
Optional. Represents the failure policy of a pipeline. Currently, the
default of a pipeline is that the pipeline will continue to
run until no more tasks can be executed, also known as
PIPELINE_FAILURE_POLICY_FAIL_SLOW. However, if a pipeline is
set to PIPELINE_FAILURE_POLICY_FAIL_FAST, it will stop
scheduling any new tasks when a task has failed. Any
scheduled tasks will continue to completion.
"""
self._pipeline_root = pipeline_root
self._schema_version = schema_version
self._parameter_types = parameter_types
self._parameter_values = copy.deepcopy(parameter_values or {})
self._failure_policy = failure_policy

@classmethod
def from_job_spec_json(
Expand Down Expand Up @@ -80,7 +91,8 @@ def from_job_spec_json(

pipeline_root = runtime_config_spec.get("gcsOutputDirectory")
parameter_values = _parse_runtime_parameters(runtime_config_spec)
return cls(pipeline_root, schema_version, parameter_types, parameter_values)
failure_policy = runtime_config_spec.get("failurePolicy")
return cls(pipeline_root, schema_version, parameter_types, parameter_values, failure_policy)

def update_pipeline_root(self, pipeline_root: Optional[str]) -> None:
"""Updates pipeline_root value.
Expand Down Expand Up @@ -111,6 +123,16 @@ def update_runtime_parameters(
parameters[k] = json.dumps(v)
self._parameter_values.update(parameters)

def update_failure_policy(self, failure_policy: Optional[str] = None) -> None:
"""Merges runtime failure policy.
Args:
failure_policy (str):
Optional. The failure policy - "slow" or "fast".
"""
if failure_policy:
self._failure_policy = _FAILURE_POLICY_TO_ENUM_VALUE[failure_policy]

def build(self) -> Dict[str, Any]:
"""Build a RuntimeConfig proto.
Expand All @@ -128,7 +150,8 @@ def build(self) -> Dict[str, Any]:
parameter_values_key = "parameterValues"
else:
parameter_values_key = "parameters"
return {

runtime_config = {
"gcsOutputDirectory": self._pipeline_root,
parameter_values_key: {
k: self._get_vertex_value(k, v)
Expand All @@ -137,6 +160,11 @@ def build(self) -> Dict[str, Any]:
},
}

if self._failure_policy:
runtime_config["failurePolicy"]: self._failure_policy

return runtime_config

def _get_vertex_value(
self, name: str, value: Union[int, float, str, bool, list, dict]
) -> Union[int, float, str, bool, list, dict]:
Expand Down Expand Up @@ -205,3 +233,9 @@ def _parse_runtime_parameters(
else:
raise TypeError("Got unknown type of value: {}".format(value))
return result

_FAILURE_POLICY_TO_ENUM_VALUE = {
"slow": pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_SLOW,
"fast": pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_FAST,
None: pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_UNSPECIFIED,
}

0 comments on commit 72cdd9e

Please sign in to comment.