5 changes: 5 additions & 0 deletions pandas_gbq/__init__.py
@@ -2,10 +2,12 @@
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

import logging
import warnings

from pandas_gbq import version as pandas_gbq_version
from pandas_gbq.contexts import Context, context
from pandas_gbq.core.sample import sample

from . import _versions_helpers
from .gbq import read_gbq, to_gbq # noqa
@@ -21,6 +23,8 @@
        FutureWarning,
    )

logger = logging.getLogger(__name__)

__version__ = pandas_gbq_version.__version__

__all__ = [
@@ -29,4 +33,5 @@
    "read_gbq",
    "Context",
    "context",
    "sample",
]
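With the module-level logger registered under the package name, callers can opt into pandas-gbq's diagnostic output (such as the "Got N rows." debug message emitted in core/read.py) through the standard logging hierarchy. A minimal sketch; the handler and level choices are illustrative, not part of this change:

import logging

# Surface pandas-gbq's DEBUG records; by default only WARNING and above
# would reach the root handlers.
logger = logging.getLogger("pandas_gbq")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())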
7 changes: 7 additions & 0 deletions pandas_gbq/constants.py
@@ -2,6 +2,8 @@
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

import google.api_core.exceptions

# BigQuery uses powers of 2 in calculating data sizes. See:
# https://cloud.google.com/bigquery/pricing#data The documentation uses
# GiB rather than GB to disambiguate from the alternative base 10 units.
@@ -10,3 +12,8 @@
BYTES_IN_MIB = 1024 * BYTES_IN_KIB
BYTES_IN_GIB = 1024 * BYTES_IN_MIB
BYTES_TO_RECOMMEND_BIGFRAMES = BYTES_IN_GIB

HTTP_ERRORS = (
    google.api_core.exceptions.ClientError,
    google.api_core.exceptions.GoogleAPIError,
)
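HTTP_ERRORS gives callers a single tuple to catch for both transport-level and API-level failures from the google-api-core stack. A sketch of the intended pattern, mirroring the except clause in core/read.py below; the bqclient variable is assumed to be an authorized google.cloud.bigquery.Client:

import pandas_gbq.constants
import pandas_gbq.exceptions

try:
    # bqclient: an authorized google.cloud.bigquery.Client (assumed).
    rows = bqclient.query("SELECT 1").result()
except pandas_gbq.constants.HTTP_ERRORS as ex:
    # Translate googleapis errors into pandas-gbq's own exception types,
    # as core/read.py does.
    raise pandas_gbq.exceptions.translate_exception(ex) from ex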
179 changes: 179 additions & 0 deletions pandas_gbq/core/read.py
Comment from the PR author: Moved from gbq_connector.py for easier reuse.

@@ -0,0 +1,179 @@
# Copyright (c) 2025 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

from __future__ import annotations

import typing
from typing import Any, Dict, Optional, Sequence
import warnings

import google.cloud.bigquery
import google.cloud.bigquery.table
import numpy as np

import pandas_gbq
import pandas_gbq.constants
import pandas_gbq.exceptions
import pandas_gbq.features
import pandas_gbq.timestamp

# Only import at module-level at type checking time to avoid circular
# dependencies in the pandas package, which has an optional dependency on
# pandas-gbq.
if typing.TYPE_CHECKING:  # pragma: NO COVER
    import pandas


def _bqschema_to_nullsafe_dtypes(schema_fields):
    """Specify explicit dtypes based on BigQuery schema.

    This function only specifies a dtype when the dtype allows nulls.
    Otherwise, use pandas's default dtype choice.

    See: http://pandas.pydata.org/pandas-docs/dev/missing_data.html
    #missing-data-casting-rules-and-indexing
    """
    import db_dtypes

    # If you update this mapping, also update the table at
    # `docs/reading.rst`.
    dtype_map = {
        "FLOAT": np.dtype(float),
        "INTEGER": "Int64",
        "TIME": db_dtypes.TimeDtype(),
        # Note: Other types such as 'datetime64[ns]' and db_dtypes.DateDtype()
        # are not included because the pandas range does not align with the
        # BigQuery range. We need to attempt a conversion to those types and
        # fall back to 'object' when there are out-of-range values.
    }

    # Amend dtype_map with newer extension types if pandas version allows.
    if pandas_gbq.features.FEATURES.pandas_has_boolean_dtype:
        dtype_map["BOOLEAN"] = "boolean"

    dtypes = {}
    for field in schema_fields:
        name = str(field["name"])
        # Array BigQuery type is represented as an object column containing
        # list objects.
        if field["mode"].upper() == "REPEATED":
            dtypes[name] = "object"
            continue

        dtype = dtype_map.get(field["type"].upper())
        if dtype:
            dtypes[name] = dtype

    return dtypes
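
# For illustration, a hypothetical schema and the dtypes this function
# would return for it (STRING has no dtype_map entry, so pandas's default
# object dtype applies; REPEATED columns always map to object):
#
#     [{"name": "ratio", "type": "FLOAT", "mode": "NULLABLE"},
#      {"name": "n", "type": "INTEGER", "mode": "NULLABLE"},
#      {"name": "tags", "type": "STRING", "mode": "REPEATED"}]
#
#     -> {"ratio": dtype("float64"), "n": "Int64", "tags": "object"}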


def _finalize_dtypes(
    df: pandas.DataFrame, schema_fields: Sequence[Dict[str, Any]]
) -> pandas.DataFrame:
    """
    Attempt to change the dtypes of those columns that don't map exactly.

    For example db_dtypes.DateDtype() and datetime64[ns] cannot represent
    0001-01-01, but they can represent dates within a couple hundred years of
    1970. See:
    https://github.com/googleapis/python-bigquery-pandas/issues/365
    """
    import db_dtypes
    import pandas.api.types

    # If you update this mapping, also update the table at
    # `docs/reading.rst`.
    dtype_map = {
        "DATE": db_dtypes.DateDtype(),
        "DATETIME": "datetime64[ns]",
        "TIMESTAMP": "datetime64[ns]",
    }

    for field in schema_fields:
        # This method doesn't modify ARRAY/REPEATED columns.
        if field["mode"].upper() == "REPEATED":
            continue

        name = str(field["name"])
        dtype = dtype_map.get(field["type"].upper())

        # Avoid deprecated conversion to timezone-naive dtype by only casting
        # object dtypes.
        if dtype and pandas.api.types.is_object_dtype(df[name]):
            df[name] = df[name].astype(dtype, errors="ignore")

    # Ensure any TIMESTAMP columns are tz-aware.
    df = pandas_gbq.timestamp.localize_df(df, schema_fields)

    return df
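
# For illustration: per the issue linked above, a DATE column holding
# 0001-01-01 fails the cast (astype(..., errors="ignore") returns the
# original object column), while a column of dates near 1970 becomes
# db_dtypes.DateDtype().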


def download_results(
    results: google.cloud.bigquery.table.RowIterator,
    *,
    bqclient: google.cloud.bigquery.Client,
    progress_bar_type: Optional[str],
    warn_on_large_results: bool = True,
    max_results: Optional[int],
    user_dtypes: Optional[dict],
    use_bqstorage_api: bool,
) -> Optional[pandas.DataFrame]:
    # No results are desired, so don't bother downloading anything.
    if max_results == 0:
        return None

    if user_dtypes is None:
        user_dtypes = {}

    create_bqstorage_client = use_bqstorage_api
    if max_results is not None:
        create_bqstorage_client = False

    # If we're downloading a large table, BigQuery DataFrames might be a
    # better fit. Not all code paths will populate results._table, but
    # if it's not populated that means we are working with a small result
    # set.
    if (
        warn_on_large_results
        and (table_ref := getattr(results, "_table", None)) is not None
    ):
        table = bqclient.get_table(table_ref)
        if (
            isinstance((num_bytes := table.num_bytes), int)
            and num_bytes > pandas_gbq.constants.BYTES_TO_RECOMMEND_BIGFRAMES
        ):
            num_gib = num_bytes / pandas_gbq.constants.BYTES_IN_GIB
            warnings.warn(
                f"Recommendation: Your results are {num_gib:.1f} GiB. "
                "Consider using BigQuery DataFrames (https://bit.ly/bigframes-intro) "
                "to process large results with pandas-compatible APIs and transparent "
                "SQL pushdown to the BigQuery engine. This provides an opportunity to "
                "save on costs and improve performance. "
                "Please reach out to [email protected] with any "
                "questions or concerns. To disable this message, run "
                "warnings.simplefilter('ignore', category=pandas_gbq.exceptions.LargeResultsWarning)",
                category=pandas_gbq.exceptions.LargeResultsWarning,
                # user's code
                # -> read_gbq
                # -> run_query
                # -> download_results
                stacklevel=4,
            )

    try:
        schema_fields = [field.to_api_repr() for field in results.schema]
        conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
        conversion_dtypes.update(user_dtypes)
        df = results.to_dataframe(
            dtypes=conversion_dtypes,
            progress_bar_type=progress_bar_type,
            create_bqstorage_client=create_bqstorage_client,
        )
    except pandas_gbq.constants.HTTP_ERRORS as ex:
        raise pandas_gbq.exceptions.translate_exception(ex) from ex

    df = _finalize_dtypes(df, schema_fields)

    pandas_gbq.logger.debug("Got {} rows.\n".format(results.total_rows))
    return df
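
Taken together, download_results is the download half of a query: the caller executes the SQL, then hands the resulting RowIterator here. A minimal sketch of a call site, assuming an authorized client; the query text is illustrative, while the keyword-only parameters come from the signature above:

import google.cloud.bigquery

from pandas_gbq.core.read import download_results

bqclient = google.cloud.bigquery.Client()
results = bqclient.query("SELECT 1 AS x").result()  # RowIterator

df = download_results(
    results,
    bqclient=bqclient,
    progress_bar_type=None,   # or "tqdm" when the tqdm package is installed
    max_results=None,         # None downloads everything; 0 returns None
    user_dtypes=None,         # caller overrides merge over the null-safe defaults
    use_bqstorage_api=False,  # True opts into the BigQuery Storage API fast path
)

Note that any non-None max_results forces create_bqstorage_client to False, so capped downloads always go through the REST API.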