feat: add Pandas DataFrame support to TabularDataset #1185

Merged
merged 12 commits on May 5, 2022
141 changes: 140 additions & 1 deletion google/cloud/aiplatform/datasets/tabular_dataset.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-

# Copyright 2020 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,12 +19,19 @@

sararob marked this conversation as resolved.
from google.auth import credentials as auth_credentials

from google.cloud import bigquery
from google.cloud.bigquery import _pandas_helpers
Member

We shouldn't rely on a private module from a dependent python package because it may break between minor versions.

from google.cloud.aiplatform import base
from google.cloud.aiplatform import datasets
from google.cloud.aiplatform.datasets import _datasources
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform import schema
from google.cloud.aiplatform import utils

_AUTOML_TRAINING_MIN_ROWS = 1000

_LOGGER = base.Logger(__name__)


class TabularDataset(datasets._ColumnNamesDataset):
"""Managed tabular dataset resource for Vertex AI."""
@@ -146,6 +153,138 @@ def create(
create_request_timeout=create_request_timeout,
)

@staticmethod
def _validate_bq_schema(dataframe, schema) -> bool:
"""Validates whether the user-provided BigQuery schema is compatible
with the data types in their Pandas DataFrame.

Args:
dataframe (pd.DataFrame):
Required. Pandas DataFrame containing the source data for
ingestion as a TabularDataset.
schema (Optional[Union[str, bigquery.SchemaField]]):
Required. The user-provided BigQuery schema.
"""
try:
_pandas_helpers.dataframe_to_arrow(dataframe, schema)
Member

If the BQ client handles this validation we can rely on the client and avoid importing this private module.

Contributor Author

Removed this validation check and letting the BQ client handle it.

return True
except Exception as e:
_LOGGER.warning(f"Warning: {e}")
return False

@classmethod
def create_from_dataframe(
cls,
df_source: "pd.DataFrame", # noqa: F821 - skip check for undefined name 'pd'
staging_path: str,
bq_schema: Optional[Union[str, bigquery.SchemaField]] = None,
display_name: Optional[str] = None,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
) -> "TabularDataset":
"""Creates a new tabular dataset from a Pandas DataFrame.

Args:
df_source (pd.DataFrame):
Required. Pandas DataFrame containing the source data for
ingestion as a TabularDataset. This method will use the data
types from the provided DataFrame when creating the dataset.
staging_path (str):
Member

It seems the location requirements should also be documented or a reference to this documentation should be provided: https://cloud.google.com/vertex-ai/docs/general/locations#bq-locations

If possible they should be validated but not a hard requirement. Is it possible for the dataset create to fail because of the regional requirements?

Contributor Author

Good point, I just tested it and it can fail if the dataset location doesn't match the project location or the service doesn't have the right access to the dataset. I'll update the docstring to link to that page.

In terms of validating, the BQ client throws this error: google.api_core.exceptions.FailedPrecondition: 400 BigQuery Dataset location eu must be in the same location as the service location us-central1.

Do you think we should validate as well or let the BQ client handle validation? If we do validation, we'd need to use the BQ client to check the location of the provided BQ dataset string.

Member

Agree with relying on BQ client.
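
For context, the client-side location check discussed here (and ultimately left to the BigQuery service to enforce) could look roughly like the sketch below; the helper name and the plain string comparison are illustrative, and multi-region locations such as "US" would need extra handling, which is part of why deferring to the BigQuery error is simpler:

def _bq_dataset_location(staging_path: str) -> str:
    # "bq://project.dataset.table" -> "project.dataset"
    dataset_id = ".".join(staging_path[len("bq://") :].split(".")[:2])
    return bigquery.Client().get_dataset(dataset_id).location

# e.g. compare _bq_dataset_location(staging_path).lower() against the Vertex
# location before submitting the load job, instead of waiting for the
# 400 FailedPrecondition from the service.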

Required. The BigQuery table to stage the data
for Vertex. Because Vertex maintains a reference to this source
to create the Vertex Dataset, this BigQuery table should
not be deleted. Example: `bq://my-project.my-dataset.my-table`.
If the provided BigQuery table doesn't exist, this method will
create the table. If the provided BigQuery table already exists,
and the schemas of the BigQuery table and your DataFrame match,
this method will append the data in your local DataFrame to the table.
The location of the provided BigQuery table should conform to the location requirements
specified here: https://cloud.google.com/vertex-ai/docs/general/locations#bq-locations.
bq_schema (Optional[Union[str, bigquery.SchemaField]]):
Optional. The schema to use when creating the staging table in BigQuery. For more details,
see: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig#google_cloud_bigquery_job_LoadJobConfig_schema
This is not needed if the BigQuery table provided in `staging_path` already exists.
@helinwang May 4, 2022

Does this imply that when the staging table does not exist, users need to pass in a BQ schema? No existing staging table is the most common case, and this would be a bit hard to use - users need to learn how to create a BQ schema, etc.
Given df_source already has a schema, can we auto generate the BQ schema for users?


The best case would be for the BQ client to support schema inference natively; could you check on that? The BQ UI supports schema inference for JSONL. Haven't tried Parquet.

Contributor Author

The BQ client will autodetect the schema using the DataFrame column types if the user doesn't provide this bq_schema parameter; this happens here. So I'm guessing most users won't use this schema param to override the autodetection.

I think my docstring was confusing. I updated it, let me know if this is clearer:

Optional. If not set, BigQuery will autodetect the schema using your DataFrame's column types.
If set, BigQuery will use the schema you provide when creating the staging table. For more details,
see: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig#google_cloud_bigquery_job_LoadJobConfig_schema

If this is not provided and the provided BigQuery table does not exist, the column types
will be auto-detected using the data types in your Pandas DataFrame.
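
For callers that do want to override autodetection, bq_schema is a list of bigquery.SchemaField objects; a small illustrative subset of the schema used in the system tests below would be:

bq_schema = [
    bigquery.SchemaField(name="int64_col", field_type="INTEGER"),
    bigquery.SchemaField(name="string_col", field_type="STRING"),
    bigquery.SchemaField(name="bool_array_col", field_type="BOOL", mode="REPEATED"),
]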
display_name (str):
Optional. The user-defined name of the Dataset.
The name can be up to 128 characters long and can consist
of any UTF-8 characters.
project (str):
Optional. Project to upload this dataset to. Overrides project set in
aiplatform.init.
location (str):
Optional. Location to upload this dataset to. Overrides location set in
aiplatform.init.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to upload this dataset. Overrides
credentials set in aiplatform.init.
Returns:
tabular_dataset (TabularDataset):
Instantiated representation of the managed tabular dataset resource.
"""

if staging_path.startswith("bq://"):
bq_staging_path = staging_path[len("bq://") :]
else:
raise ValueError(
"Only BigQuery staging paths are supported. Provide a staging path in the format `bq://your-project.your-dataset.your-table`."
)

try:
import pyarrow # noqa: F401 - skip check for 'pyarrow' which is required when using 'google.cloud.bigquery'
except ImportError:
raise ImportError(
"Pyarrow is not installed, and is required to use the BigQuery client."
'Please install the SDK using "pip install google-cloud-aiplatform[datasets]"'
)

if bq_schema and not TabularDataset._validate_bq_schema(
dataframe=df_source, schema=bq_schema
):
raise ValueError("The provided `bq_schema` is not valid.")

if len(df_source) < _AUTOML_TRAINING_MIN_ROWS:
_LOGGER.info(
"Your DataFrame has %s rows and AutoML requires %s rows to train on tabular data. You can still train a custom model once your dataset has been uploaded to Vertex, but you will not be able to use AutoML for training."
% (len(df_source), _AUTOML_TRAINING_MIN_ROWS),
)

bigquery_client = bigquery.Client(
project=project or initializer.global_config.project,
credentials=credentials or initializer.global_config.credentials,
)

try:
parquet_options = bigquery.format_options.ParquetOptions()
parquet_options.enable_list_inference = True

job_config = bigquery.LoadJobConfig(
Member

Will this config infer all the types? I see the enable_list_inference but I couldn't find a reference in the BQ docs for non list type inference.

Contributor Author
@sararob Apr 27, 2022

Yes, this will infer the data types from the DF. From the BQ docs:

            The destination table to use for loading the data. If it is an
            existing table, the schema of the :class:`~pandas.DataFrame`
            must match the schema of the destination table. If the table
            does not yet exist, the schema is inferred from the
            :class:`~pandas.DataFrame`.

I added a bq_schema param to give the user the option to override the data type autodetection, but I think I may need to add more client-side validation on that.

Member

I think we can rely on BQ client validation.

source_format=bigquery.SourceFormat.PARQUET,
parquet_options=parquet_options,
)

if bq_schema:
job_config.schema = bq_schema

job = bigquery_client.load_table_from_dataframe(
dataframe=df_source, destination=bq_staging_path, job_config=job_config
)

job.result()

finally:
dataset_from_dataframe = cls.create(
display_name=display_name,
bq_source=staging_path,
project=project,
location=location,
credentials=credentials,
)

return dataset_from_dataframe

def import_data(self):
raise NotImplementedError(
f"{self.__class__.__name__} class does not support 'import_data'"
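
After this change, creating a managed tabular dataset from an in-memory DataFrame would look roughly like the sketch below (the project ID, BigQuery table path, and display name are placeholders):

import pandas as pd
from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

df = pd.DataFrame(
    {
        "sepal_length": [5.1, 4.9],
        "sepal_width": [3.5, 3.0],
        "species": ["setosa", "setosa"],
    }
)

dataset = aiplatform.TabularDataset.create_from_dataframe(
    df_source=df,
    # The staging table is created if it doesn't exist; if it exists, the
    # DataFrame is appended, so the schemas must match.
    staging_path="bq://my-project.my_dataset.iris_staging",
    display_name="iris-from-dataframe",
)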
4 changes: 4 additions & 0 deletions setup.py
@@ -55,6 +55,9 @@
pipelines_extra_requires = [
"pyyaml>=5.3,<6",
]
datasets_extra_require = [
"pyarrow >= 3.0.0, < 8.0dev",
]
full_extra_require = list(
set(
tensorboard_extra_require
@@ -63,6 +66,7 @@
+ lit_extra_require
+ featurestore_extra_require
+ pipelines_extra_requires
+ datasets_extra_require
)
)
testing_extra_require = (
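
With this extra in place, the optional dependency can be installed via pip install "google-cloud-aiplatform[datasets]", matching the hint in the ImportError raised by create_from_dataframe; pyarrow is what the BigQuery client uses to serialize the DataFrame to Parquet for the load job.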
149 changes: 149 additions & 0 deletions tests/system/aiplatform/test_dataset.py
@@ -20,10 +20,14 @@
import pytest
import importlib

import pandas as pd

from google import auth as google_auth
from google.api_core import exceptions
from google.api_core import client_options

from google.cloud import bigquery

from google.cloud import aiplatform
from google.cloud import storage
from google.cloud.aiplatform import utils
@@ -55,6 +59,66 @@
_TEST_TEXT_ENTITY_IMPORT_SCHEMA = "gs://google-cloud-aiplatform/schema/dataset/ioformat/text_extraction_io_format_1.0.0.yaml"
_TEST_IMAGE_OBJ_DET_IMPORT_SCHEMA = "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"

# create_from_dataframe
_TEST_BOOL_COL = "bool_col"
_TEST_BOOL_ARR_COL = "bool_array_col"
_TEST_DOUBLE_COL = "double_col"
_TEST_DOUBLE_ARR_COL = "double_array_col"
_TEST_INT_COL = "int64_col"
_TEST_INT_ARR_COL = "int64_array_col"
_TEST_STR_COL = "string_col"
_TEST_STR_ARR_COL = "string_array_col"
_TEST_BYTES_COL = "bytes_col"
_TEST_DF_COLUMN_NAMES = [
_TEST_BOOL_COL,
_TEST_BOOL_ARR_COL,
_TEST_DOUBLE_COL,
_TEST_DOUBLE_ARR_COL,
_TEST_INT_COL,
_TEST_INT_ARR_COL,
_TEST_STR_COL,
_TEST_STR_ARR_COL,
_TEST_BYTES_COL,
]
_TEST_DATAFRAME = pd.DataFrame(
data=[
[
False,
[True, False],
1.2,
[1.2, 3.4],
1,
[1, 2],
"test",
["test1", "test2"],
b"1",
],
[
True,
[True, True],
2.2,
[2.2, 4.4],
2,
[2, 3],
"test1",
["test2", "test3"],
b"0",
],
],
columns=_TEST_DF_COLUMN_NAMES,
)
_TEST_DATAFRAME_BQ_SCHEMA = [
bigquery.SchemaField(name="bool_col", field_type="BOOL"),
bigquery.SchemaField(name="bool_array_col", field_type="BOOL", mode="REPEATED"),
bigquery.SchemaField(name="double_col", field_type="FLOAT"),
bigquery.SchemaField(name="double_array_col", field_type="FLOAT", mode="REPEATED"),
bigquery.SchemaField(name="int64_col", field_type="INTEGER"),
bigquery.SchemaField(name="int64_array_col", field_type="INTEGER", mode="REPEATED"),
bigquery.SchemaField(name="string_col", field_type="STRING"),
bigquery.SchemaField(name="string_array_col", field_type="STRING", mode="REPEATED"),
bigquery.SchemaField(name="bytes_col", field_type="STRING"),
]


class TestDataset:
Member

This should inherit from the base e2e class so it can take advantage of the resource cleanup functionality: https://github.com/googleapis/python-aiplatform/blob/main/tests/system/aiplatform/e2e_base.py#L37

Contributor Author

Updated to inherit from e2e_base. I noticed there was some duplication of other fixtures in this file so I removed that and inherited the fixtures from the base class.

def setup_method(self):
@@ -66,6 +130,25 @@ def shared_state(self):
shared_state = {}
yield shared_state

@pytest.fixture()
def prepare_bigquery_dataset(self, shared_state):
"""Create a bigquery dataset and store bigquery resource object in shared state."""

bigquery_client = bigquery.Client(project=_TEST_PROJECT)
shared_state["bigquery_client"] = bigquery_client

dataset_name = f"tabulardatasettest_{uuid.uuid4()}".replace("-", "_")
shared_state["dataset_name"] = dataset_name

dataset_id = f"{_TEST_PROJECT}.{dataset_name}"
shared_state["bigquery_dataset_id"] = dataset_id

dataset = bigquery.Dataset(dataset_id)
dataset.location = _TEST_LOCATION
shared_state["bigquery_dataset"] = bigquery_client.create_dataset(dataset)

yield
Member

It seems like BQ dataset deletion should be handled after the yield.
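
A minimal sketch of that teardown, reusing the names already defined in the fixture (delete_contents and not_found_ok are standard BigQuery client flags):

yield

bigquery_client.delete_dataset(
    dataset_id, delete_contents=True, not_found_ok=True
)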


@pytest.fixture()
def create_staging_bucket(self, shared_state):
new_staging_bucket = f"temp-sdk-integration-{uuid.uuid4()}"
@@ -253,6 +336,72 @@ def test_create_tabular_dataset(self, dataset_gapic_client, shared_state):
== aiplatform.schema.dataset.metadata.tabular
)

@pytest.mark.usefixtures("delete_new_dataset", "prepare_bigquery_dataset")
def test_create_tabular_dataset_from_dataframe(
self, dataset_gapic_client, shared_state
):
"""Use the Dataset.create_from_dataframe() method to create a new tabular dataset.
Then confirm the dataset was successfully created and references the BQ source."""

assert shared_state["dataset_name"]
assert shared_state["bigquery_dataset"]

bigquery_dataset_id = shared_state["bigquery_dataset_id"]
bq_staging_table = f"bq://{bigquery_dataset_id}.test_table{uuid.uuid4()}"

aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION)

tabular_dataset = aiplatform.TabularDataset.create_from_dataframe(
Member

tabular_dataset should be appended to shared_state['resources'] so it's deleted after the test

see example: https://github.com/googleapis/python-aiplatform/blob/main/tests/system/aiplatform/test_e2e_tabular.py#L89

Contributor Author

Updated both new tests to use shared_state['resources']

df_source=_TEST_DATAFRAME,
staging_path=bq_staging_table,
display_name=f"temp_sdk_integration_create_and_import_dataset_from_dataframe{uuid.uuid4()}",
)

shared_state["dataset_name"] = tabular_dataset.resource_name

gapic_metadata = tabular_dataset.to_dict()["metadata"]
bq_source = gapic_metadata["inputConfig"]["bigquerySource"]["uri"]

assert bq_staging_table == bq_source
assert (
tabular_dataset.metadata_schema_uri
== aiplatform.schema.dataset.metadata.tabular
)
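
Per the thread above, the updated revision registers the dataset for automatic cleanup along the lines of the linked e2e example (the exact base-class bookkeeping may differ):

shared_state["resources"] = [tabular_dataset]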

@pytest.mark.usefixtures("delete_new_dataset", "prepare_bigquery_dataset")
def test_create_tabular_dataset_from_dataframe_with_provided_schema(
self, dataset_gapic_client, shared_state
):
"""Use the Dataset.create_from_dataframe() method to create a new tabular dataset,
passing in the optional `bq_schema` argument. Then confirm the dataset was successfully
created and references the BQ source."""

assert shared_state["dataset_name"]
assert shared_state["bigquery_dataset"]

bigquery_dataset_id = shared_state["bigquery_dataset_id"]
bq_staging_table = f"bq://{bigquery_dataset_id}.test_table{uuid.uuid4()}"

aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION)

tabular_dataset = aiplatform.TabularDataset.create_from_dataframe(
df_source=_TEST_DATAFRAME,
staging_path=bq_staging_table,
display_name=f"temp_sdk_integration_create_and_import_dataset_from_dataframe{uuid.uuid4()}",
bq_schema=_TEST_DATAFRAME_BQ_SCHEMA,
)

shared_state["dataset_name"] = tabular_dataset.resource_name

gapic_metadata = tabular_dataset.to_dict()["metadata"]
bq_source = gapic_metadata["inputConfig"]["bigquerySource"]["uri"]

assert bq_staging_table == bq_source
assert (
tabular_dataset.metadata_schema_uri
== aiplatform.schema.dataset.metadata.tabular
)

# TODO(vinnys): Remove pytest skip once persistent resources are accessible
@pytest.mark.skip(reason="System tests cannot access persistent test resources")
@pytest.mark.usefixtures("create_staging_bucket", "delete_staging_bucket")