From 8ed8334503e40a83501580b861ff9936788ba61b Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 4 Feb 2019 15:15:04 -0800 Subject: [PATCH 01/10] BigQuery Storage API sample for reading pandas dataframe How to get a pandas DataFrame, fast! The first two examples use the existing BigQuery client. These examples create a thread pool and read in parallel. The final example shows using just the new BigQuery Storage client, but only shows how to read with a single thread. --- bigquery_storage/pandas/requirements.txt | 5 ++ bigquery_storage/pandas/to_dataframe.py | 102 +++++++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 bigquery_storage/pandas/requirements.txt create mode 100644 bigquery_storage/pandas/to_dataframe.py diff --git a/bigquery_storage/pandas/requirements.txt b/bigquery_storage/pandas/requirements.txt new file mode 100644 index 00000000000..29d1de558f0 --- /dev/null +++ b/bigquery_storage/pandas/requirements.txt @@ -0,0 +1,5 @@ +google-auth==1.6.2 +google-cloud-bigquery-storage==0.2.0 +google-cloud-bigquery==1.8.1 +fastavro==0.21.17 +pandas==0.24.0 \ No newline at end of file diff --git a/bigquery_storage/pandas/to_dataframe.py b/bigquery_storage/pandas/to_dataframe.py new file mode 100644 index 00000000000..7bf96babf34 --- /dev/null +++ b/bigquery_storage/pandas/to_dataframe.py @@ -0,0 +1,102 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def main(dataset_id="query_results"): + # [START bigquerystorage_pandas_create_client] + import google.auth + from google.cloud import bigquery + from google.cloud import bigquery_storage_v1beta1 + + # Explicitly create a credentials object. This allows you to use the same + # credentials for both the BigQuery and BigQuery Storage clients, avoiding + # unnecessary API calls to fetch duplicate authentication tokens. + credentials, project_id = google.auth.default( + scopes=['https://www.googleapis.com/auth/cloud-platform'] + ) + + # Make clients. + bqclient = bigquery.Client(credentials=credentials, project=project_id) + bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient( + credentials=credentials) + # [END bigquerystorage_pandas_create_client] + + # [START bigquerystorage_pandas_read_table] + # Download a table. + table = bqclient.get_table( + "bigquery-public-data.utility_us.country_code_iso" + ) + rows = bqclient.list_rows(table) + dataframe = rows.to_dataframe(bqstorageclient) + print(dataframe.head()) + # [END bigquerystorage_pandas_read_table] + + dataset = bqclient.dataset(dataset_id) + + # [START bigquerystorage_pandas_read_query_results] + # Download query results. 
+ query_string = """ +SELECT + CONCAT( + 'https://stackoverflow.com/questions/', + CAST(id as STRING)) as url, + view_count +FROM `bigquery-public-data.stackoverflow.posts_questions` +WHERE tags like '%google-bigquery%' +ORDER BY view_count DESC +""" + query_config = bigquery.QueryJobConfig( + # Due to a known issue in the BigQuery Storage API (TODO: link to + # public issue), small result sets cannot be downloaded. To workaround + # this issue, write your results to a destination table. + destination=dataset.table('query_results_table'), + write_disposition="WRITE_TRUNCATE" + ) + + dataframe = ( + bqclient.query(query_string, job_config=query_config) + .result() + .to_dataframe(bqstorageclient) + ) + print(dataframe.head()) + # [END bigquerystorage_pandas_read_query_results] + + # [START bigquerystorage_pandas_read_session] + table = bigquery_storage_v1beta1.types.TableReference() + table.project_id = "bigquery-public-data" + table.dataset_id = "utility_us" + table.table_id = "country_code_iso" + + parent = "projects/{}".format(project_id) + session = bqstorageclient.create_read_session(table, parent) + + # Don't try to read from an empty table. + if len(session.streams) == 0: + return + + # This example reads from only a single stream. Read from multiple streams + # to fetch data faster. + stream = session.streams[0] + position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) + reader = bqstorageclient.read_rows(position) + + # Parse all Avro blocks and create a dataframe. This call requires a + # session, because the session contains the schema for the row blocks. + dataframe = reader.to_dataframe(session) + print(dataframe.head()) + # [END bigquerystorage_pandas_read_session] + + +if __name__ == "__main__": + main() From b7fae9d0df2193265d207fef47c9118f2074ac37 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 4 Feb 2019 16:36:09 -0800 Subject: [PATCH 02/10] Add tests. --- bigquery_storage/to_dataframe/__init__.py | 0 .../to_dataframe.py => to_dataframe/main.py} | 76 ++++++++++++++++--- bigquery_storage/to_dataframe/main_test.py | 23 ++++++ .../{pandas => to_dataframe}/requirements.txt | 0 4 files changed, 88 insertions(+), 11 deletions(-) create mode 100644 bigquery_storage/to_dataframe/__init__.py rename bigquery_storage/{pandas/to_dataframe.py => to_dataframe/main.py} (58%) create mode 100644 bigquery_storage/to_dataframe/main_test.py rename bigquery_storage/{pandas => to_dataframe}/requirements.txt (100%) diff --git a/bigquery_storage/to_dataframe/__init__.py b/bigquery_storage/to_dataframe/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/bigquery_storage/pandas/to_dataframe.py b/bigquery_storage/to_dataframe/main.py similarity index 58% rename from bigquery_storage/pandas/to_dataframe.py rename to bigquery_storage/to_dataframe/main.py index 7bf96babf34..915fab24205 100644 --- a/bigquery_storage/pandas/to_dataframe.py +++ b/bigquery_storage/to_dataframe/main.py @@ -12,8 +12,21 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import argparse + def main(dataset_id="query_results"): + print("Creating clients.") + bqclient, bqstorageclient, project_id = create_clients() + print("\n\nReading a table:") + table_to_dataframe(bqclient, bqstorageclient) + print("\n\nReading query results:") + query_to_dataframe(bqclient, bqstorageclient, dataset_id) + print("\n\nReading a table, using the BQ Storage API directly:") + session_to_dataframe(bqstorageclient, project_id) + + +def create_clients(): # [START bigquerystorage_pandas_create_client] import google.auth from google.cloud import bigquery @@ -31,20 +44,47 @@ def main(dataset_id="query_results"): bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient( credentials=credentials) # [END bigquerystorage_pandas_create_client] + return bqclient, bqstorageclient, project_id + + +def table_to_dataframe(bqclient, bqstorageclient): + from google.cloud import bigquery # [START bigquerystorage_pandas_read_table] # Download a table. - table = bqclient.get_table( + table = bigquery.TableReference.from_string( "bigquery-public-data.utility_us.country_code_iso" ) - rows = bqclient.list_rows(table) + rows = bqclient.list_rows( + table, + selected_fields=[ + bigquery.SchemaField("country_name", "STRING"), + bigquery.SchemaField("fips_code", "STRING"), + ] + ) dataframe = rows.to_dataframe(bqstorageclient) print(dataframe.head()) # [END bigquerystorage_pandas_read_table] - dataset = bqclient.dataset(dataset_id) + +def query_to_dataframe(bqclient, bqstorageclient, dataset_id): + from google.cloud import bigquery # [START bigquerystorage_pandas_read_query_results] + import uuid + + # Due to a known issue in the BigQuery Storage API (TODO: link to + # public issue), small query result sets cannot be downloaded. To + # workaround this issue, write results to a destination table. + + # TODO: Set dataset_id to a dataset that will store temporary query + # results. Set the default table expiration time to ensure data is + # deleted after the results have been downloaded. + # dataset_id = "temporary_dataset_for_query_results" + dataset = bqclient.dataset(dataset_id) + table_id = "queryresults_" + uuid.uuid4().hex + table = dataset.table(table_id) + # Download query results. query_string = """ SELECT @@ -57,10 +97,7 @@ def main(dataset_id="query_results"): ORDER BY view_count DESC """ query_config = bigquery.QueryJobConfig( - # Due to a known issue in the BigQuery Storage API (TODO: link to - # public issue), small result sets cannot be downloaded. To workaround - # this issue, write your results to a destination table. - destination=dataset.table('query_results_table'), + destination=table, write_disposition="WRITE_TRUNCATE" ) @@ -72,14 +109,28 @@ def main(dataset_id="query_results"): print(dataframe.head()) # [END bigquerystorage_pandas_read_query_results] + +def session_to_dataframe(bqstorageclient, project_id): + from google.cloud import bigquery_storage_v1beta1 + # [START bigquerystorage_pandas_read_session] table = bigquery_storage_v1beta1.types.TableReference() table.project_id = "bigquery-public-data" - table.dataset_id = "utility_us" - table.table_id = "country_code_iso" + table.dataset_id = "new_york_trees" + table.table_id = "tree_species" + + # Specify read options to select columns to read. If no read options are + # specified, the whole table is read. 
+ read_options = bigquery_storage_v1beta1.types.TableReadOptions() + read_options.selected_fields.append("species_common_name") + read_options.selected_fields.append("fall_color") parent = "projects/{}".format(project_id) - session = bqstorageclient.create_read_session(table, parent) + session = bqstorageclient.create_read_session( + table, + parent, + read_options=read_options + ) # Don't try to read from an empty table. if len(session.streams) == 0: @@ -99,4 +150,7 @@ def main(dataset_id="query_results"): if __name__ == "__main__": - main() + parser = argparse.ArgumentParser() + parser.add_argument('dataset_id') + args = parser.parse_args() + main(args.dataset_id) diff --git a/bigquery_storage/to_dataframe/main_test.py b/bigquery_storage/to_dataframe/main_test.py new file mode 100644 index 00000000000..9a50a969bf2 --- /dev/null +++ b/bigquery_storage/to_dataframe/main_test.py @@ -0,0 +1,23 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import main + + +def test_main(capsys): + main.main() + out, _ = capsys.readouterr() + assert 'country_name' in out + assert 'stackoverflow' in out + assert 'species_common_name' in out diff --git a/bigquery_storage/pandas/requirements.txt b/bigquery_storage/to_dataframe/requirements.txt similarity index 100% rename from bigquery_storage/pandas/requirements.txt rename to bigquery_storage/to_dataframe/requirements.txt From 6ded541794827bbaf968df0e638ea99ffb05d84d Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 5 Feb 2019 12:14:32 -0800 Subject: [PATCH 03/10] Add fixture to create temporary dataset. Blacken sample. --- bigquery_storage/to_dataframe/main.py | 16 ++++++-------- bigquery_storage/to_dataframe/main_test.py | 25 +++++++++++++++++----- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/bigquery_storage/to_dataframe/main.py b/bigquery_storage/to_dataframe/main.py index 915fab24205..099bc91ee03 100644 --- a/bigquery_storage/to_dataframe/main.py +++ b/bigquery_storage/to_dataframe/main.py @@ -36,13 +36,14 @@ def create_clients(): # credentials for both the BigQuery and BigQuery Storage clients, avoiding # unnecessary API calls to fetch duplicate authentication tokens. credentials, project_id = google.auth.default( - scopes=['https://www.googleapis.com/auth/cloud-platform'] + scopes=["https://www.googleapis.com/auth/cloud-platform"] ) # Make clients. 
bqclient = bigquery.Client(credentials=credentials, project=project_id) bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient( - credentials=credentials) + credentials=credentials + ) # [END bigquerystorage_pandas_create_client] return bqclient, bqstorageclient, project_id @@ -60,7 +61,7 @@ def table_to_dataframe(bqclient, bqstorageclient): selected_fields=[ bigquery.SchemaField("country_name", "STRING"), bigquery.SchemaField("fips_code", "STRING"), - ] + ], ) dataframe = rows.to_dataframe(bqstorageclient) print(dataframe.head()) @@ -97,8 +98,7 @@ def query_to_dataframe(bqclient, bqstorageclient, dataset_id): ORDER BY view_count DESC """ query_config = bigquery.QueryJobConfig( - destination=table, - write_disposition="WRITE_TRUNCATE" + destination=table, write_disposition="WRITE_TRUNCATE" ) dataframe = ( @@ -127,9 +127,7 @@ def session_to_dataframe(bqstorageclient, project_id): parent = "projects/{}".format(project_id) session = bqstorageclient.create_read_session( - table, - parent, - read_options=read_options + table, parent, read_options=read_options ) # Don't try to read from an empty table. @@ -151,6 +149,6 @@ def session_to_dataframe(bqstorageclient, project_id): if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('dataset_id') + parser.add_argument("dataset_id") args = parser.parse_args() main(args.dataset_id) diff --git a/bigquery_storage/to_dataframe/main_test.py b/bigquery_storage/to_dataframe/main_test.py index 9a50a969bf2..22c811cc4e9 100644 --- a/bigquery_storage/to_dataframe/main_test.py +++ b/bigquery_storage/to_dataframe/main_test.py @@ -12,12 +12,27 @@ # See the License for the specific language governing permissions and # limitations under the License. +import uuid + +from google.cloud import bigquery +import pytest + from . import main -def test_main(capsys): - main.main() +@pytest.fixture +def temporary_dataset(): + client = bigquery.Client() + dataset_id = "bqstorage_to_dataset_{}".format(uuid.uuid4().hex) + dataset_ref = client.dataset(dataset_id) + client.create_dataset(dataset_ref) + yield dataset_id + client.delete_dataset(dataset_id, delete_contents=True) + + +def test_main(capsys, temporary_dataset): + main.main(dataset_id=temporary_dataset) out, _ = capsys.readouterr() - assert 'country_name' in out - assert 'stackoverflow' in out - assert 'species_common_name' in out + assert "country_name" in out + assert "stackoverflow" in out + assert "species_common_name" in out From 3ccc0021237fae8df4d969be1df16b6690fe8f13 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 5 Feb 2019 12:55:48 -0800 Subject: [PATCH 04/10] Remove unnecessary CLI logic. * Move imports inside region tags. * Adjust query indentation to match region tags. --- bigquery_storage/to_dataframe/main.py | 35 ++++++++++----------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/bigquery_storage/to_dataframe/main.py b/bigquery_storage/to_dataframe/main.py index 099bc91ee03..146ac3e0e49 100644 --- a/bigquery_storage/to_dataframe/main.py +++ b/bigquery_storage/to_dataframe/main.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import argparse - def main(dataset_id="query_results"): print("Creating clients.") @@ -49,9 +47,9 @@ def create_clients(): def table_to_dataframe(bqclient, bqstorageclient): + # [START bigquerystorage_pandas_read_table] from google.cloud import bigquery - # [START bigquerystorage_pandas_read_table] # Download a table. 
table = bigquery.TableReference.from_string( "bigquery-public-data.utility_us.country_code_iso" @@ -69,11 +67,11 @@ def table_to_dataframe(bqclient, bqstorageclient): def query_to_dataframe(bqclient, bqstorageclient, dataset_id): - from google.cloud import bigquery - # [START bigquerystorage_pandas_read_query_results] import uuid + from google.cloud import bigquery + # Due to a known issue in the BigQuery Storage API (TODO: link to # public issue), small query result sets cannot be downloaded. To # workaround this issue, write results to a destination table. @@ -88,15 +86,15 @@ def query_to_dataframe(bqclient, bqstorageclient, dataset_id): # Download query results. query_string = """ -SELECT - CONCAT( - 'https://stackoverflow.com/questions/', - CAST(id as STRING)) as url, - view_count -FROM `bigquery-public-data.stackoverflow.posts_questions` -WHERE tags like '%google-bigquery%' -ORDER BY view_count DESC -""" + SELECT + CONCAT( + 'https://stackoverflow.com/questions/', + CAST(id as STRING)) as url, + view_count + FROM `bigquery-public-data.stackoverflow.posts_questions` + WHERE tags like '%google-bigquery%' + ORDER BY view_count DESC + """ query_config = bigquery.QueryJobConfig( destination=table, write_disposition="WRITE_TRUNCATE" ) @@ -111,9 +109,9 @@ def query_to_dataframe(bqclient, bqstorageclient, dataset_id): def session_to_dataframe(bqstorageclient, project_id): + # [START bigquerystorage_pandas_read_session] from google.cloud import bigquery_storage_v1beta1 - # [START bigquerystorage_pandas_read_session] table = bigquery_storage_v1beta1.types.TableReference() table.project_id = "bigquery-public-data" table.dataset_id = "new_york_trees" @@ -145,10 +143,3 @@ def session_to_dataframe(bqstorageclient, project_id): dataframe = reader.to_dataframe(session) print(dataframe.head()) # [END bigquerystorage_pandas_read_session] - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("dataset_id") - args = parser.parse_args() - main(args.dataset_id) From 2274a873a4c132ad15555174d79445621ce3c64b Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 5 Feb 2019 15:06:24 -0800 Subject: [PATCH 05/10] Move sample code to test file. --- bigquery_storage/to_dataframe/main.py | 145 ------------------- bigquery_storage/to_dataframe/main_test.py | 158 +++++++++++++++++++-- 2 files changed, 149 insertions(+), 154 deletions(-) delete mode 100644 bigquery_storage/to_dataframe/main.py diff --git a/bigquery_storage/to_dataframe/main.py b/bigquery_storage/to_dataframe/main.py deleted file mode 100644 index 146ac3e0e49..00000000000 --- a/bigquery_storage/to_dataframe/main.py +++ /dev/null @@ -1,145 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -def main(dataset_id="query_results"): - print("Creating clients.") - bqclient, bqstorageclient, project_id = create_clients() - print("\n\nReading a table:") - table_to_dataframe(bqclient, bqstorageclient) - print("\n\nReading query results:") - query_to_dataframe(bqclient, bqstorageclient, dataset_id) - print("\n\nReading a table, using the BQ Storage API directly:") - session_to_dataframe(bqstorageclient, project_id) - - -def create_clients(): - # [START bigquerystorage_pandas_create_client] - import google.auth - from google.cloud import bigquery - from google.cloud import bigquery_storage_v1beta1 - - # Explicitly create a credentials object. This allows you to use the same - # credentials for both the BigQuery and BigQuery Storage clients, avoiding - # unnecessary API calls to fetch duplicate authentication tokens. - credentials, project_id = google.auth.default( - scopes=["https://www.googleapis.com/auth/cloud-platform"] - ) - - # Make clients. - bqclient = bigquery.Client(credentials=credentials, project=project_id) - bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient( - credentials=credentials - ) - # [END bigquerystorage_pandas_create_client] - return bqclient, bqstorageclient, project_id - - -def table_to_dataframe(bqclient, bqstorageclient): - # [START bigquerystorage_pandas_read_table] - from google.cloud import bigquery - - # Download a table. - table = bigquery.TableReference.from_string( - "bigquery-public-data.utility_us.country_code_iso" - ) - rows = bqclient.list_rows( - table, - selected_fields=[ - bigquery.SchemaField("country_name", "STRING"), - bigquery.SchemaField("fips_code", "STRING"), - ], - ) - dataframe = rows.to_dataframe(bqstorageclient) - print(dataframe.head()) - # [END bigquerystorage_pandas_read_table] - - -def query_to_dataframe(bqclient, bqstorageclient, dataset_id): - # [START bigquerystorage_pandas_read_query_results] - import uuid - - from google.cloud import bigquery - - # Due to a known issue in the BigQuery Storage API (TODO: link to - # public issue), small query result sets cannot be downloaded. To - # workaround this issue, write results to a destination table. - - # TODO: Set dataset_id to a dataset that will store temporary query - # results. Set the default table expiration time to ensure data is - # deleted after the results have been downloaded. - # dataset_id = "temporary_dataset_for_query_results" - dataset = bqclient.dataset(dataset_id) - table_id = "queryresults_" + uuid.uuid4().hex - table = dataset.table(table_id) - - # Download query results. - query_string = """ - SELECT - CONCAT( - 'https://stackoverflow.com/questions/', - CAST(id as STRING)) as url, - view_count - FROM `bigquery-public-data.stackoverflow.posts_questions` - WHERE tags like '%google-bigquery%' - ORDER BY view_count DESC - """ - query_config = bigquery.QueryJobConfig( - destination=table, write_disposition="WRITE_TRUNCATE" - ) - - dataframe = ( - bqclient.query(query_string, job_config=query_config) - .result() - .to_dataframe(bqstorageclient) - ) - print(dataframe.head()) - # [END bigquerystorage_pandas_read_query_results] - - -def session_to_dataframe(bqstorageclient, project_id): - # [START bigquerystorage_pandas_read_session] - from google.cloud import bigquery_storage_v1beta1 - - table = bigquery_storage_v1beta1.types.TableReference() - table.project_id = "bigquery-public-data" - table.dataset_id = "new_york_trees" - table.table_id = "tree_species" - - # Specify read options to select columns to read. 
If no read options are - # specified, the whole table is read. - read_options = bigquery_storage_v1beta1.types.TableReadOptions() - read_options.selected_fields.append("species_common_name") - read_options.selected_fields.append("fall_color") - - parent = "projects/{}".format(project_id) - session = bqstorageclient.create_read_session( - table, parent, read_options=read_options - ) - - # Don't try to read from an empty table. - if len(session.streams) == 0: - return - - # This example reads from only a single stream. Read from multiple streams - # to fetch data faster. - stream = session.streams[0] - position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) - reader = bqstorageclient.read_rows(position) - - # Parse all Avro blocks and create a dataframe. This call requires a - # session, because the session contains the schema for the row blocks. - dataframe = reader.to_dataframe(session) - print(dataframe.head()) - # [END bigquerystorage_pandas_read_session] diff --git a/bigquery_storage/to_dataframe/main_test.py b/bigquery_storage/to_dataframe/main_test.py index 22c811cc4e9..b43ac046da2 100644 --- a/bigquery_storage/to_dataframe/main_test.py +++ b/bigquery_storage/to_dataframe/main_test.py @@ -14,25 +14,165 @@ import uuid -from google.cloud import bigquery import pytest -from . import main + +@pytest.fixture +def clients(): + # [START bigquerystorage_pandas_tutorial_create_client] + import google.auth + from google.cloud import bigquery + from google.cloud import bigquery_storage_v1beta1 + + # Explicitly create a credentials object. This allows you to use the same + # credentials for both the BigQuery and BigQuery Storage clients, avoiding + # unnecessary API calls to fetch duplicate authentication tokens. + credentials, project_id = google.auth.default( + scopes=["https://www.googleapis.com/auth/cloud-platform"] + ) + + # Make clients. + bqclient = bigquery.Client(credentials=credentials, project=project_id) + bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient( + credentials=credentials + ) + # [END bigquerystorage_pandas_tutorial_create_client] + return bqclient, bqstorageclient, project_id @pytest.fixture -def temporary_dataset(): - client = bigquery.Client() +def temporary_dataset(clients): + bqclient, _, _ = clients dataset_id = "bqstorage_to_dataset_{}".format(uuid.uuid4().hex) - dataset_ref = client.dataset(dataset_id) - client.create_dataset(dataset_ref) + + # [START bigquerystorage_pandas_tutorial_create_dataset] + from google.cloud import bigquery + + # TODO: Set the dataset_id to the dataset used to store temporary results. + # dataset_id = "query_results_dataset" + + dataset_ref = bqclient.dataset(dataset_id) + dataset = bigquery.Dataset(dataset_ref) + + # Remove tables after 24 hours. + dataset.default_table_expiration_ms = 1000 * 60 * 60 * 24 + + bqclient.create_dataset(dataset) # API request. + # [END bigquerystorage_pandas_tutorial_create_dataset] yield dataset_id - client.delete_dataset(dataset_id, delete_contents=True) + bqclient.delete_dataset(dataset_ref, delete_contents=True) -def test_main(capsys, temporary_dataset): - main.main(dataset_id=temporary_dataset) +def test_table_to_dataframe(capsys, clients): + bqclient, bqstorageclient, _ = clients + + # [START bigquerystorage_pandas_tutorial_read_table] + from google.cloud import bigquery + + # Download a table. 
+ table = bigquery.TableReference.from_string( + "bigquery-public-data.utility_us.country_code_iso" + ) + rows = bqclient.list_rows( + table, + selected_fields=[ + bigquery.SchemaField("country_name", "STRING"), + bigquery.SchemaField("fips_code", "STRING"), + ], + ) + dataframe = rows.to_dataframe(bqstorageclient) + print(dataframe.head()) + # [END bigquerystorage_pandas_tutorial_read_table] + out, _ = capsys.readouterr() assert "country_name" in out + + +def test_query_to_dataframe(capsys, clients, temporary_dataset): + bqclient, bqstorageclient, _ = clients + dataset_id = temporary_dataset + + # [START bigquerystorage_pandas_tutorial_read_query_results] + import uuid + + from google.cloud import bigquery + + # Due to a known issue in the BigQuery Storage API (TODO: link to + # public issue), small query result sets cannot be downloaded. To + # workaround this issue, write results to a destination table. + + # TODO: Set dataset_id to a dataset that will store temporary query + # results. Set the default table expiration time to ensure data is + # deleted after the results have been downloaded. + # dataset_id = "temporary_dataset_for_query_results" + dataset = bqclient.dataset(dataset_id) + table_id = "queryresults_" + uuid.uuid4().hex + table = dataset.table(table_id) + + # Download query results. + query_string = """ + SELECT + CONCAT( + 'https://stackoverflow.com/questions/', + CAST(id as STRING)) as url, + view_count + FROM `bigquery-public-data.stackoverflow.posts_questions` + WHERE tags like '%google-bigquery%' + ORDER BY view_count DESC + """ + query_config = bigquery.QueryJobConfig( + destination=table, write_disposition="WRITE_TRUNCATE" + ) + + dataframe = ( + bqclient.query(query_string, job_config=query_config) + .result() + .to_dataframe(bqstorageclient) + ) + print(dataframe.head()) + # [END bigquerystorage_pandas_tutorial_read_query_results] + + out, _ = capsys.readouterr() assert "stackoverflow" in out + + +def test_session_to_dataframe(capsys, clients): + bqclient, bqstorageclient, project_id = clients + + # [START bigquerystorage_pandas_tutorial_read_session] + from google.cloud import bigquery_storage_v1beta1 + + table = bigquery_storage_v1beta1.types.TableReference() + table.project_id = "bigquery-public-data" + table.dataset_id = "new_york_trees" + table.table_id = "tree_species" + + # Select columns to read with read options. If no read options are + # specified, the whole table is read. + read_options = bigquery_storage_v1beta1.types.TableReadOptions() + read_options.selected_fields.append("species_common_name") + read_options.selected_fields.append("fall_color") + + parent = "projects/{}".format(project_id) + session = bqstorageclient.create_read_session( + table, parent, read_options=read_options + ) + + # Don't try to read from an empty table. + if len(session.streams) == 0: + return + + # This example reads from only a single stream. Read from multiple streams + # to fetch data faster. + stream = session.streams[0] + position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) + reader = bqstorageclient.read_rows(position) + + # Parse all Avro blocks and create a dataframe. This call requires a + # session, because the session contains the schema for the row blocks. 
+ dataframe = reader.to_dataframe(session) + print(dataframe.head()) + # [END bigquerystorage_pandas_tutorial_read_session] + + out, _ = capsys.readouterr() assert "species_common_name" in out From 1be1a6ecefa75be505c72d73444100ac57a6cf7c Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 6 Feb 2019 11:10:53 -0800 Subject: [PATCH 06/10] Rename project_id to your_project_id Move duplicate imports out of region tags. Add region tag for the whole sample. --- bigquery_storage/to_dataframe/main_test.py | 80 ++++++++++++---------- 1 file changed, 45 insertions(+), 35 deletions(-) diff --git a/bigquery_storage/to_dataframe/main_test.py b/bigquery_storage/to_dataframe/main_test.py index b43ac046da2..11464591f5f 100644 --- a/bigquery_storage/to_dataframe/main_test.py +++ b/bigquery_storage/to_dataframe/main_test.py @@ -19,6 +19,7 @@ @pytest.fixture def clients(): + # [START bigquerystorage_pandas_tutorial_all] # [START bigquerystorage_pandas_tutorial_create_client] import google.auth from google.cloud import bigquery @@ -27,30 +28,37 @@ def clients(): # Explicitly create a credentials object. This allows you to use the same # credentials for both the BigQuery and BigQuery Storage clients, avoiding # unnecessary API calls to fetch duplicate authentication tokens. - credentials, project_id = google.auth.default( + credentials, your_project_id = google.auth.default( scopes=["https://www.googleapis.com/auth/cloud-platform"] ) # Make clients. - bqclient = bigquery.Client(credentials=credentials, project=project_id) + bqclient = bigquery.Client(credentials=credentials, project=your_project_id) bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient( credentials=credentials ) # [END bigquerystorage_pandas_tutorial_create_client] - return bqclient, bqstorageclient, project_id + # [END bigquerystorage_pandas_tutorial_all] + return bqclient, bqstorageclient @pytest.fixture def temporary_dataset(clients): - bqclient, _, _ = clients - dataset_id = "bqstorage_to_dataset_{}".format(uuid.uuid4().hex) + from google.cloud import bigquery + + bqclient, _ = clients + # [START bigquerystorage_pandas_tutorial_all] # [START bigquerystorage_pandas_tutorial_create_dataset] - from google.cloud import bigquery + # Set the dataset_id to the dataset used to store temporary results. + dataset_id = "query_results_dataset" + # [END bigquerystorage_pandas_tutorial_create_dataset] + # [END bigquerystorage_pandas_tutorial_all] - # TODO: Set the dataset_id to the dataset used to store temporary results. - # dataset_id = "query_results_dataset" + dataset_id = "bqstorage_to_dataset_{}".format(uuid.uuid4().hex) + # [START bigquerystorage_pandas_tutorial_all] + # [START bigquerystorage_pandas_tutorial_create_dataset] dataset_ref = bqclient.dataset(dataset_id) dataset = bigquery.Dataset(dataset_ref) @@ -59,16 +67,18 @@ def temporary_dataset(clients): bqclient.create_dataset(dataset) # API request. # [END bigquerystorage_pandas_tutorial_create_dataset] - yield dataset_id + # [END bigquerystorage_pandas_tutorial_all] + yield dataset_ref bqclient.delete_dataset(dataset_ref, delete_contents=True) def test_table_to_dataframe(capsys, clients): - bqclient, bqstorageclient, _ = clients - - # [START bigquerystorage_pandas_tutorial_read_table] from google.cloud import bigquery + bqclient, bqstorageclient = clients + + # [START bigquerystorage_pandas_tutorial_all] + # [START bigquerystorage_pandas_tutorial_read_table] # Download a table. 
table = bigquery.TableReference.from_string( "bigquery-public-data.utility_us.country_code_iso" @@ -80,34 +90,24 @@ def test_table_to_dataframe(capsys, clients): bigquery.SchemaField("fips_code", "STRING"), ], ) - dataframe = rows.to_dataframe(bqstorageclient) + dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient) print(dataframe.head()) # [END bigquerystorage_pandas_tutorial_read_table] + # [END bigquerystorage_pandas_tutorial_all] out, _ = capsys.readouterr() assert "country_name" in out def test_query_to_dataframe(capsys, clients, temporary_dataset): - bqclient, bqstorageclient, _ = clients - dataset_id = temporary_dataset - - # [START bigquerystorage_pandas_tutorial_read_query_results] - import uuid - from google.cloud import bigquery - # Due to a known issue in the BigQuery Storage API (TODO: link to - # public issue), small query result sets cannot be downloaded. To - # workaround this issue, write results to a destination table. + bqclient, bqstorageclient = clients + dataset_ref = temporary_dataset - # TODO: Set dataset_id to a dataset that will store temporary query - # results. Set the default table expiration time to ensure data is - # deleted after the results have been downloaded. - # dataset_id = "temporary_dataset_for_query_results" - dataset = bqclient.dataset(dataset_id) - table_id = "queryresults_" + uuid.uuid4().hex - table = dataset.table(table_id) + # [START bigquerystorage_pandas_tutorial_all] + # [START bigquerystorage_pandas_tutorial_read_query_results] + import uuid # Download query results. query_string = """ @@ -120,28 +120,37 @@ def test_query_to_dataframe(capsys, clients, temporary_dataset): WHERE tags like '%google-bigquery%' ORDER BY view_count DESC """ + # Use a random table name to avoid overwriting existing tables. + table_id = "queryresults_" + uuid.uuid4().hex + table = dataset_ref.table(table_id) query_config = bigquery.QueryJobConfig( - destination=table, write_disposition="WRITE_TRUNCATE" + # Due to a known issue in the BigQuery Storage API, small query result + # sets cannot be downloaded. To workaround this issue, write results to + # a destination table. 
+ destination=table ) dataframe = ( bqclient.query(query_string, job_config=query_config) .result() - .to_dataframe(bqstorageclient) + .to_dataframe(bqstorage_client=bqstorageclient) ) print(dataframe.head()) # [END bigquerystorage_pandas_tutorial_read_query_results] + # [END bigquerystorage_pandas_tutorial_all] out, _ = capsys.readouterr() assert "stackoverflow" in out def test_session_to_dataframe(capsys, clients): - bqclient, bqstorageclient, project_id = clients - - # [START bigquerystorage_pandas_tutorial_read_session] from google.cloud import bigquery_storage_v1beta1 + bqclient, bqstorageclient = clients + your_project_id = bqclient.project + + # [START bigquerystorage_pandas_tutorial_all] + # [START bigquerystorage_pandas_tutorial_read_session] table = bigquery_storage_v1beta1.types.TableReference() table.project_id = "bigquery-public-data" table.dataset_id = "new_york_trees" @@ -153,7 +162,7 @@ def test_session_to_dataframe(capsys, clients): read_options.selected_fields.append("species_common_name") read_options.selected_fields.append("fall_color") - parent = "projects/{}".format(project_id) + parent = "projects/{}".format(your_project_id) session = bqstorageclient.create_read_session( table, parent, read_options=read_options ) @@ -173,6 +182,7 @@ def test_session_to_dataframe(capsys, clients): dataframe = reader.to_dataframe(session) print(dataframe.head()) # [END bigquerystorage_pandas_tutorial_read_session] + # [END bigquerystorage_pandas_tutorial_all] out, _ = capsys.readouterr() assert "species_common_name" in out From 0dbcca67b7b7574c450e26e0bd04c1de89f42385 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 6 Feb 2019 11:25:36 -0800 Subject: [PATCH 07/10] Remove check for no streams. Let index accesssor fail, instead. --- bigquery_storage/to_dataframe/main_test.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/bigquery_storage/to_dataframe/main_test.py b/bigquery_storage/to_dataframe/main_test.py index 11464591f5f..4e8791bd191 100644 --- a/bigquery_storage/to_dataframe/main_test.py +++ b/bigquery_storage/to_dataframe/main_test.py @@ -167,12 +167,9 @@ def test_session_to_dataframe(capsys, clients): table, parent, read_options=read_options ) - # Don't try to read from an empty table. - if len(session.streams) == 0: - return - # This example reads from only a single stream. Read from multiple streams - # to fetch data faster. + # to fetch data faster. Note that the session may not contain any streams + # if there are no rows to read. stream = session.streams[0] position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream) reader = bqstorageclient.read_rows(position) From 398a2de1034b49a68f1a49483ac76510c269a34f Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 6 Feb 2019 11:29:12 -0800 Subject: [PATCH 08/10] Move temporary dataset creation... to just above the sample where it is used. 
This makes the complete source code for the sample make more sense (bigquerystorage_pandas_tutorial_all) --- bigquery_storage/to_dataframe/main_test.py | 54 +++++++++++----------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/bigquery_storage/to_dataframe/main_test.py b/bigquery_storage/to_dataframe/main_test.py index 4e8791bd191..351715ec4b1 100644 --- a/bigquery_storage/to_dataframe/main_test.py +++ b/bigquery_storage/to_dataframe/main_test.py @@ -42,6 +42,33 @@ def clients(): return bqclient, bqstorageclient +def test_table_to_dataframe(capsys, clients): + from google.cloud import bigquery + + bqclient, bqstorageclient = clients + + # [START bigquerystorage_pandas_tutorial_all] + # [START bigquerystorage_pandas_tutorial_read_table] + # Download a table. + table = bigquery.TableReference.from_string( + "bigquery-public-data.utility_us.country_code_iso" + ) + rows = bqclient.list_rows( + table, + selected_fields=[ + bigquery.SchemaField("country_name", "STRING"), + bigquery.SchemaField("fips_code", "STRING"), + ], + ) + dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient) + print(dataframe.head()) + # [END bigquerystorage_pandas_tutorial_read_table] + # [END bigquerystorage_pandas_tutorial_all] + + out, _ = capsys.readouterr() + assert "country_name" in out + + @pytest.fixture def temporary_dataset(clients): from google.cloud import bigquery @@ -72,33 +99,6 @@ def temporary_dataset(clients): bqclient.delete_dataset(dataset_ref, delete_contents=True) -def test_table_to_dataframe(capsys, clients): - from google.cloud import bigquery - - bqclient, bqstorageclient = clients - - # [START bigquerystorage_pandas_tutorial_all] - # [START bigquerystorage_pandas_tutorial_read_table] - # Download a table. - table = bigquery.TableReference.from_string( - "bigquery-public-data.utility_us.country_code_iso" - ) - rows = bqclient.list_rows( - table, - selected_fields=[ - bigquery.SchemaField("country_name", "STRING"), - bigquery.SchemaField("fips_code", "STRING"), - ], - ) - dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient) - print(dataframe.head()) - # [END bigquerystorage_pandas_tutorial_read_table] - # [END bigquerystorage_pandas_tutorial_all] - - out, _ = capsys.readouterr() - assert "country_name" in out - - def test_query_to_dataframe(capsys, clients, temporary_dataset): from google.cloud import bigquery From a4b674022a80a72b3930423c9ad75d3a18d10eaf Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Wed, 6 Feb 2019 13:26:38 -0800 Subject: [PATCH 09/10] Trim line length. --- bigquery_storage/to_dataframe/main_test.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bigquery_storage/to_dataframe/main_test.py b/bigquery_storage/to_dataframe/main_test.py index 351715ec4b1..18df909eeaa 100644 --- a/bigquery_storage/to_dataframe/main_test.py +++ b/bigquery_storage/to_dataframe/main_test.py @@ -33,7 +33,10 @@ def clients(): ) # Make clients. - bqclient = bigquery.Client(credentials=credentials, project=your_project_id) + bqclient = bigquery.Client( + credentials=credentials, + project=your_project_id + ) bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient( credentials=credentials ) From 925fe3ba1ccafe6a2acc709b84ceb22fcf9d2691 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 7 Feb 2019 10:46:38 -0800 Subject: [PATCH 10/10] Add region tags for use in cleanup section. 
--- bigquery_storage/to_dataframe/main_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bigquery_storage/to_dataframe/main_test.py b/bigquery_storage/to_dataframe/main_test.py index 18df909eeaa..053bd778918 100644 --- a/bigquery_storage/to_dataframe/main_test.py +++ b/bigquery_storage/to_dataframe/main_test.py @@ -99,7 +99,9 @@ def temporary_dataset(clients): # [END bigquerystorage_pandas_tutorial_create_dataset] # [END bigquerystorage_pandas_tutorial_all] yield dataset_ref + # [START bigquerystorage_pandas_tutorial_cleanup] bqclient.delete_dataset(dataset_ref, delete_contents=True) + # [END bigquerystorage_pandas_tutorial_cleanup] def test_query_to_dataframe(capsys, clients, temporary_dataset):
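
Note on parallel reads: the final sample (and its test) reads from only a single stream, while the PR description mentions reading in parallel. As a rough illustration of what that fan-out could look like with the same bigquery_storage_v1beta1 surface used in these patches — read_one_stream, read_session_parallel, and max_workers are illustrative names, not part of the sample, and bqstorageclient / session are assumed to be created exactly as in test_session_to_dataframe above:

    import concurrent.futures

    import pandas
    from google.cloud import bigquery_storage_v1beta1


    def read_one_stream(bqstorageclient, session, stream):
        # Each stream is read from its own position, starting at offset 0.
        position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
        reader = bqstorageclient.read_rows(position)
        # The session is required because it carries the Avro schema used to
        # decode the row blocks.
        return reader.to_dataframe(session)


    def read_session_parallel(bqstorageclient, session, max_workers=8):
        # Fan out over every stream in the session with a thread pool, then
        # stitch the per-stream frames back together. A session for an empty
        # table has no streams, so fall back to an empty DataFrame.
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
            frames = list(
                pool.map(
                    lambda stream: read_one_stream(bqstorageclient, session, stream),
                    session.streams,
                )
            )
        if not frames:
            return pandas.DataFrame()
        return pandas.concat(frames, ignore_index=True)

Threads are a reasonable default for this sketch because most of the time is spent waiting on network I/O while rows are streamed back.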