Skip to content

Commit

Permalink
feat(sdk): enable loading both JSON and YAML pipelines IR (#1089)
Browse files Browse the repository at this point in the history
  • Loading branch information
connor-mccarthy authored Apr 6, 2022
1 parent 936ddf8 commit f2e70b1
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 138 deletions.
9 changes: 6 additions & 3 deletions google/cloud/aiplatform/pipeline_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from google.cloud.aiplatform import base
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform import utils
from google.cloud.aiplatform.utils import json_utils
from google.cloud.aiplatform.utils import yaml_utils
from google.cloud.aiplatform.utils import pipeline_utils
from google.protobuf import json_format

Expand Down Expand Up @@ -112,7 +112,7 @@ def __init__(
display_name (str):
Required. The user-defined name of this Pipeline.
template_path (str):
Required. The path of PipelineJob or PipelineSpec JSON file. It
Required. The path of PipelineJob or PipelineSpec JSON or YAML file. It
can be a local path or a Google Cloud Storage URI.
Example: "gs://project.name"
job_id (str):
Expand Down Expand Up @@ -173,9 +173,12 @@ def __init__(
self._parent = initializer.global_config.common_location_path(
project=project, location=location
)
pipeline_json = json_utils.load_json(

# this loads both .yaml and .json files because YAML is a superset of JSON
pipeline_json = yaml_utils.load_yaml(
template_path, self.project, self.credentials
)

# Pipeline_json can be either PipelineJob or PipelineSpec.
if pipeline_json.get("pipelineSpec") is not None:
pipeline_job = pipeline_json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,70 +15,83 @@
# limitations under the License.
#

import json
from typing import Any, Dict, Optional

from google.auth import credentials as auth_credentials
from google.cloud import storage


def load_yaml(
    path: str,
    project: Optional[str] = None,
    credentials: Optional[auth_credentials.Credentials] = None,
) -> Dict[str, Any]:
    """Load a YAML document from Google Cloud Storage or the local filesystem.

    Because YAML is a superset of JSON, this also loads JSON documents.

    Args:
        path (str):
            Required. The path of the YAML document in Google Cloud Storage or
            local.
        project (str):
            Optional. Project to initiate the Storage client with.
        credentials (auth_credentials.Credentials):
            Optional. Credentials to use with Storage Client.

    Returns:
        A Dict object representing the YAML document.
    """
    # Dispatch on the URI scheme: anything that is not a GCS URI is
    # treated as a local file path.
    if not path.startswith("gs://"):
        return _load_yaml_from_local_file(path)
    return _load_yaml_from_gs_uri(path, project, credentials)


def _load_yaml_from_gs_uri(
    uri: str,
    project: Optional[str] = None,
    credentials: Optional[auth_credentials.Credentials] = None,
) -> Dict[str, Any]:
    """Download and parse a YAML document referenced by a GCS URI.

    Args:
        uri (str):
            Required. GCS URI for YAML document.
        project (str):
            Optional. Project to initiate the Storage client with.
        credentials (auth_credentials.Credentials):
            Optional. Credentials to use with Storage Client.

    Returns:
        A Dict object representing the YAML document.
    """
    # pyyaml ships in the optional "pipelines" extra; surface an actionable
    # install hint instead of a bare ModuleNotFoundError.
    try:
        import yaml
    except ImportError:
        raise ImportError(
            "pyyaml is not installed and is required to parse PipelineJob or PipelineSpec files. "
            'Please install the SDK using "pip install google-cloud-aiplatform[pipelines]"'
        )
    client = storage.Client(project=project, credentials=credentials)
    document_bytes = storage.Blob.from_string(uri, client).download_as_bytes()
    return yaml.safe_load(document_bytes)


def _load_yaml_from_local_file(file_path: str) -> Dict[str, Any]:
    """Load and parse a YAML document from a local file.

    Args:
        file_path (str):
            Required. The local file path of the YAML document.

    Returns:
        A Dict object representing the YAML document.

    Raises:
        ImportError: If pyyaml is not installed.
    """
    # pyyaml ships in the optional "pipelines" extra; surface an actionable
    # install hint instead of a bare ModuleNotFoundError.
    try:
        import yaml
    except ImportError:
        raise ImportError(
            "pyyaml is not installed and is required to parse PipelineJob or PipelineSpec files. "
            'Please install the SDK using "pip install google-cloud-aiplatform[pipelines]"'
        )
    # Open in binary mode and let PyYAML auto-detect the encoding
    # (UTF-8/16/32 per the YAML spec). Text mode with the platform's
    # locale-dependent default encoding can corrupt or fail on UTF-8
    # documents, and binary bytes match how the GCS loader feeds
    # safe_load (download_as_bytes).
    with open(file_path, "rb") as f:
        return yaml.safe_load(f)
6 changes: 5 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,17 @@
"pandas >= 1.0.0",
"pyarrow >= 6.0.1",
]

# Optional dependency for parsing PipelineJob / PipelineSpec YAML files.
pipelines_extra_requires = ["pyyaml>=5.3,<6"]

# The "full" extra is the union of every feature-specific extra.
full_extra_require = list(
    set(
        tensorboard_extra_require
        + metadata_extra_require
        + xai_extra_require
        + lit_extra_require
        + featurestore_extra_require
        + pipelines_extra_requires
    )
)
testing_extra_require = (
Expand Down Expand Up @@ -110,6 +113,7 @@
"xai": xai_extra_require,
"lit": lit_extra_require,
"cloud_profiler": profiler_extra_require,
"pipelines": pipelines_extra_requires,
},
python_requires=">=3.6",
classifiers=[
Expand Down
Loading

0 comments on commit f2e70b1

Please sign in to comment.