From f5785ddbaa68b3394e42d4b0162afe111d6e5a6e Mon Sep 17 00:00:00 2001 From: Sara Robinson Date: Tue, 26 Apr 2022 16:13:41 -0400 Subject: [PATCH 1/9] add create_from_dataframe method --- .../aiplatform/datasets/tabular_dataset.py | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/google/cloud/aiplatform/datasets/tabular_dataset.py b/google/cloud/aiplatform/datasets/tabular_dataset.py index ec9769bb7f..b98414ba39 100644 --- a/google/cloud/aiplatform/datasets/tabular_dataset.py +++ b/google/cloud/aiplatform/datasets/tabular_dataset.py @@ -19,12 +19,17 @@ from google.auth import credentials as auth_credentials +from google.cloud import base +from google.cloud import bigquery from google.cloud.aiplatform import datasets from google.cloud.aiplatform.datasets import _datasources from google.cloud.aiplatform import initializer from google.cloud.aiplatform import schema from google.cloud.aiplatform import utils +_AUTOML_TRAINING_MIN_ROWS = 1000 + +_LOGGER = base.Logger(__name__) class TabularDataset(datasets._ColumnNamesDataset): """Managed tabular dataset resource for Vertex AI.""" @@ -146,6 +151,89 @@ def create( create_request_timeout=create_request_timeout, ) + @classmethod + def create_from_dataframe( + cls, + display_name: str, + df_source: "pd.DataFrame", # noqa: F821 - skip check for undefined name 'pd' + staging_path: str = None, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + ) -> "TabularDataset": + """Creates a new tabular dataset from a Pandas DataFrame. + + Args: + display_name (str): + Required. User-defined name of the dataset. + df_source (pd.DataFrame): + Required. Pandas DataFrame containing the source data for + ingestion as a TabularDataset + staging_path (str): + Required. The BigQuery table to stage the data + for Vertex. Because Vertex maintains a reference to this source + to create the Vertex Dataset, this BigQuery table should + not be deleted. Example: `bq://my-project.my-dataset.my-table` + project (str): + Project to upload this model to. Overrides project set in + aiplatform.init. + location (str): + Location to upload this model to. Overrides location set in + aiplatform.init. + credentials (auth_credentials.Credentials): + Custom credentials to use to upload this model. Overrides + credentials set in aiplatform.init. + """ + + if len(df_source) < _AUTOML_TRAINING_MIN_ROWS: + _LOGGER.info( + "Your DataFrame has %s rows and AutoML requires %s rows to train on tabular data. You can still train a custom model once your dataset has been uploaded to Vertex, but you will not be able to use AutoML for training." + % (len(df_source), _AUTOML_TRAINING_MIN_ROWS), + ) + + try: + import pyarrow # noqa: F401 - skip check for 'pyarrow' which is required when using 'google.cloud.bigquery' + except ImportError: + raise ImportError( + f"Pyarrow is not installed. Please install pyarrow to use the BigQuery client." + ) + + bigquery_client = bigquery.Client( + project=project or initializer.global_config.project, + credentials=credentials or initializer.global_config.credentials, + ) + + if staging_path.startswith("bq://"): + bq_staging_path = staging_path[len("bq://"):] + else: + raise ValueError("Only BigQuery staging paths are supported. 
Provide a staging path in the format `bq://your-project.your-dataset.your-table`.") + + try: + parquet_options = bigquery.format_options.ParquetOptions() + parquet_options.enable_list_inference = True + + job_config = bigquery.LoadJobConfig( + source_format=bigquery.SourceFormat.PARQUET, + parquet_options=parquet_options, + ) + + job = bigquery_client.load_table_from_dataframe( + dataframe=df_source, destination=bq_staging_path, job_config=job_config + ) + + job.result() + + finally: + dataset_from_dataframe = cls.create( + display_name=display_name, + bq_source=staging_path, + project=project, + location=location, + credentials=credentials, + ) + + return dataset_from_dataframe + def import_data(self): raise NotImplementedError( f"{self.__class__.__name__} class does not support 'import_data'" From ae01b66b684efd91a2a8bfaeef8811c87612e990 Mon Sep 17 00:00:00 2001 From: Sara Robinson Date: Tue, 26 Apr 2022 17:05:43 -0400 Subject: [PATCH 2/9] add tests for create_from_dataframe --- .../aiplatform/datasets/tabular_dataset.py | 2 +- tests/unit/aiplatform/test_datasets.py | 131 ++++++++++++++++++ 2 files changed, 132 insertions(+), 1 deletion(-) diff --git a/google/cloud/aiplatform/datasets/tabular_dataset.py b/google/cloud/aiplatform/datasets/tabular_dataset.py index b98414ba39..70272931a5 100644 --- a/google/cloud/aiplatform/datasets/tabular_dataset.py +++ b/google/cloud/aiplatform/datasets/tabular_dataset.py @@ -19,8 +19,8 @@ from google.auth import credentials as auth_credentials -from google.cloud import base from google.cloud import bigquery +from google.cloud.aiplatform import base from google.cloud.aiplatform import datasets from google.cloud.aiplatform.datasets import _datasources from google.cloud.aiplatform import initializer diff --git a/tests/unit/aiplatform/test_datasets.py b/tests/unit/aiplatform/test_datasets.py index db9d06a25b..8cd92f6ad4 100644 --- a/tests/unit/aiplatform/test_datasets.py +++ b/tests/unit/aiplatform/test_datasets.py @@ -22,6 +22,7 @@ from unittest import mock from importlib import reload from unittest.mock import patch +import pandas as pd from google.api_core import operation from google.auth.exceptions import GoogleAuthError @@ -147,6 +148,29 @@ _TEST_LABELS = {"my_key": "my_value"} +# create_from_dataframe +_TEST_INVALID_SOURCE_URI_BQ = "my-project.my-dataset.table" + +_TEST_BOOL_COL = "bool_col" +_TEST_BOOL_ARR_COL = "bool_array_col" +_TEST_DOUBLE_COL = "double_col" +_TEST_DOUBLE_ARR_COL = "double_array_col" +_TEST_INT_COL = "int64_col" +_TEST_INT_ARR_COL = "int64_array_col" +_TEST_STR_COL = "string_col" +_TEST_STR_ARR_COL = "string_array_col" +_TEST_BYTES_COL = "bytes_col" +_TEST_DF_COLUMN_NAMES = [ + _TEST_BOOL_COL, + _TEST_BOOL_ARR_COL, + _TEST_DOUBLE_COL, + _TEST_DOUBLE_ARR_COL, + _TEST_INT_COL, + _TEST_INT_ARR_COL, + _TEST_STR_COL, + _TEST_STR_ARR_COL, + _TEST_BYTES_COL, +] @pytest.fixture def get_dataset_mock(): @@ -1378,6 +1402,113 @@ def test_create_dataset_with_labels(self, create_dataset_mock, sync): timeout=None, ) + @pytest.mark.usefixtures("get_dataset_tabular_bq_mock") + @pytest.mark.parametrize( + "source_df", + [ + pd.DataFrame( + data=[ + [ + False, + [True, False], + 1.2, + [1.2, 3.4], + 1, + [1, 2], + "test", + ["test1", "test2"], + b"1", + ], + [ + True, + [True, True], + 2.2, + [2.2, 4.4], + 2, + [2, 3], + "test1", + ["test2", "test3"], + b"0", + ], + ], + columns=_TEST_DF_COLUMN_NAMES, + ), + ], + ) + @pytest.mark.parametrize("sync", [True, False]) + def test_create_dataset_tabular_from_dataframe( + self, create_dataset_mock, 
source_df, bq_client_mock, sync + ): + aiplatform.init(project=_TEST_PROJECT) + + dataset_from_df = datasets.TabularDataset.create_from_dataframe( + display_name=_TEST_DISPLAY_NAME, + df_source=source_df, + staging_path=_TEST_SOURCE_URI_BQ, + ) + + if not sync: + dataset_from_df.wait() + + assert dataset_from_df.metadata_schema_uri == _TEST_METADATA_SCHEMA_URI_TABULAR + + expected_dataset = gca_dataset.Dataset( + display_name=_TEST_DISPLAY_NAME, + metadata_schema_uri=_TEST_METADATA_SCHEMA_URI_TABULAR, + metadata=_TEST_METADATA_TABULAR_BQ, + ) + + create_dataset_mock.assert_called_once_with( + parent=_TEST_PARENT, + dataset=expected_dataset, + metadata=_TEST_REQUEST_METADATA, + timeout=None, + ) + + @pytest.mark.usefixtures("get_dataset_tabular_bq_mock") + @pytest.mark.parametrize( + "source_df", + [ + pd.DataFrame( + data=[ + [ + False, + [True, False], + 1.2, + [1.2, 3.4], + 1, + [1, 2], + "test", + ["test1", "test2"], + b"1", + ], + [ + True, + [True, True], + 2.2, + [2.2, 4.4], + 2, + [2, 3], + "test1", + ["test2", "test3"], + b"0", + ], + ], + columns=_TEST_DF_COLUMN_NAMES, + ), + ], + ) + @pytest.mark.parametrize("sync", [True, False]) + def test_create_dataset_tabular_from_dataframe_with_invalid_bq_uri( + self, create_dataset_mock, source_df, bq_client_mock, sync + ): + aiplatform.init(project=_TEST_PROJECT) + with pytest.raises(ValueError): + datasets.TabularDataset.create_from_dataframe( + display_name=_TEST_DISPLAY_NAME, + df_source=source_df, + staging_path=_TEST_INVALID_SOURCE_URI_BQ, + ) class TestTextDataset: def setup_method(self): From e2de699b2d05a4757770db09eaf9778a8b045a34 Mon Sep 17 00:00:00 2001 From: Sara Robinson Date: Tue, 26 Apr 2022 17:38:27 -0400 Subject: [PATCH 3/9] update docstrings and run linter --- .../cloud/aiplatform/datasets/tabular_dataset.py | 15 +++++++++++---- tests/unit/aiplatform/test_datasets.py | 2 ++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/google/cloud/aiplatform/datasets/tabular_dataset.py b/google/cloud/aiplatform/datasets/tabular_dataset.py index 70272931a5..1801f69461 100644 --- a/google/cloud/aiplatform/datasets/tabular_dataset.py +++ b/google/cloud/aiplatform/datasets/tabular_dataset.py @@ -31,6 +31,7 @@ _LOGGER = base.Logger(__name__) + class TabularDataset(datasets._ColumnNamesDataset): """Managed tabular dataset resource for Vertex AI.""" @@ -173,7 +174,11 @@ def create_from_dataframe( Required. The BigQuery table to stage the data for Vertex. Because Vertex maintains a reference to this source to create the Vertex Dataset, this BigQuery table should - not be deleted. Example: `bq://my-project.my-dataset.my-table` + not be deleted. Example: `bq://my-project.my-dataset.my-table`. + If the provided BigQuery table doesn't exist, this method will + create the table. If the provided BigQuery table already exists, + and the schemas of the BigQuery table and your DataFrame match, + this method will append the data in your local DataFrame to the table. project (str): Project to upload this model to. Overrides project set in aiplatform.init. @@ -195,7 +200,7 @@ def create_from_dataframe( import pyarrow # noqa: F401 - skip check for 'pyarrow' which is required when using 'google.cloud.bigquery' except ImportError: raise ImportError( - f"Pyarrow is not installed. Please install pyarrow to use the BigQuery client." + "Pyarrow is not installed. Please install pyarrow to use the BigQuery client." 
) bigquery_client = bigquery.Client( @@ -204,9 +209,11 @@ def create_from_dataframe( ) if staging_path.startswith("bq://"): - bq_staging_path = staging_path[len("bq://"):] + bq_staging_path = staging_path[len("bq://") :] else: - raise ValueError("Only BigQuery staging paths are supported. Provide a staging path in the format `bq://your-project.your-dataset.your-table`.") + raise ValueError( + "Only BigQuery staging paths are supported. Provide a staging path in the format `bq://your-project.your-dataset.your-table`." + ) try: parquet_options = bigquery.format_options.ParquetOptions() diff --git a/tests/unit/aiplatform/test_datasets.py b/tests/unit/aiplatform/test_datasets.py index 8cd92f6ad4..fadb1e2494 100644 --- a/tests/unit/aiplatform/test_datasets.py +++ b/tests/unit/aiplatform/test_datasets.py @@ -172,6 +172,7 @@ _TEST_BYTES_COL, ] + @pytest.fixture def get_dataset_mock(): with patch.object( @@ -1510,6 +1511,7 @@ def test_create_dataset_tabular_from_dataframe_with_invalid_bq_uri( staging_path=_TEST_INVALID_SOURCE_URI_BQ, ) + class TestTextDataset: def setup_method(self): reload(initializer) From 1983471647ca86de0bab72b80d9f3d9f8036a514 Mon Sep 17 00:00:00 2001 From: Sara Robinson Date: Wed, 27 Apr 2022 08:37:22 -0400 Subject: [PATCH 4/9] update docstrings and make display_name optional --- .../aiplatform/datasets/tabular_dataset.py | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/google/cloud/aiplatform/datasets/tabular_dataset.py b/google/cloud/aiplatform/datasets/tabular_dataset.py index 1801f69461..1e34b6eee5 100644 --- a/google/cloud/aiplatform/datasets/tabular_dataset.py +++ b/google/cloud/aiplatform/datasets/tabular_dataset.py @@ -155,21 +155,19 @@ def create( @classmethod def create_from_dataframe( cls, - display_name: str, df_source: "pd.DataFrame", # noqa: F821 - skip check for undefined name 'pd' - staging_path: str = None, + staging_path: str, + display_name: Optional[str] = None, project: Optional[str] = None, location: Optional[str] = None, credentials: Optional[auth_credentials.Credentials] = None, ) -> "TabularDataset": """Creates a new tabular dataset from a Pandas DataFrame. - Args: - display_name (str): - Required. User-defined name of the dataset. + Args:ers. df_source (pd.DataFrame): Required. Pandas DataFrame containing the source data for - ingestion as a TabularDataset + ingestion as a TabularDataset. staging_path (str): Required. The BigQuery table to stage the data for Vertex. Because Vertex maintains a reference to this source @@ -179,14 +177,18 @@ def create_from_dataframe( create the table. If the provided BigQuery table already exists, and the schemas of the BigQuery table and your DataFrame match, this method will append the data in your local DataFrame to the table. + display_name (str): + Optional. The user-defined name of the Dataset. + The name can be up to 128 characters long and can be consist + of any UTF-8 charact project (str): - Project to upload this model to. Overrides project set in + Optional. Project to upload this dataset to. Overrides project set in aiplatform.init. location (str): - Location to upload this model to. Overrides location set in + Optional. Location to upload this dataset to. Overrides location set in aiplatform.init. credentials (auth_credentials.Credentials): - Custom credentials to use to upload this model. Overrides + Optional. Custom credentials to use to upload this dataset. Overrides credentials set in aiplatform.init. 
""" From 80b2ef44ed6cfa1ad8ded4bd500b2cce781794d7 Mon Sep 17 00:00:00 2001 From: Sara Robinson Date: Wed, 27 Apr 2022 19:18:01 -0400 Subject: [PATCH 5/9] updates from sashas feedback: added integration test, update validations --- .../aiplatform/datasets/tabular_dataset.py | 52 +++++-- setup.py | 4 + tests/system/aiplatform/test_dataset.py | 141 ++++++++++++++++++ tests/unit/aiplatform/test_datasets.py | 118 +++++++-------- 4 files changed, 233 insertions(+), 82 deletions(-) diff --git a/google/cloud/aiplatform/datasets/tabular_dataset.py b/google/cloud/aiplatform/datasets/tabular_dataset.py index 1e34b6eee5..b5cb786f00 100644 --- a/google/cloud/aiplatform/datasets/tabular_dataset.py +++ b/google/cloud/aiplatform/datasets/tabular_dataset.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2020 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -157,6 +157,7 @@ def create_from_dataframe( cls, df_source: "pd.DataFrame", # noqa: F821 - skip check for undefined name 'pd' staging_path: str, + bq_schema: Optional[Union[str, bigquery.SchemaField]] = None, display_name: Optional[str] = None, project: Optional[str] = None, location: Optional[str] = None, @@ -164,10 +165,11 @@ def create_from_dataframe( ) -> "TabularDataset": """Creates a new tabular dataset from a Pandas DataFrame. - Args:ers. + Args: df_source (pd.DataFrame): Required. Pandas DataFrame containing the source data for - ingestion as a TabularDataset. + ingestion as a TabularDataset. This method will use the data + types from the provided DataFrame when creating the dataset. staging_path (str): Required. The BigQuery table to stage the data for Vertex. Because Vertex maintains a reference to this source @@ -177,6 +179,14 @@ def create_from_dataframe( create the table. If the provided BigQuery table already exists, and the schemas of the BigQuery table and your DataFrame match, this method will append the data in your local DataFrame to the table. + The location of the provided BigQuery table should conform to the location requirements + specified here: https://cloud.google.com/vertex-ai/docs/general/locations#bq-locations. + bq_schema (Optional[Union[str, bigquery.SchemaField]]): + Optional. The schema to use when creating the staging table in BigQuery. For more details, + see: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig#google_cloud_bigquery_job_LoadJobConfig_schema + This is not needed if the BigQuery table provided in `staging_path` already exists. + If this is not provided and the provided BigQuery table does not exist, the column types + will be autodetected using the data types in your Pandas DataFrame. display_name (str): Optional. The user-defined name of the Dataset. The name can be up to 128 characters long and can be consist @@ -190,19 +200,35 @@ def create_from_dataframe( credentials (auth_credentials.Credentials): Optional. Custom credentials to use to upload this dataset. Overrides credentials set in aiplatform.init. + Returns: + tabular_dataset (TabularDataset): + Instantiated representation of the managed tabular dataset resource. """ - if len(df_source) < _AUTOML_TRAINING_MIN_ROWS: - _LOGGER.info( - "Your DataFrame has %s rows and AutoML requires %s rows to train on tabular data. You can still train a custom model once your dataset has been uploaded to Vertex, but you will not be able to use AutoML for training." 
- % (len(df_source), _AUTOML_TRAINING_MIN_ROWS), + if staging_path.startswith("bq://"): + bq_staging_path = staging_path[len("bq://") :] + else: + raise ValueError( + "Only BigQuery staging paths are supported. Provide a staging path in the format `bq://your-project.your-dataset.your-table`." ) try: import pyarrow # noqa: F401 - skip check for 'pyarrow' which is required when using 'google.cloud.bigquery' except ImportError: raise ImportError( - "Pyarrow is not installed. Please install pyarrow to use the BigQuery client." + "Pyarrow is not installed, and is required to use the BigQuery client." + 'Please install the SDK using "pip install google-cloud-aiplatform[datasets]"' + ) + + if bq_schema: + print(type(bq_schema)) + print(bq_schema[0]) + print(type(bq_schema[0])) + + if len(df_source) < _AUTOML_TRAINING_MIN_ROWS: + _LOGGER.info( + "Your DataFrame has %s rows and AutoML requires %s rows to train on tabular data. You can still train a custom model once your dataset has been uploaded to Vertex, but you will not be able to use AutoML for training." + % (len(df_source), _AUTOML_TRAINING_MIN_ROWS), ) bigquery_client = bigquery.Client( @@ -210,13 +236,6 @@ def create_from_dataframe( credentials=credentials or initializer.global_config.credentials, ) - if staging_path.startswith("bq://"): - bq_staging_path = staging_path[len("bq://") :] - else: - raise ValueError( - "Only BigQuery staging paths are supported. Provide a staging path in the format `bq://your-project.your-dataset.your-table`." - ) - try: parquet_options = bigquery.format_options.ParquetOptions() parquet_options.enable_list_inference = True @@ -226,6 +245,9 @@ def create_from_dataframe( parquet_options=parquet_options, ) + if bq_schema: + job_config.schema = bq_schema + job = bigquery_client.load_table_from_dataframe( dataframe=df_source, destination=bq_staging_path, job_config=job_config ) diff --git a/setup.py b/setup.py index 398b7ab654..7db8ad5f27 100644 --- a/setup.py +++ b/setup.py @@ -55,6 +55,9 @@ pipelines_extra_requires = [ "pyyaml>=5.3,<6", ] +datasets_extra_require = [ + "pyarrow >= 3.0.0, < 8.0dev", +] full_extra_require = list( set( tensorboard_extra_require @@ -63,6 +66,7 @@ + lit_extra_require + featurestore_extra_require + pipelines_extra_requires + + datasets_extra_require ) ) testing_extra_require = ( diff --git a/tests/system/aiplatform/test_dataset.py b/tests/system/aiplatform/test_dataset.py index d8d8bd53e3..601450d478 100644 --- a/tests/system/aiplatform/test_dataset.py +++ b/tests/system/aiplatform/test_dataset.py @@ -20,10 +20,14 @@ import pytest import importlib +import pandas as pd + from google import auth as google_auth from google.api_core import exceptions from google.api_core import client_options +from google.cloud import bigquery + from google.cloud import aiplatform from google.cloud import storage from google.cloud.aiplatform import utils @@ -55,6 +59,59 @@ _TEST_TEXT_ENTITY_IMPORT_SCHEMA = "gs://google-cloud-aiplatform/schema/dataset/ioformat/text_extraction_io_format_1.0.0.yaml" _TEST_IMAGE_OBJ_DET_IMPORT_SCHEMA = "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml" +# create_from_dataframe +_TEST_BOOL_COL = "bool_col" +_TEST_BOOL_ARR_COL = "bool_array_col" +_TEST_DOUBLE_COL = "double_col" +_TEST_DOUBLE_ARR_COL = "double_array_col" +_TEST_INT_COL = "int64_col" +_TEST_INT_ARR_COL = "int64_array_col" +_TEST_STR_COL = "string_col" +_TEST_STR_ARR_COL = "string_array_col" +_TEST_BYTES_COL = "bytes_col" +_TEST_DF_COLUMN_NAMES = [ + _TEST_BOOL_COL, + 
_TEST_BOOL_ARR_COL, + _TEST_DOUBLE_COL, + _TEST_DOUBLE_ARR_COL, + _TEST_INT_COL, + _TEST_INT_ARR_COL, + _TEST_STR_COL, + _TEST_STR_ARR_COL, + _TEST_BYTES_COL, +] +_TEST_DATAFRAME = pd.DataFrame( + data=[ + [ + False, + [True, False], + 1.2, + [1.2, 3.4], + 1, + [1, 2], + "test", + ["test1", "test2"], + b"1", + ], + [ + True, + [True, True], + 2.2, + [2.2, 4.4], + 2, + [2, 3], + "test1", + ["test2", "test3"], + b"0", + ], + ], + columns=_TEST_DF_COLUMN_NAMES, +) +_TEST_PARTIAL_BQ_SCHEMA = [ + bigquery.SchemaField("bytes_col", "STRING"), + bigquery.SchemaField("int64_col", "FLOAT"), +] + class TestDataset: def setup_method(self): @@ -66,6 +123,25 @@ def shared_state(self): shared_state = {} yield shared_state + @pytest.fixture() + def prepare_bigquery_dataset(self, shared_state): + """Create a bigquery dataset and store bigquery resource object in shared state.""" + + bigquery_client = bigquery.Client(project=_TEST_PROJECT) + shared_state["bigquery_client"] = bigquery_client + + dataset_name = f"tabulardatasettest_{uuid.uuid4()}".replace("-", "_") + shared_state["dataset_name"] = dataset_name + + dataset_id = f"{_TEST_PROJECT}.{dataset_name}" + shared_state["bigquery_dataset_id"] = dataset_id + + dataset = bigquery.Dataset(dataset_id) + dataset.location = _TEST_LOCATION + shared_state["bigquery_dataset"] = bigquery_client.create_dataset(dataset) + + yield + @pytest.fixture() def create_staging_bucket(self, shared_state): new_staging_bucket = f"temp-sdk-integration-{uuid.uuid4()}" @@ -253,6 +329,71 @@ def test_create_tabular_dataset(self, dataset_gapic_client, shared_state): == aiplatform.schema.dataset.metadata.tabular ) + @pytest.mark.usefixtures("delete_new_dataset", "prepare_bigquery_dataset") + def test_create_tabular_dataset_from_dataframe( + self, dataset_gapic_client, shared_state + ): + """Use the Dataset.create_from_dataframe() method to create a new tabular dataset. + Then confirm the dataset was successfully created and references GCS source.""" + + assert shared_state["dataset_name"] + assert shared_state["bigquery_dataset"] + + bigquery_dataset_id = shared_state["bigquery_dataset_id"] + bq_staging_table = f"bq://{bigquery_dataset_id}.test_table{uuid.uuid4()}" + + aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION) + + tabular_dataset = aiplatform.TabularDataset.create_from_dataframe( + df_source=_TEST_DATAFRAME, + staging_path=bq_staging_table, + display_name=f"temp_sdk_integration_create_and_import_dataset_from_dataframe{uuid.uuid4()}", + ) + + shared_state["dataset_name"] = tabular_dataset.resource_name + + gapic_metadata = tabular_dataset.to_dict()["metadata"] + bq_source = gapic_metadata["inputConfig"]["bigquerySource"]["uri"] + + assert bq_staging_table == bq_source + assert ( + tabular_dataset.metadata_schema_uri + == aiplatform.schema.dataset.metadata.tabular + ) + + @pytest.mark.usefixtures("delete_new_dataset", "prepare_bigquery_dataset") + def test_create_tabular_dataset_from_dataframe_with_provided_schema( + self, dataset_gapic_client, shared_state + ): + """Use the Dataset.create_from_dataframe() method to create a new tabular dataset. 
+ Then confirm the dataset was successfully created and references GCS source.""" + + assert shared_state["dataset_name"] + assert shared_state["bigquery_dataset"] + + bigquery_dataset_id = shared_state["bigquery_dataset_id"] + bq_staging_table = f"bq://{bigquery_dataset_id}.test_table{uuid.uuid4()}" + + aiplatform.init(project=_TEST_PROJECT, location=_TEST_LOCATION) + + tabular_dataset = aiplatform.TabularDataset.create_from_dataframe( + df_source=_TEST_DATAFRAME, + staging_path=bq_staging_table, + display_name=f"temp_sdk_integration_create_and_import_dataset_from_dataframe{uuid.uuid4()}", + bq_schema=_TEST_PARTIAL_BQ_SCHEMA, + ) + + shared_state["dataset_name"] = tabular_dataset.resource_name + + gapic_metadata = tabular_dataset.to_dict()["metadata"] + bq_source = gapic_metadata["inputConfig"]["bigquerySource"]["uri"] + + assert bq_staging_table == bq_source + assert ( + tabular_dataset.metadata_schema_uri + == aiplatform.schema.dataset.metadata.tabular + ) + # TODO(vinnys): Remove pytest skip once persistent resources are accessible @pytest.mark.skip(reason="System tests cannot access persistent test resources") @pytest.mark.usefixtures("create_staging_bucket", "delete_staging_bucket") diff --git a/tests/unit/aiplatform/test_datasets.py b/tests/unit/aiplatform/test_datasets.py index fadb1e2494..84f439b503 100644 --- a/tests/unit/aiplatform/test_datasets.py +++ b/tests/unit/aiplatform/test_datasets.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2020 Google LLC +# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,12 +17,13 @@ import os +import pandas as pd + import pytest from unittest import mock from importlib import reload from unittest.mock import patch -import pandas as pd from google.api_core import operation from google.auth.exceptions import GoogleAuthError @@ -171,6 +172,34 @@ _TEST_STR_ARR_COL, _TEST_BYTES_COL, ] +_TEST_DATAFRAME = pd.DataFrame( + data=[ + [ + False, + [True, False], + 1.2, + [1.2, 3.4], + 1, + [1, 2], + "test", + ["test1", "test2"], + b"1", + ], + [ + True, + [True, True], + 2.2, + [2.2, 4.4], + 2, + [2, 3], + "test1", + ["test2", "test3"], + b"0", + ], + ], + columns=_TEST_DF_COLUMN_NAMES, +) +_TEST_CREDENTIALS = mock.Mock(spec=auth_credentials.AnonymousCredentials()) @pytest.fixture @@ -1406,50 +1435,26 @@ def test_create_dataset_with_labels(self, create_dataset_mock, sync): @pytest.mark.usefixtures("get_dataset_tabular_bq_mock") @pytest.mark.parametrize( "source_df", - [ - pd.DataFrame( - data=[ - [ - False, - [True, False], - 1.2, - [1.2, 3.4], - 1, - [1, 2], - "test", - ["test1", "test2"], - b"1", - ], - [ - True, - [True, True], - 2.2, - [2.2, 4.4], - 2, - [2, 3], - "test1", - ["test2", "test3"], - b"0", - ], - ], - columns=_TEST_DF_COLUMN_NAMES, - ), - ], + [_TEST_DATAFRAME], ) - @pytest.mark.parametrize("sync", [True, False]) def test_create_dataset_tabular_from_dataframe( - self, create_dataset_mock, source_df, bq_client_mock, sync + self, + create_dataset_mock, + source_df, + bq_client_mock, ): - aiplatform.init(project=_TEST_PROJECT) + aiplatform.init( + project=_TEST_PROJECT, + credentials=_TEST_CREDENTIALS, + ) dataset_from_df = datasets.TabularDataset.create_from_dataframe( display_name=_TEST_DISPLAY_NAME, df_source=source_df, staging_path=_TEST_SOURCE_URI_BQ, ) - if not sync: - dataset_from_df.wait() + dataset_from_df.wait() assert dataset_from_df.metadata_schema_uri == _TEST_METADATA_SCHEMA_URI_TABULAR @@ -1466,42 
+1471,21 @@ def test_create_dataset_tabular_from_dataframe( timeout=None, ) + assert bq_client_mock.call_args_list[0] == mock.call( + project=_TEST_PROJECT, + credentials=_TEST_CREDENTIALS, + ) + @pytest.mark.usefixtures("get_dataset_tabular_bq_mock") @pytest.mark.parametrize( "source_df", - [ - pd.DataFrame( - data=[ - [ - False, - [True, False], - 1.2, - [1.2, 3.4], - 1, - [1, 2], - "test", - ["test1", "test2"], - b"1", - ], - [ - True, - [True, True], - 2.2, - [2.2, 4.4], - 2, - [2, 3], - "test1", - ["test2", "test3"], - b"0", - ], - ], - columns=_TEST_DF_COLUMN_NAMES, - ), - ], + [_TEST_DATAFRAME], ) - @pytest.mark.parametrize("sync", [True, False]) def test_create_dataset_tabular_from_dataframe_with_invalid_bq_uri( - self, create_dataset_mock, source_df, bq_client_mock, sync + self, + create_dataset_mock, + source_df, + bq_client_mock, ): aiplatform.init(project=_TEST_PROJECT) with pytest.raises(ValueError): From 19d856552da3e24de68fc0e602f84ac5188bb7dc Mon Sep 17 00:00:00 2001 From: Sara Robinson Date: Thu, 28 Apr 2022 09:23:09 -0400 Subject: [PATCH 6/9] remove some logging --- google/cloud/aiplatform/datasets/tabular_dataset.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/google/cloud/aiplatform/datasets/tabular_dataset.py b/google/cloud/aiplatform/datasets/tabular_dataset.py index b5cb786f00..9e0631bd59 100644 --- a/google/cloud/aiplatform/datasets/tabular_dataset.py +++ b/google/cloud/aiplatform/datasets/tabular_dataset.py @@ -220,11 +220,6 @@ def create_from_dataframe( 'Please install the SDK using "pip install google-cloud-aiplatform[datasets]"' ) - if bq_schema: - print(type(bq_schema)) - print(bq_schema[0]) - print(type(bq_schema[0])) - if len(df_source) < _AUTOML_TRAINING_MIN_ROWS: _LOGGER.info( "Your DataFrame has %s rows and AutoML requires %s rows to train on tabular data. You can still train a custom model once your dataset has been uploaded to Vertex, but you will not be able to use AutoML for training." From 0f81b3d091860e2b3dac49362ce54582d805e0f3 Mon Sep 17 00:00:00 2001 From: Sara Robinson Date: Thu, 28 Apr 2022 13:01:55 -0400 Subject: [PATCH 7/9] update error handling on bq_schema arg --- .../aiplatform/datasets/tabular_dataset.py | 27 +++++- tests/system/aiplatform/test_dataset.py | 22 +++-- tests/unit/aiplatform/test_datasets.py | 85 +++++++++++++++++++ 3 files changed, 126 insertions(+), 8 deletions(-) diff --git a/google/cloud/aiplatform/datasets/tabular_dataset.py b/google/cloud/aiplatform/datasets/tabular_dataset.py index 9e0631bd59..53ce8add4c 100644 --- a/google/cloud/aiplatform/datasets/tabular_dataset.py +++ b/google/cloud/aiplatform/datasets/tabular_dataset.py @@ -20,6 +20,7 @@ from google.auth import credentials as auth_credentials from google.cloud import bigquery +from google.cloud.bigquery import _pandas_helpers from google.cloud.aiplatform import base from google.cloud.aiplatform import datasets from google.cloud.aiplatform.datasets import _datasources @@ -152,6 +153,25 @@ def create( create_request_timeout=create_request_timeout, ) + @staticmethod + def _validate_bq_schema(dataframe, schema) -> bool: + """Validates whether the user-provided BigQuery schema is compatible + with the data types in their Pandas DataFrame. + + Args: + dataframe (pd.DataFrame): + Required. Pandas DataFrame containing the source data for + ingestion as a TabularDataset. + bq_schema (Optional[Union[str, bigquery.SchemaField]]): + Required. The user-provided BigQuery schema. 
+ """ + try: + _pandas_helpers.dataframe_to_arrow(dataframe, schema) + return True + except Exception as e: + _LOGGER.warning(f"Warning: {e}") + return False + @classmethod def create_from_dataframe( cls, @@ -186,7 +206,7 @@ def create_from_dataframe( see: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig#google_cloud_bigquery_job_LoadJobConfig_schema This is not needed if the BigQuery table provided in `staging_path` already exists. If this is not provided and the provided BigQuery table does not exist, the column types - will be autodetected using the data types in your Pandas DataFrame. + will be auto-detected using the data types in your Pandas DataFrame. display_name (str): Optional. The user-defined name of the Dataset. The name can be up to 128 characters long and can be consist @@ -220,6 +240,11 @@ def create_from_dataframe( 'Please install the SDK using "pip install google-cloud-aiplatform[datasets]"' ) + if bq_schema and not TabularDataset._validate_bq_schema( + dataframe=df_source, schema=bq_schema + ): + raise ValueError("The provided `bq_schema` is not valid.") + if len(df_source) < _AUTOML_TRAINING_MIN_ROWS: _LOGGER.info( "Your DataFrame has %s rows and AutoML requires %s rows to train on tabular data. You can still train a custom model once your dataset has been uploaded to Vertex, but you will not be able to use AutoML for training." diff --git a/tests/system/aiplatform/test_dataset.py b/tests/system/aiplatform/test_dataset.py index 601450d478..ca1275f3a2 100644 --- a/tests/system/aiplatform/test_dataset.py +++ b/tests/system/aiplatform/test_dataset.py @@ -107,9 +107,16 @@ ], columns=_TEST_DF_COLUMN_NAMES, ) -_TEST_PARTIAL_BQ_SCHEMA = [ - bigquery.SchemaField("bytes_col", "STRING"), - bigquery.SchemaField("int64_col", "FLOAT"), +_TEST_DATAFRAME_BQ_SCHEMA = [ + bigquery.SchemaField(name="bool_col", field_type="BOOL"), + bigquery.SchemaField(name="bool_array_col", field_type="BOOL", mode="REPEATED"), + bigquery.SchemaField(name="double_col", field_type="FLOAT"), + bigquery.SchemaField(name="double_array_col", field_type="FLOAT", mode="REPEATED"), + bigquery.SchemaField(name="int64_col", field_type="INTEGER"), + bigquery.SchemaField(name="int64_array_col", field_type="INTEGER", mode="REPEATED"), + bigquery.SchemaField(name="string_col", field_type="STRING"), + bigquery.SchemaField(name="string_array_col", field_type="STRING", mode="REPEATED"), + bigquery.SchemaField(name="bytes_col", field_type="STRING"), ] @@ -334,7 +341,7 @@ def test_create_tabular_dataset_from_dataframe( self, dataset_gapic_client, shared_state ): """Use the Dataset.create_from_dataframe() method to create a new tabular dataset. - Then confirm the dataset was successfully created and references GCS source.""" + Then confirm the dataset was successfully created and references the BQ source.""" assert shared_state["dataset_name"] assert shared_state["bigquery_dataset"] @@ -365,8 +372,9 @@ def test_create_tabular_dataset_from_dataframe( def test_create_tabular_dataset_from_dataframe_with_provided_schema( self, dataset_gapic_client, shared_state ): - """Use the Dataset.create_from_dataframe() method to create a new tabular dataset. - Then confirm the dataset was successfully created and references GCS source.""" + """Use the Dataset.create_from_dataframe() method to create a new tabular dataset, + passing in the optional `bq_schema` argument. 
Then confirm the dataset was successfully + created and references the BQ source.""" assert shared_state["dataset_name"] assert shared_state["bigquery_dataset"] @@ -380,7 +388,7 @@ def test_create_tabular_dataset_from_dataframe_with_provided_schema( df_source=_TEST_DATAFRAME, staging_path=bq_staging_table, display_name=f"temp_sdk_integration_create_and_import_dataset_from_dataframe{uuid.uuid4()}", - bq_schema=_TEST_PARTIAL_BQ_SCHEMA, + bq_schema=_TEST_DATAFRAME_BQ_SCHEMA, ) shared_state["dataset_name"] = tabular_dataset.resource_name diff --git a/tests/unit/aiplatform/test_datasets.py b/tests/unit/aiplatform/test_datasets.py index 84f439b503..9ca92e6c2f 100644 --- a/tests/unit/aiplatform/test_datasets.py +++ b/tests/unit/aiplatform/test_datasets.py @@ -200,6 +200,20 @@ columns=_TEST_DF_COLUMN_NAMES, ) _TEST_CREDENTIALS = mock.Mock(spec=auth_credentials.AnonymousCredentials()) +_TEST_DATAFRAME_BQ_SCHEMA = [ + bigquery.SchemaField(name="bool_col", field_type="BOOL"), + bigquery.SchemaField(name="bool_array_col", field_type="BOOL", mode="REPEATED"), + bigquery.SchemaField(name="double_col", field_type="FLOAT"), + bigquery.SchemaField(name="double_array_col", field_type="FLOAT", mode="REPEATED"), + bigquery.SchemaField(name="int64_col", field_type="INTEGER"), + bigquery.SchemaField(name="int64_array_col", field_type="INTEGER", mode="REPEATED"), + bigquery.SchemaField(name="string_col", field_type="STRING"), + bigquery.SchemaField(name="string_array_col", field_type="STRING", mode="REPEATED"), + bigquery.SchemaField(name="bytes_col", field_type="STRING"), +] +_TEST_DATAFRAME_INVALID_BQ_SCHEMA = [ + bigquery.SchemaField(name="bool_col", field_type="BOOL"), +] @pytest.fixture @@ -1476,6 +1490,77 @@ def test_create_dataset_tabular_from_dataframe( credentials=_TEST_CREDENTIALS, ) + @pytest.mark.usefixtures("get_dataset_tabular_bq_mock") + @pytest.mark.parametrize( + "source_df", + [_TEST_DATAFRAME], + ) + def test_create_dataset_tabular_from_dataframe_with_schema( + self, + create_dataset_mock, + source_df, + bq_client_mock, + ): + + aiplatform.init( + project=_TEST_PROJECT, + credentials=_TEST_CREDENTIALS, + ) + + dataset_from_df = datasets.TabularDataset.create_from_dataframe( + display_name=_TEST_DISPLAY_NAME, + df_source=source_df, + staging_path=_TEST_SOURCE_URI_BQ, + bq_schema=_TEST_DATAFRAME_BQ_SCHEMA, + ) + + dataset_from_df.wait() + + assert dataset_from_df.metadata_schema_uri == _TEST_METADATA_SCHEMA_URI_TABULAR + + expected_dataset = gca_dataset.Dataset( + display_name=_TEST_DISPLAY_NAME, + metadata_schema_uri=_TEST_METADATA_SCHEMA_URI_TABULAR, + metadata=_TEST_METADATA_TABULAR_BQ, + ) + + create_dataset_mock.assert_called_once_with( + parent=_TEST_PARENT, + dataset=expected_dataset, + metadata=_TEST_REQUEST_METADATA, + timeout=None, + ) + + assert bq_client_mock.call_args_list[0] == mock.call( + project=_TEST_PROJECT, + credentials=_TEST_CREDENTIALS, + ) + + @pytest.mark.usefixtures("get_dataset_tabular_bq_mock") + @pytest.mark.parametrize( + "source_df", + [_TEST_DATAFRAME], + ) + def test_create_dataset_tabular_from_dataframe_with_invalid_schema_raises( + self, + create_dataset_mock, + source_df, + bq_client_mock, + ): + + aiplatform.init( + project=_TEST_PROJECT, + credentials=_TEST_CREDENTIALS, + ) + + with pytest.raises(ValueError): + datasets.TabularDataset.create_from_dataframe( + display_name=_TEST_DISPLAY_NAME, + df_source=source_df, + staging_path=_TEST_SOURCE_URI_BQ, + bq_schema=_TEST_DATAFRAME_INVALID_BQ_SCHEMA, + ) + @pytest.mark.usefixtures("get_dataset_tabular_bq_mock") 
@pytest.mark.parametrize( "source_df", From 833d9f5fc32995c9c25fe5bbfdc2d9ade64bf265 Mon Sep 17 00:00:00 2001 From: Sara Robinson Date: Mon, 2 May 2022 21:56:54 -0400 Subject: [PATCH 8/9] updates from sashas feedback --- .../aiplatform/datasets/tabular_dataset.py | 25 ------- tests/system/aiplatform/test_dataset.py | 66 ++++++------------- tests/unit/aiplatform/test_datasets.py | 25 ------- 3 files changed, 21 insertions(+), 95 deletions(-) diff --git a/google/cloud/aiplatform/datasets/tabular_dataset.py b/google/cloud/aiplatform/datasets/tabular_dataset.py index 53ce8add4c..d0fe060d47 100644 --- a/google/cloud/aiplatform/datasets/tabular_dataset.py +++ b/google/cloud/aiplatform/datasets/tabular_dataset.py @@ -20,7 +20,6 @@ from google.auth import credentials as auth_credentials from google.cloud import bigquery -from google.cloud.bigquery import _pandas_helpers from google.cloud.aiplatform import base from google.cloud.aiplatform import datasets from google.cloud.aiplatform.datasets import _datasources @@ -153,25 +152,6 @@ def create( create_request_timeout=create_request_timeout, ) - @staticmethod - def _validate_bq_schema(dataframe, schema) -> bool: - """Validates whether the user-provided BigQuery schema is compatible - with the data types in their Pandas DataFrame. - - Args: - dataframe (pd.DataFrame): - Required. Pandas DataFrame containing the source data for - ingestion as a TabularDataset. - bq_schema (Optional[Union[str, bigquery.SchemaField]]): - Required. The user-provided BigQuery schema. - """ - try: - _pandas_helpers.dataframe_to_arrow(dataframe, schema) - return True - except Exception as e: - _LOGGER.warning(f"Warning: {e}") - return False - @classmethod def create_from_dataframe( cls, @@ -240,11 +220,6 @@ def create_from_dataframe( 'Please install the SDK using "pip install google-cloud-aiplatform[datasets]"' ) - if bq_schema and not TabularDataset._validate_bq_schema( - dataframe=df_source, schema=bq_schema - ): - raise ValueError("The provided `bq_schema` is not valid.") - if len(df_source) < _AUTOML_TRAINING_MIN_ROWS: _LOGGER.info( "Your DataFrame has %s rows and AutoML requires %s rows to train on tabular data. You can still train a custom model once your dataset has been uploaded to Vertex, but you will not be able to use AutoML for training." 
diff --git a/tests/system/aiplatform/test_dataset.py b/tests/system/aiplatform/test_dataset.py index ca1275f3a2..81b5e420e9 100644 --- a/tests/system/aiplatform/test_dataset.py +++ b/tests/system/aiplatform/test_dataset.py @@ -37,6 +37,8 @@ from test_utils.vpcsc_config import vpcsc_config +from tests.system.aiplatform import e2e_base + # TODO(vinnys): Replace with env var `BUILD_SPECIFIC_GCP_PROJECT` once supported _, _TEST_PROJECT = google_auth.default() TEST_BUCKET = os.environ.get( @@ -120,58 +122,30 @@ ] -class TestDataset: +@pytest.mark.usefixtures( + "prepare_staging_bucket", + "delete_staging_bucket", + "prepare_bigquery_dataset", + "delete_bigquery_dataset", + "tear_down_resources", +) +class TestDataset(e2e_base.TestEndToEnd): + + _temp_prefix = "temp-vertex-sdk-dataset-test" + def setup_method(self): importlib.reload(initializer) importlib.reload(aiplatform) - @pytest.fixture() - def shared_state(self): - shared_state = {} - yield shared_state - - @pytest.fixture() - def prepare_bigquery_dataset(self, shared_state): - """Create a bigquery dataset and store bigquery resource object in shared state.""" - - bigquery_client = bigquery.Client(project=_TEST_PROJECT) - shared_state["bigquery_client"] = bigquery_client - - dataset_name = f"tabulardatasettest_{uuid.uuid4()}".replace("-", "_") - shared_state["dataset_name"] = dataset_name - - dataset_id = f"{_TEST_PROJECT}.{dataset_name}" - shared_state["bigquery_dataset_id"] = dataset_id - - dataset = bigquery.Dataset(dataset_id) - dataset.location = _TEST_LOCATION - shared_state["bigquery_dataset"] = bigquery_client.create_dataset(dataset) - - yield - @pytest.fixture() def create_staging_bucket(self, shared_state): new_staging_bucket = f"temp-sdk-integration-{uuid.uuid4()}" - storage_client = storage.Client() storage_client.create_bucket(new_staging_bucket) shared_state["storage_client"] = storage_client shared_state["staging_bucket"] = new_staging_bucket yield - @pytest.fixture() - def delete_staging_bucket(self, shared_state): - yield - storage_client = shared_state["storage_client"] - - # Delete temp staging bucket - bucket_to_delete = storage_client.get_bucket(shared_state["staging_bucket"]) - bucket_to_delete.delete(force=True) - - # Close Storage Client - storage_client._http._auth_request.session.close() - storage_client._http.close() - @pytest.fixture() def dataset_gapic_client(self): gapic_client = dataset_service.DatasetServiceClient( @@ -336,16 +310,17 @@ def test_create_tabular_dataset(self, dataset_gapic_client, shared_state): == aiplatform.schema.dataset.metadata.tabular ) - @pytest.mark.usefixtures("delete_new_dataset", "prepare_bigquery_dataset") + @pytest.mark.usefixtures("delete_new_dataset") def test_create_tabular_dataset_from_dataframe( self, dataset_gapic_client, shared_state ): """Use the Dataset.create_from_dataframe() method to create a new tabular dataset. 
Then confirm the dataset was successfully created and references the BQ source.""" - assert shared_state["dataset_name"] assert shared_state["bigquery_dataset"] + shared_state["resources"] = [] + bigquery_dataset_id = shared_state["bigquery_dataset_id"] bq_staging_table = f"bq://{bigquery_dataset_id}.test_table{uuid.uuid4()}" @@ -356,7 +331,7 @@ def test_create_tabular_dataset_from_dataframe( staging_path=bq_staging_table, display_name=f"temp_sdk_integration_create_and_import_dataset_from_dataframe{uuid.uuid4()}", ) - + shared_state["resources"].extend([tabular_dataset]) shared_state["dataset_name"] = tabular_dataset.resource_name gapic_metadata = tabular_dataset.to_dict()["metadata"] @@ -368,7 +343,7 @@ def test_create_tabular_dataset_from_dataframe( == aiplatform.schema.dataset.metadata.tabular ) - @pytest.mark.usefixtures("delete_new_dataset", "prepare_bigquery_dataset") + @pytest.mark.usefixtures("delete_new_dataset") def test_create_tabular_dataset_from_dataframe_with_provided_schema( self, dataset_gapic_client, shared_state ): @@ -376,9 +351,10 @@ def test_create_tabular_dataset_from_dataframe_with_provided_schema( passing in the optional `bq_schema` argument. Then confirm the dataset was successfully created and references the BQ source.""" - assert shared_state["dataset_name"] assert shared_state["bigquery_dataset"] + shared_state["resources"] = [] + bigquery_dataset_id = shared_state["bigquery_dataset_id"] bq_staging_table = f"bq://{bigquery_dataset_id}.test_table{uuid.uuid4()}" @@ -390,7 +366,7 @@ def test_create_tabular_dataset_from_dataframe_with_provided_schema( display_name=f"temp_sdk_integration_create_and_import_dataset_from_dataframe{uuid.uuid4()}", bq_schema=_TEST_DATAFRAME_BQ_SCHEMA, ) - + shared_state["resources"].extend([tabular_dataset]) shared_state["dataset_name"] = tabular_dataset.resource_name gapic_metadata = tabular_dataset.to_dict()["metadata"] diff --git a/tests/unit/aiplatform/test_datasets.py b/tests/unit/aiplatform/test_datasets.py index 617ba57942..13ef13aebd 100644 --- a/tests/unit/aiplatform/test_datasets.py +++ b/tests/unit/aiplatform/test_datasets.py @@ -1596,31 +1596,6 @@ def test_create_dataset_tabular_from_dataframe_with_schema( credentials=_TEST_CREDENTIALS, ) - @pytest.mark.usefixtures("get_dataset_tabular_bq_mock") - @pytest.mark.parametrize( - "source_df", - [_TEST_DATAFRAME], - ) - def test_create_dataset_tabular_from_dataframe_with_invalid_schema_raises( - self, - create_dataset_mock, - source_df, - bq_client_mock, - ): - - aiplatform.init( - project=_TEST_PROJECT, - credentials=_TEST_CREDENTIALS, - ) - - with pytest.raises(ValueError): - datasets.TabularDataset.create_from_dataframe( - display_name=_TEST_DISPLAY_NAME, - df_source=source_df, - staging_path=_TEST_SOURCE_URI_BQ, - bq_schema=_TEST_DATAFRAME_INVALID_BQ_SCHEMA, - ) - @pytest.mark.usefixtures("get_dataset_tabular_bq_mock") @pytest.mark.parametrize( "source_df", From 87ac7f9c0dddd9c9a0475640c44c179ce43e5e14 Mon Sep 17 00:00:00 2001 From: Sara Robinson Date: Wed, 4 May 2022 10:20:05 -0400 Subject: [PATCH 9/9] update bq_schema docstring --- google/cloud/aiplatform/datasets/tabular_dataset.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/google/cloud/aiplatform/datasets/tabular_dataset.py b/google/cloud/aiplatform/datasets/tabular_dataset.py index d0fe060d47..732cebe26f 100644 --- a/google/cloud/aiplatform/datasets/tabular_dataset.py +++ b/google/cloud/aiplatform/datasets/tabular_dataset.py @@ -182,11 +182,9 @@ def create_from_dataframe( The location of the 
provided BigQuery table should conform to the location requirements
                 specified here: https://cloud.google.com/vertex-ai/docs/general/locations#bq-locations.
             bq_schema (Optional[Union[str, bigquery.SchemaField]]):
-                Optional. The schema to use when creating the staging table in BigQuery. For more details,
+                Optional. If not set, BigQuery will autodetect the schema using your DataFrame's column types.
+                If set, BigQuery will use the schema you provide when creating the staging table. For more details,
                 see: https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.LoadJobConfig#google_cloud_bigquery_job_LoadJobConfig_schema
-                This is not needed if the BigQuery table provided in `staging_path` already exists.
-                If this is not provided and the provided BigQuery table does not exist, the column types
-                will be auto-detected using the data types in your Pandas DataFrame.
             display_name (str):
                 Optional. The user-defined name of the Dataset.
                 The name can be up to 128 characters long and can be consist
                 of any UTF-8 charact
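
A minimal usage sketch of the `create_from_dataframe` method this series introduces, using the final signature as of PATCH 9/9. The project, dataset, and table identifiers below are placeholders, and pyarrow is assumed to be installed (the `datasets` extra added to setup.py in PATCH 5/9):

    # Hypothetical example; install deps with `pip install google-cloud-aiplatform[datasets] pandas`.
    import pandas as pd

    from google.cloud import aiplatform
    from google.cloud import bigquery

    aiplatform.init(project="my-project", location="us-central1")

    df = pd.DataFrame(
        {
            "int64_col": [1, 2],
            "string_col": ["a", "b"],
            "bool_col": [True, False],
        }
    )

    # Optional: pin the staging table's schema instead of letting BigQuery
    # autodetect it from the DataFrame's column types.
    bq_schema = [
        bigquery.SchemaField(name="int64_col", field_type="INTEGER"),
        bigquery.SchemaField(name="string_col", field_type="STRING"),
        bigquery.SchemaField(name="bool_col", field_type="BOOL"),
    ]

    dataset = aiplatform.TabularDataset.create_from_dataframe(
        df_source=df,
        staging_path="bq://my-project.my_dataset.my_table",  # bq:// prefix is required
        bq_schema=bq_schema,
        display_name="my-tabular-dataset",
    )

Per the method's docstring, the staging table should not be deleted afterwards, because the created Vertex dataset keeps a reference to it as its BigQuery source.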