From ee74fdd8793b6bc2da69a530d94be6a26f6006a1 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 18 Aug 2017 16:38:57 -0700 Subject: [PATCH 1/5] BigQuery: reproduce error fetching multiple results with DB-API. Add a system test to call `fetchall()` when multiple rows are expected. --- bigquery/tests/system.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index fab7d4b175bd..b92a480e45d9 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -747,6 +747,16 @@ def test_dbapi_w_standard_sql_types(self): row = Config.CURSOR.fetchone() self.assertIsNone(row) + def test_dbapi_fetchall(self): + query = 'SELECT * FROM UNNEST([(1, 2), (3, 4), (5, 6)])' + + for arraysize in range(1, 5): + Config.CURSOR.execute(query) + self.assertEqual(Config.CURSOR.rowcount, 3, "expected 3 rows") + Config.CURSOR.arraysize = arraysize + rows = Config.CURSOR.fetchall() + self.assertEqual(rows, [(1, 2), (3, 4), (5, 6)]) + def _load_table_for_dml(self, rows, dataset_name, table_name): from google.cloud._testing import _NamedTemporaryFile From e2278318fa70b824e224fdd8310dae8665abb3ce Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 18 Aug 2017 17:12:41 -0700 Subject: [PATCH 2/5] BigQuery: system test to reproduce error of only fetching first page. This error applies to all BigQuery iterators, not just DB-API. 
--- bigquery/tests/system.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index b92a480e45d9..701da91659db 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -1094,7 +1094,7 @@ def test_large_query_w_public_data(self): query.use_legacy_sql = False query.run() - iterator = query.fetch_data() + iterator = query.fetch_data(max_results=100) rows = list(iterator) self.assertEqual(len(rows), LIMIT) From f67eae2fed301f655df987fecf146c0ac9357b92 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 18 Aug 2017 17:17:58 -0700 Subject: [PATCH 3/5] BigQuery: allow arraysize to be set after execute() It was allowed before, but it didn't result in the correct behavior. --- .../google/cloud/bigquery/dbapi/cursor.py | 41 +++++++++++-------- bigquery/tests/unit/test_dbapi_cursor.py | 2 +- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/bigquery/google/cloud/bigquery/dbapi/cursor.py b/bigquery/google/cloud/bigquery/dbapi/cursor.py index a5f04e15c674..1371a73685a7 100644 --- a/bigquery/google/cloud/bigquery/dbapi/cursor.py +++ b/bigquery/google/cloud/bigquery/dbapi/cursor.py @@ -52,8 +52,7 @@ def __init__(self, connection): # a single row at a time. self.arraysize = 1 self._query_data = None - self._page_token = None - self._has_fetched_all_rows = True + self._query_results = None def close(self): """No-op.""" @@ -133,9 +132,8 @@ def execute(self, operation, parameters=None, job_id=None): :param job_id: (Optional) The job_id to use. If not set, a job ID is generated at random. 
""" + self._query_data = None self._query_results = None - self._page_token = None - self._has_fetched_all_rows = False client = self.connection._client if job_id is None: job_id = str(uuid.uuid4()) @@ -161,8 +159,7 @@ def execute(self, operation, parameters=None, job_id=None): raise exceptions.DatabaseError(query_job.errors) query_results = query_job.query_results() - self._query_data = iter( - query_results.fetch_data(max_results=self.arraysize)) + self._query_results = query_results self._set_rowcount(query_results) self._set_description(query_results.schema) @@ -178,6 +175,23 @@ def executemany(self, operation, seq_of_parameters): for parameters in seq_of_parameters: self.execute(operation, parameters) + def _try_fetch(self, size=None): + """Try to start fetching data, if not yet started. + + Mutates self to indicate that iteration has started. + """ + if self._query_results is None: + raise exceptions.InterfaceError( + 'No query results: execute() must be called before fetch.') + + if size is None: + size = self.arraysize + + if self._query_data is None: + self._query_data = iter( + self._query_results.fetch_data(max_results=size)) + + def fetchone(self): """Fetch a single row from the results of the last ``execute*()`` call. @@ -188,10 +202,7 @@ def fetchone(self): :raises: :class:`~google.cloud.bigquery.dbapi.InterfaceError` if called before ``execute()``. """ - if self._query_data is None: - raise exceptions.InterfaceError( - 'No query results: execute() must be called before fetch.') - + self._try_fetch() try: return six.next(self._query_data) except StopIteration: @@ -215,17 +226,17 @@ def fetchmany(self, size=None): :raises: :class:`~google.cloud.bigquery.dbapi.InterfaceError` if called before ``execute()``. 
""" - if self._query_data is None: - raise exceptions.InterfaceError( - 'No query results: execute() must be called before fetch.') if size is None: size = self.arraysize + self._try_fetch(size=size) rows = [] + for row in self._query_data: rows.append(row) if len(rows) >= size: break + return rows def fetchall(self): @@ -236,9 +247,7 @@ def fetchall(self): :raises: :class:`~google.cloud.bigquery.dbapi.InterfaceError` if called before ``execute()``. """ - if self._query_data is None: - raise exceptions.InterfaceError( - 'No query results: execute() must be called before fetch.') + self._try_fetch() return [row for row in self._query_data] def setinputsizes(self, sizes): diff --git a/bigquery/tests/unit/test_dbapi_cursor.py b/bigquery/tests/unit/test_dbapi_cursor.py index 7351db8f670b..be327a8962a2 100644 --- a/bigquery/tests/unit/test_dbapi_cursor.py +++ b/bigquery/tests/unit/test_dbapi_cursor.py @@ -141,8 +141,8 @@ def test_fetchmany_w_arraysize(self): (7, 8, 9), ])) cursor = connection.cursor() - cursor.arraysize = 2 cursor.execute('SELECT a, b, c;') + cursor.arraysize = 2 rows = cursor.fetchmany() self.assertEqual(len(rows), 2) self.assertEqual(rows[0], (1, 2, 3)) From 510837dcdd395ce6cc081d2f38566bd7f5d8af8e Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 18 Aug 2017 17:32:25 -0700 Subject: [PATCH 4/5] max_results in BigQuery API had a different meaning from HTTPIterator. In BigQuery it means the page size, but in HTTPIterator it meant "don't fetch any more pages once you have this many rows." 
--- bigquery/google/cloud/bigquery/query.py | 7 ++++--- bigquery/google/cloud/bigquery/table.py | 12 ++++++++---- core/google/api/core/page_iterator.py | 9 ++++++--- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/bigquery/google/cloud/bigquery/query.py b/bigquery/google/cloud/bigquery/query.py index c01017af0d30..185b68deb104 100644 --- a/bigquery/google/cloud/bigquery/query.py +++ b/bigquery/google/cloud/bigquery/query.py @@ -440,6 +440,9 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None, if timeout_ms is not None: params['timeoutMs'] = timeout_ms + if max_results is not None: + params['maxResults'] = max_results + path = '/projects/%s/queries/%s' % (self.project, self.name) iterator = page_iterator.HTTPIterator( client=client, @@ -448,12 +451,10 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None, item_to_value=_item_to_row, items_key='rows', page_token=page_token, - max_results=max_results, page_start=_rows_page_start_query, + next_token='pageToken', extra_params=params) iterator.query_result = self - # Over-ride the key used to retrieve the next page token. 
- iterator._NEXT_TOKEN = 'pageToken' return iterator diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index b26125ec9ef4..9d100c06c711 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -722,6 +722,11 @@ def fetch_data(self, max_results=None, page_token=None, client=None): if len(self._schema) == 0: raise ValueError(_TABLE_HAS_NO_SCHEMA) + params = {} + + if max_results is not None: + params['maxResults'] = max_results + client = self._require_client(client) path = '%s/data' % (self.path,) iterator = page_iterator.HTTPIterator( @@ -731,11 +736,10 @@ def fetch_data(self, max_results=None, page_token=None, client=None): item_to_value=_item_to_row, items_key='rows', page_token=page_token, - max_results=max_results, - page_start=_rows_page_start) + page_start=_rows_page_start, + next_token='pageToken', + extra_params=params) iterator.schema = self._schema - # Over-ride the key used to retrieve the next page token. - iterator._NEXT_TOKEN = 'pageToken' return iterator def row_from_mapping(self, mapping): diff --git a/core/google/api/core/page_iterator.py b/core/google/api/core/page_iterator.py index 23c469f9bc1d..3a38c100cd95 100644 --- a/core/google/api/core/page_iterator.py +++ b/core/google/api/core/page_iterator.py @@ -275,6 +275,8 @@ class HTTPIterator(Iterator): signature takes the :class:`Iterator` that started the page, the :class:`Page` that was started and the dictionary containing the page response. + next_token (str): The name of the field used in the response for page + tokens. .. 
autoattribute:: pages """ @@ -283,13 +285,13 @@ class HTTPIterator(Iterator): _PAGE_TOKEN = 'pageToken' _MAX_RESULTS = 'maxResults' _NEXT_TOKEN = 'nextPageToken' - _RESERVED_PARAMS = frozenset([_PAGE_TOKEN, _MAX_RESULTS]) + _RESERVED_PARAMS = frozenset([_PAGE_TOKEN]) _HTTP_METHOD = 'GET' def __init__(self, client, api_request, path, item_to_value, items_key=_DEFAULT_ITEMS_KEY, page_token=None, max_results=None, extra_params=None, - page_start=_do_nothing_page_start): + page_start=_do_nothing_page_start, next_token=_NEXT_TOKEN): super(HTTPIterator, self).__init__( client, item_to_value, page_token=page_token, max_results=max_results) @@ -298,6 +300,7 @@ def __init__(self, client, api_request, path, item_to_value, self._items_key = items_key self.extra_params = extra_params self._page_start = page_start + self._next_token = next_token # Verify inputs / provide defaults. if self.extra_params is None: self.extra_params = {} @@ -327,7 +330,7 @@ def _next_page(self): items = response.get(self._items_key, ()) page = Page(self, items, self._item_to_value) self._page_start(self, page, response) - self.next_page_token = response.get(self._NEXT_TOKEN) + self.next_page_token = response.get(self._next_token) return page else: return None From d248c96d5b8d2a6b32d97cb41155da70cba07da5 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Fri, 18 Aug 2017 17:37:43 -0700 Subject: [PATCH 5/5] Fix lint errors --- bigquery/google/cloud/bigquery/dbapi/cursor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/bigquery/google/cloud/bigquery/dbapi/cursor.py b/bigquery/google/cloud/bigquery/dbapi/cursor.py index 1371a73685a7..0c56d87231fe 100644 --- a/bigquery/google/cloud/bigquery/dbapi/cursor.py +++ b/bigquery/google/cloud/bigquery/dbapi/cursor.py @@ -191,7 +191,6 @@ def _try_fetch(self, size=None): self._query_data = iter( self._query_results.fetch_data(max_results=size)) - def fetchone(self): """Fetch a single row from the results of the last ``execute*()`` call.