feat(bigquery): add read_csv, read_json, read_parquet support
cpcloud committed Nov 15, 2023
1 parent c3cf316 commit ff83110
Showing 2 changed files with 155 additions and 62 deletions.
139 changes: 139 additions & 0 deletions ibis/backends/bigquery/__init__.py
@@ -2,7 +2,10 @@

from __future__ import annotations

import concurrent.futures
import contextlib
import glob
import os
import re
import warnings
from functools import partial
@@ -40,6 +43,7 @@

if TYPE_CHECKING:
    from collections.abc import Iterable, Mapping
    from pathlib import Path

    import pyarrow as pa
    from google.cloud.bigquery.table import RowIterator
@@ -147,6 +151,141 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
        )
        load_job.result()

    def _read_file(
        self,
        path: str | Path,
        *,
        table_name: str | None = None,
        job_config: bq.LoadJobConfig,
    ) -> ir.Table:
        self._make_session()

        if table_name is None:
            table_name = util.gen_name(f"bq_read_{job_config.source_format}")

        table_ref = self._session_dataset.table(table_name)

        schema = self._session_dataset.dataset_id
        database = self._session_dataset.project

        # drop the table if it exists
        #
        # we could do this with write_disposition = WRITE_TRUNCATE but then the
        # concurrent append jobs aren't possible
        #
        # dropping the table first means all write_dispositions can be
        # WRITE_APPEND
        self.drop_table(table_name, schema=schema, database=database, force=True)

        if os.path.isdir(path):
            raise NotImplementedError("Reading from a directory is not supported.")
        elif str(path).startswith("gs://"):
            load_job = self.client.load_table_from_uri(
                path, table_ref, job_config=job_config
            )
            load_job.result()
        else:

            def load(file: str) -> None:
                with open(file, mode="rb") as f:
                    load_job = self.client.load_table_from_file(
                        f, table_ref, job_config=job_config
                    )
                    load_job.result()

            job_config.write_disposition = bq.WriteDisposition.WRITE_APPEND

            with concurrent.futures.ThreadPoolExecutor() as executor:
                for fut in concurrent.futures.as_completed(
                    executor.submit(load, file) for file in glob.glob(str(path))
                ):
                    fut.result()

        return self.table(table_name, schema=schema, database=database)

    def read_parquet(
        self, path: str | Path, table_name: str | None = None, **kwargs: Any
    ):
        """Read Parquet data into a BigQuery table.

        Parameters
        ----------
        path
            Path to a Parquet file on GCS or the local filesystem. Globs are supported.
        table_name
            Optional table name
        kwargs
            Additional keyword arguments passed to `google.cloud.bigquery.LoadJobConfig`.

        Returns
        -------
        Table
            An Ibis table expression
        """
        return self._read_file(
            path,
            table_name=table_name,
            job_config=bq.LoadJobConfig(
                source_format=bq.SourceFormat.PARQUET, **kwargs
            ),
        )

    def read_csv(
        self, path: str | Path, table_name: str | None = None, **kwargs: Any
    ) -> ir.Table:
        """Read CSV data into a BigQuery table.

        Parameters
        ----------
        path
            Path to a CSV file on GCS or the local filesystem. Globs are supported.
        table_name
            Optional table name
        kwargs
            Additional keyword arguments passed to
            `google.cloud.bigquery.LoadJobConfig`.

        Returns
        -------
        Table
            An Ibis table expression
        """
        job_config = bq.LoadJobConfig(
            source_format=bq.SourceFormat.CSV,
            autodetect=True,
            skip_leading_rows=1,
            **kwargs,
        )
        return self._read_file(path, table_name=table_name, job_config=job_config)

    def read_json(
        self, path: str | Path, table_name: str | None = None, **kwargs: Any
    ) -> ir.Table:
        """Read newline-delimited JSON data into a BigQuery table.

        Parameters
        ----------
        path
            Path to a newline-delimited JSON file on GCS or the local
            filesystem. Globs are supported.
        table_name
            Optional table name
        kwargs
            Additional keyword arguments passed to
            `google.cloud.bigquery.LoadJobConfig`.

        Returns
        -------
        Table
            An Ibis table expression
        """
        job_config = bq.LoadJobConfig(
            source_format=bq.SourceFormat.NEWLINE_DELIMITED_JSON,
            autodetect=True,
            **kwargs,
        )
        return self._read_file(path, table_name=table_name, job_config=job_config)

    def _from_url(self, url: str, **kwargs):
        result = urlparse(url)
        params = parse_qs(result.query)
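
The local-file branch of _read_file drops the destination table up front so that every per-file load job can use WRITE_APPEND and run concurrently; WRITE_TRUNCATE would instead have each job wipe the others' rows. Below is a standalone sketch of that pattern against the google-cloud-bigquery client (illustrative only, not the commit's code; the client setup, destination table, and glob are placeholders):

import concurrent.futures
import glob

from google.cloud import bigquery as bq

client = bq.Client()  # placeholder: assumes application default credentials
destination = "my-project.my_dataset.events"  # placeholder destination table

job_config = bq.LoadJobConfig(
    source_format=bq.SourceFormat.PARQUET,
    # WRITE_APPEND after an explicit drop lets per-file jobs run concurrently
    # without clobbering one another's rows.
    write_disposition=bq.WriteDisposition.WRITE_APPEND,
)


def load(path: str) -> None:
    # Upload one local file as its own load job and block until it finishes.
    with open(path, "rb") as f:
        client.load_table_from_file(f, destination, job_config=job_config).result()


with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(load, path) for path in glob.glob("data/*.parquet")]
    for fut in concurrent.futures.as_completed(futures):
        fut.result()  # surface any per-file load errors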
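For orientation, a minimal usage sketch of the new readers (not part of the commit); it assumes a connection created with ibis.bigquery.connect, and the project, dataset, bucket, and file names are placeholders:

import ibis

con = ibis.bigquery.connect(project_id="my-project", dataset_id="my_dataset")

# Local paths may contain globs; each matched file is loaded as its own
# concurrent WRITE_APPEND job.
parquet_table = con.read_parquet("data/*.parquet", table_name="events")

# gs:// URIs are handed to a single load_table_from_uri job.
csv_table = con.read_csv("gs://my-bucket/raw/diamonds.csv")

# Extra keyword arguments are forwarded to google.cloud.bigquery.LoadJobConfig.
json_table = con.read_json("logs.ndjson", max_bad_records=10)

parquet_table.count().execute()
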
78 changes: 16 additions & 62 deletions ibis/backends/tests/test_register.py
@@ -389,29 +389,16 @@ def test_register_garbage(con, monkeypatch):


@pytest.mark.parametrize(
("fname", "in_table_name", "out_table_name"),
("fname", "in_table_name"),
[
(
"functional_alltypes.parquet",
None,
"ibis_read_parquet",
),
("functional_alltypes.parquet", "funk_all", "funk_all"),
("functional_alltypes.parquet", None),
("functional_alltypes.parquet", "funk_all"),
],
)
@pytest.mark.notyet(
[
"bigquery",
"flink",
"impala",
"mssql",
"mysql",
"postgres",
"sqlite",
"trino",
]
["flink", "impala", "mssql", "mysql", "postgres", "sqlite", "trino"]
)
def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name, out_table_name):
def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
pq = pytest.importorskip("pyarrow.parquet")

fname = Path(fname)
@@ -426,10 +413,9 @@ def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name, out_table_name):
fname = str(Path(fname).absolute())
table = con.read_parquet(fname, table_name=in_table_name)

assert any(out_table_name in t for t in con.list_tables())

if con.name != "datafusion":
table.count().execute()
if in_table_name is not None:
assert table.op().name == in_table_name
assert table.count().execute()


@pytest.fixture(scope="module")
@@ -441,17 +427,7 @@ def ft_data(data_dir):


@pytest.mark.notyet(
[
"bigquery",
"flink",
"impala",
"mssql",
"mysql",
"pandas",
"postgres",
"sqlite",
"trino",
]
["flink", "impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
)
def test_read_parquet_glob(con, tmp_path, ft_data):
pq = pytest.importorskip("pyarrow.parquet")
@@ -470,17 +446,7 @@ def test_read_parquet_glob(con, tmp_path, ft_data):


@pytest.mark.notyet(
[
"bigquery",
"flink",
"impala",
"mssql",
"mysql",
"pandas",
"postgres",
"sqlite",
"trino",
]
["flink", "impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
)
def test_read_csv_glob(con, tmp_path, ft_data):
pc = pytest.importorskip("pyarrow.csv")
@@ -500,7 +466,6 @@ def test_read_csv_glob(con, tmp_path, ft_data):

@pytest.mark.notyet(
[
"bigquery",
"clickhouse",
"dask",
"datafusion",
@@ -554,33 +519,22 @@ def num_diamonds(data_dir):


@pytest.mark.parametrize(
("in_table_name", "out_table_name"),
[
param(None, "ibis_read_csv_", id="default"),
param("fancy_stones", "fancy_stones", id="file_name"),
],
"in_table_name",
[param(None, id="default"), param("fancy_stones", id="file_name")],
)
@pytest.mark.notyet(
[
"bigquery",
"flink",
"impala",
"mssql",
"mysql",
"postgres",
"sqlite",
"trino",
]
["flink", "impala", "mssql", "mysql", "postgres", "sqlite", "trino"]
)
def test_read_csv(con, data_dir, in_table_name, out_table_name, num_diamonds):
def test_read_csv(con, data_dir, in_table_name, num_diamonds):
fname = "diamonds.csv"
with pushd(data_dir / "csv"):
if con.name == "pyspark":
# pyspark doesn't respect CWD
fname = str(Path(fname).absolute())
table = con.read_csv(fname, table_name=in_table_name)

assert any(out_table_name in t for t in con.list_tables())
if in_table_name is not None:
assert table.op().name == in_table_name

special_types = DIAMONDS_COLUMN_TYPES.get(con.name, {})

