Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ Features
client project. Specify the target table ID as ``project.dataset.table`` to
use this feature. (:issue:`321`, :issue:`347`)

Bug fixes
~~~~~~~~~

- Avoid 403 error from ``to_gbq`` when table has ``policyTags``. (:issue:`354`)

Dependencies
~~~~~~~~~~~~

Expand Down
95 changes: 95 additions & 0 deletions pandas_gbq/features.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""Module for checking dependency versions and supported features."""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled this out of gbq.py, because load.py also needs some of this logic now.


# https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md
BIGQUERY_MINIMUM_VERSION = "1.11.1"
BIGQUERY_CLIENT_INFO_VERSION = "1.12.0"
BIGQUERY_BQSTORAGE_VERSION = "1.24.0"
BIGQUERY_FROM_DATAFRAME_CSV_VERSION = "2.6.0"
PANDAS_VERBOSITY_DEPRECATION_VERSION = "0.23.0"


class Features:
    """Lazy checks for installed dependency versions and the features they
    enable.

    Parsed versions are computed on first access and cached on the
    instance; the boolean ``*_has_*`` properties compare the cached
    version against the module-level threshold constants above.
    """

    def __init__(self):
        # Lazily populated caches of pkg_resources-parsed versions.
        self._bigquery_installed_version = None
        self._pandas_installed_version = None

    @property
    def bigquery_installed_version(self):
        """Parsed version of the installed ``google-cloud-bigquery``.

        Raises
        ------
        ImportError
            If the installed google-cloud-bigquery is older than
            ``BIGQUERY_MINIMUM_VERSION``. The unsupported version is
            *not* cached, so every access fails consistently rather
            than only the first one.
        """
        import google.cloud.bigquery
        import pkg_resources

        if self._bigquery_installed_version is not None:
            return self._bigquery_installed_version

        installed_version = pkg_resources.parse_version(
            google.cloud.bigquery.__version__
        )
        bigquery_minimum_version = pkg_resources.parse_version(
            BIGQUERY_MINIMUM_VERSION
        )

        if installed_version < bigquery_minimum_version:
            # Validate before caching: caching first would make a second
            # access silently return the unsupported version instead of
            # raising again.
            raise ImportError(
                "pandas-gbq requires google-cloud-bigquery >= {0}, "
                "current version {1}".format(
                    bigquery_minimum_version, installed_version
                )
            )

        self._bigquery_installed_version = installed_version
        return self._bigquery_installed_version

    @property
    def bigquery_has_client_info(self):
        """True if google-cloud-bigquery supports ``client_info``."""
        import pkg_resources

        bigquery_client_info_version = pkg_resources.parse_version(
            BIGQUERY_CLIENT_INFO_VERSION
        )
        return self.bigquery_installed_version >= bigquery_client_info_version

    @property
    def bigquery_has_bqstorage(self):
        """True if google-cloud-bigquery supports the BigQuery Storage API."""
        import pkg_resources

        bigquery_bqstorage_version = pkg_resources.parse_version(
            BIGQUERY_BQSTORAGE_VERSION
        )
        return self.bigquery_installed_version >= bigquery_bqstorage_version

    @property
    def bigquery_has_from_dataframe_with_csv(self):
        """True if ``load_table_from_dataframe`` supports CSV serialization."""
        import pkg_resources

        bigquery_from_dataframe_version = pkg_resources.parse_version(
            BIGQUERY_FROM_DATAFRAME_CSV_VERSION
        )
        return (
            self.bigquery_installed_version >= bigquery_from_dataframe_version
        )

    @property
    def pandas_installed_version(self):
        """Parsed version of the installed ``pandas`` (cached)."""
        import pandas
        import pkg_resources

        if self._pandas_installed_version is not None:
            return self._pandas_installed_version

        self._pandas_installed_version = pkg_resources.parse_version(
            pandas.__version__
        )
        return self._pandas_installed_version

    @property
    def pandas_has_deprecated_verbose(self):
        """True if pandas is new enough that ``verbose`` is deprecated.

        Add check for Pandas version before showing deprecation warning.
        https://github.com/pydata/pandas-gbq/issues/157
        """
        import pkg_resources

        pandas_verbosity_deprecation = pkg_resources.parse_version(
            PANDAS_VERBOSITY_DEPRECATION_VERSION
        )
        return self.pandas_installed_version >= pandas_verbosity_deprecation


# Module-level singleton shared by gbq.py and load.py.
FEATURES = Features()
100 changes: 20 additions & 80 deletions pandas_gbq/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,100 +16,45 @@

from pandas_gbq.exceptions import AccessDenied
from pandas_gbq.exceptions import PerformanceWarning
from pandas_gbq import features
from pandas_gbq.features import FEATURES
import pandas_gbq.schema
import pandas_gbq.timestamp


logger = logging.getLogger(__name__)

BIGQUERY_INSTALLED_VERSION = None
BIGQUERY_CLIENT_INFO_VERSION = "1.12.0"
BIGQUERY_BQSTORAGE_VERSION = "1.24.0"
HAS_CLIENT_INFO = False
HAS_BQSTORAGE_SUPPORT = False

try:
import tqdm # noqa
except ImportError:
tqdm = None


def _check_google_client_version():
global BIGQUERY_INSTALLED_VERSION, HAS_CLIENT_INFO, HAS_BQSTORAGE_SUPPORT, SHOW_VERBOSE_DEPRECATION

try:
import pkg_resources

except ImportError:
raise ImportError("Could not import pkg_resources (setuptools).")

# https://github.com/googleapis/python-bigquery/blob/master/CHANGELOG.md
bigquery_minimum_version = pkg_resources.parse_version("1.11.0")
bigquery_client_info_version = pkg_resources.parse_version(
BIGQUERY_CLIENT_INFO_VERSION
)
bigquery_bqstorage_version = pkg_resources.parse_version(
BIGQUERY_BQSTORAGE_VERSION
)
BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution(
"google-cloud-bigquery"
).parsed_version

HAS_CLIENT_INFO = (
BIGQUERY_INSTALLED_VERSION >= bigquery_client_info_version
)
HAS_BQSTORAGE_SUPPORT = (
BIGQUERY_INSTALLED_VERSION >= bigquery_bqstorage_version
)

if BIGQUERY_INSTALLED_VERSION < bigquery_minimum_version:
raise ImportError(
"pandas-gbq requires google-cloud-bigquery >= {0}, "
"current version {1}".format(
bigquery_minimum_version, BIGQUERY_INSTALLED_VERSION
)
)

# Add check for Pandas version before showing deprecation warning.
# https://github.com/pydata/pandas-gbq/issues/157
pandas_installed_version = pkg_resources.get_distribution(
"pandas"
).parsed_version
pandas_version_wo_verbosity = pkg_resources.parse_version("0.23.0")
SHOW_VERBOSE_DEPRECATION = (
pandas_installed_version >= pandas_version_wo_verbosity
)


def _test_google_api_imports():
try:
import pkg_resources # noqa
except ImportError as ex:
raise ImportError("pandas-gbq requires setuptools") from ex

try:
import pydata_google_auth # noqa
except ImportError as ex:
raise ImportError(
"pandas-gbq requires pydata-google-auth: {0}".format(ex)
)
raise ImportError("pandas-gbq requires pydata-google-auth") from ex

try:
from google_auth_oauthlib.flow import InstalledAppFlow # noqa
except ImportError as ex:
raise ImportError(
"pandas-gbq requires google-auth-oauthlib: {0}".format(ex)
)
raise ImportError("pandas-gbq requires google-auth-oauthlib") from ex

try:
import google.auth # noqa
except ImportError as ex:
raise ImportError("pandas-gbq requires google-auth: {0}".format(ex))
raise ImportError("pandas-gbq requires google-auth") from ex

try:
from google.cloud import bigquery # noqa
except ImportError as ex:
raise ImportError(
"pandas-gbq requires google-cloud-bigquery: {0}".format(ex)
)

_check_google_client_version()
raise ImportError("pandas-gbq requires google-cloud-bigquery") from ex


class DatasetCreationError(ValueError):
Expand Down Expand Up @@ -416,7 +361,7 @@ def get_client(self):
# In addition to new enough version of google-api-core, a new enough
# version of google-cloud-bigquery is required to populate the
# client_info.
if HAS_CLIENT_INFO:
if FEATURES.bigquery_has_client_info:
return bigquery.Client(
project=self.project_id,
credentials=self.credentials,
Expand Down Expand Up @@ -550,14 +495,15 @@ def _download_results(
if user_dtypes is None:
user_dtypes = {}

if self.use_bqstorage_api and not HAS_BQSTORAGE_SUPPORT:
if self.use_bqstorage_api and not FEATURES.bigquery_has_bqstorage:
warnings.warn(
(
"use_bqstorage_api was set, but have google-cloud-bigquery "
"version {}. Requires google-cloud-bigquery version "
"{} or later."
).format(
BIGQUERY_INSTALLED_VERSION, BIGQUERY_BQSTORAGE_VERSION
FEATURES.bigquery_installed_version,
features.BIGQUERY_BQSTORAGE_VERSION,
),
PerformanceWarning,
stacklevel=4,
Expand All @@ -568,7 +514,7 @@ def _download_results(
create_bqstorage_client = False

to_dataframe_kwargs = {}
if HAS_BQSTORAGE_SUPPORT:
if FEATURES.bigquery_has_bqstorage:
to_dataframe_kwargs[
"create_bqstorage_client"
] = create_bqstorage_client
Expand Down Expand Up @@ -880,7 +826,7 @@ def read_gbq(

_test_google_api_imports()

if verbose is not None and SHOW_VERBOSE_DEPRECATION:
if verbose is not None and FEATURES.pandas_has_deprecated_verbose:
warnings.warn(
"verbose is deprecated and will be removed in "
"a future version. Set logging level in order to vary "
Expand Down Expand Up @@ -1054,7 +1000,7 @@ def to_gbq(

_test_google_api_imports()

if verbose is not None and SHOW_VERBOSE_DEPRECATION:
if verbose is not None and FEATURES.pandas_has_deprecated_verbose:
warnings.warn(
"verbose is deprecated and will be removed in "
"a future version. Set logging level in order to vary "
Expand Down Expand Up @@ -1133,8 +1079,8 @@ def to_gbq(
"schema of the destination table."
)

# Update the local `table_schema` so mode matches.
# See: https://github.com/pydata/pandas-gbq/issues/315
# Update the local `table_schema` so mode (NULLABLE/REQUIRED)
# matches. See: https://github.com/pydata/pandas-gbq/issues/315
table_schema = pandas_gbq.schema.update_schema(
table_schema, original_schema
)
Expand Down Expand Up @@ -1252,7 +1198,6 @@ def create(self, table_id, schema):
dataframe.
"""
from google.cloud.bigquery import DatasetReference
from google.cloud.bigquery import SchemaField
from google.cloud.bigquery import Table
from google.cloud.bigquery import TableReference

Expand All @@ -1274,12 +1219,7 @@ def create(self, table_id, schema):
DatasetReference(self.project_id, self.dataset_id), table_id
)
table = Table(table_ref)

schema = pandas_gbq.schema.add_default_nullable_mode(schema)

table.schema = [
SchemaField.from_api_repr(field) for field in schema["fields"]
]
table.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)

try:
self.client.create_table(table)
Expand Down
48 changes: 30 additions & 18 deletions pandas_gbq/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from google.cloud import bigquery

from pandas_gbq.features import FEATURES
import pandas_gbq.schema


Expand All @@ -30,21 +31,21 @@ def encode_chunk(dataframe):
return io.BytesIO(body)


def encode_chunks(dataframe, chunksize=None):
def split_dataframe(dataframe, chunksize=None):
dataframe = dataframe.reset_index(drop=True)
if chunksize is None:
yield 0, encode_chunk(dataframe)
yield 0, dataframe
return

remaining_rows = len(dataframe)
total_rows = remaining_rows
start_index = 0
while start_index < total_rows:
end_index = start_index + chunksize
chunk_buffer = encode_chunk(dataframe[start_index:end_index])
chunk = dataframe[start_index:end_index]
start_index += chunksize
remaining_rows = max(0, remaining_rows - chunksize)
yield remaining_rows, chunk_buffer
yield remaining_rows, chunk


def load_chunks(
Expand All @@ -60,24 +61,35 @@ def load_chunks(
job_config.source_format = "CSV"
job_config.allow_quoted_newlines = True

if schema is None:
# Explicit schema? Use that!
if schema is not None:
schema = pandas_gbq.schema.remove_policy_tags(schema)
job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)
# If not, let BigQuery determine schema unless we are encoding the CSV files ourselves.
elif not FEATURES.bigquery_has_from_dataframe_with_csv:
schema = pandas_gbq.schema.generate_bq_schema(dataframe)
schema = pandas_gbq.schema.remove_policy_tags(schema)
job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)

schema = pandas_gbq.schema.add_default_nullable_mode(schema)
chunks = split_dataframe(dataframe, chunksize=chunksize)
for remaining_rows, chunk in chunks:
yield remaining_rows

job_config.schema = [
bigquery.SchemaField.from_api_repr(field) for field in schema["fields"]
]

chunks = encode_chunks(dataframe, chunksize=chunksize)
for remaining_rows, chunk_buffer in chunks:
try:
yield remaining_rows
client.load_table_from_file(
chunk_buffer,
if FEATURES.bigquery_has_from_dataframe_with_csv:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly necessary (just the "omit policyTags" logic was), but I thought this might be a good opportunity to use more logic from google-cloud-bigquery, per #339

The CSV encoding in google-cloud-bigquery is still relatively new, so I didn't want to bump our minimum google-cloud-bigquery versions yet. Discussion: https://github.com/pydata/pandas-gbq/issues/357

client.load_table_from_dataframe(
chunk,
destination_table_ref,
job_config=job_config,
location=location,
).result()
finally:
chunk_buffer.close()
else:
try:
chunk_buffer = encode_chunk(chunk)
client.load_table_from_file(
chunk_buffer,
destination_table_ref,
job_config=job_config,
location=location,
).result()
finally:
chunk_buffer.close()
Loading