Commit 6cdf64b

fix: Fix issue with stream upload batch size upload limit (#2290)
1 parent c4cb39d · commit 6cdf64b

File tree: 3 files changed, +86 additions, -12 deletions

bigframes/core/local_data.py

Lines changed: 2 additions & 1 deletion
@@ -124,12 +124,13 @@ def to_arrow(
         geo_format: Literal["wkb", "wkt"] = "wkt",
         duration_type: Literal["int", "duration"] = "duration",
         json_type: Literal["string"] = "string",
+        max_chunksize: Optional[int] = None,
     ) -> tuple[pa.Schema, Iterable[pa.RecordBatch]]:
         if geo_format != "wkt":
             raise NotImplementedError(f"geo format {geo_format} not yet implemented")
         assert json_type == "string"
 
-        batches = self.data.to_batches()
+        batches = self.data.to_batches(max_chunksize=max_chunksize)
         schema = self.data.schema
         if duration_type == "int":
             schema = _schema_durations_to_ints(schema)
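For reference, the new parameter is passed straight through to pyarrow's Table.to_batches, whose max_chunksize caps the number of rows per RecordBatch. A minimal standalone illustration (toy table; the exact batch split shown assumes a single-chunk table):

import pyarrow as pa

table = pa.table({"x": list(range(10))})

# With max_chunksize=None the whole table may come back as one batch;
# with max_chunksize=4 no batch exceeds 4 rows.
batches = table.to_batches(max_chunksize=4)
print([b.num_rows for b in batches])  # [4, 4, 2]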

bigframes/session/loader.py

Lines changed: 47 additions & 11 deletions
@@ -19,6 +19,7 @@
 import datetime
 import io
 import itertools
+import math
 import os
 import typing
 from typing import (
@@ -397,6 +398,15 @@ def stream_data(
         offsets_col: str,
     ) -> bq_data.BigqueryDataSource:
         """Load managed data into bigquery"""
+        MAX_BYTES = 10000000  # streaming api has 10MB limit
+        SAFETY_MARGIN = (
+            40  # Perf seems bad for large chunks, so do 40x smaller than max
+        )
+        batch_count = math.ceil(
+            data.metadata.total_bytes / (MAX_BYTES // SAFETY_MARGIN)
+        )
+        rows_per_batch = math.ceil(data.metadata.row_count / batch_count)
+
         schema_w_offsets = data.schema.append(
             schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
         )
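To make the new sizing arithmetic concrete, here is a small standalone sketch of the streaming-path formula; the total_bytes and row_count values are invented purely for illustration:

import math

MAX_BYTES = 10000000  # streaming api has 10MB limit
SAFETY_MARGIN = 40    # streaming path targets MAX_BYTES // 40 = 250,000 bytes per batch

# Hypothetical local table: ~100 MB spread over 1,000,000 rows.
total_bytes = 100_000_000
row_count = 1_000_000

batch_count = math.ceil(total_bytes / (MAX_BYTES // SAFETY_MARGIN))  # 400 batches
rows_per_batch = math.ceil(row_count / batch_count)                  # 2,500 rows each
print(batch_count, rows_per_batch)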
@@ -410,16 +420,24 @@ def stream_data(
         )
         rows_w_offsets = ((*row, offset) for offset, row in enumerate(rows))
 
-        for errors in self._bqclient.insert_rows(
-            load_table_destination,
-            rows_w_offsets,
-            selected_fields=bq_schema,
-            row_ids=map(str, itertools.count()),  # used to ensure only-once insertion
-        ):
-            if errors:
-                raise ValueError(
-                    f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
-                )
+        # TODO: don't use batched
+        batches = _batched(rows_w_offsets, rows_per_batch)
+        ids_iter = map(str, itertools.count())
+
+        for batch in batches:
+            batch_rows = list(batch)
+            row_ids = itertools.islice(ids_iter, len(batch_rows))
+
+            for errors in self._bqclient.insert_rows(
+                load_table_destination,
+                batch_rows,
+                selected_fields=bq_schema,
+                row_ids=row_ids,  # used to ensure only-once insertion
+            ):
+                if errors:
+                    raise ValueError(
+                        f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
+                    )
         destination_table = self._bqclient.get_table(load_table_destination)
         return bq_data.BigqueryDataSource(
             bq_data.GbqTable.from_table(destination_table),
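The ids_iter / islice pairing keeps the insert IDs globally sequential even though rows are now sent in chunks. A toy illustration of that pattern (the rows here are made up):

import itertools

rows = [("a",), ("b",), ("c",), ("d",), ("e",)]
ids_iter = map(str, itertools.count())

for start in range(0, len(rows), 2):
    batch_rows = rows[start : start + 2]
    # Each slice consumes exactly len(batch_rows) ids, so numbering never resets.
    row_ids = list(itertools.islice(ids_iter, len(batch_rows)))
    print(batch_rows, row_ids)
# [('a',), ('b',)] ['0', '1']
# [('c',), ('d',)] ['2', '3']
# [('e',)] ['4']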
@@ -434,6 +452,15 @@ def write_data(
         offsets_col: str,
     ) -> bq_data.BigqueryDataSource:
         """Load managed data into bigquery"""
+        MAX_BYTES = 10000000  # streaming api has 10MB limit
+        SAFETY_MARGIN = (
+            4  # aim for 2.5mb to account for row variance, format differences, etc.
+        )
+        batch_count = math.ceil(
+            data.metadata.total_bytes / (MAX_BYTES // SAFETY_MARGIN)
+        )
+        rows_per_batch = math.ceil(data.metadata.row_count / batch_count)
+
         schema_w_offsets = data.schema.append(
             schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
         )
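The write path applies the same formula with a smaller margin (roughly 2.5 MB of source data per batch), and rows_per_batch is then fed to to_arrow as max_chunksize in the hunk below. As a rough sanity check that a resulting RecordBatch stays under the 10 MB request ceiling, one could inspect its in-memory size; this is only an approximation of the serialized request payload, and the batch here is a toy stand-in:

import pyarrow as pa

MAX_BYTES = 10000000  # per-request limit assumed for the streaming/write APIs

# Toy batch standing in for one chunk produced by to_arrow(max_chunksize=...).
batch = pa.record_batch({"int_col": pa.array(range(100_000), type=pa.int64())})

# nbytes reports the Arrow buffer size, a rough proxy for the request size.
print(batch.nbytes)  # 800000 bytes for 100,000 int64 values
assert batch.nbytes < MAX_BYTES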
@@ -450,7 +477,9 @@ def write_data(
 
         def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]:
             schema, batches = data.to_arrow(
-                offsets_col=offsets_col, duration_type="int"
+                offsets_col=offsets_col,
+                duration_type="int",
+                max_chunksize=rows_per_batch,
             )
             offset = 0
             for batch in batches:
@@ -1332,3 +1361,10 @@ def _validate_dtype_can_load(name: str, column_type: bigframes.dtypes.Dtype):
             f"Nested JSON types, found in column `{name}`: `{column_type}`', "
             f"are currently unsupported for upload. {constants.FEEDBACK_LINK}"
         )
+
+
+# itertools.batched not available in python <3.12, so we use this instead
+def _batched(iterator: Iterable, n: int) -> Iterable:
+    assert n > 0
+    while batch := tuple(itertools.islice(iterator, n)):
+        yield batch
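A note on this helper: it mirrors itertools.batched (Python 3.12+), but it must be fed an iterator or generator; repeatedly islice-ing a plain list would restart from the front each time and never terminate, which is why the caller passes the rows_w_offsets generator. A minimal standalone sketch of the behavior (type hints dropped for brevity):

import itertools


def _batched(iterator, n):
    assert n > 0
    # islice advances the shared iterator, so each pass yields the next n items.
    while batch := tuple(itertools.islice(iterator, n)):
        yield batch


print(list(_batched(iter(range(7)), 3)))
# [(0, 1, 2), (3, 4, 5), (6,)]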

tests/system/large/test_session.py

Lines changed: 37 additions & 0 deletions
@@ -17,13 +17,50 @@
 
 import google.cloud.bigquery as bigquery
 import google.cloud.exceptions
+import numpy as np
+import pandas as pd
 import pytest
 
 import bigframes
 import bigframes.pandas as bpd
 import bigframes.session._io.bigquery
 
 
+@pytest.fixture
+def large_pd_df():
+    nrows = 1000000
+
+    np_int1 = np.random.randint(0, 1000, size=nrows, dtype=np.int32)
+    np_int2 = np.random.randint(10000, 20000, size=nrows, dtype=np.int64)
+    np_bool = np.random.choice([True, False], size=nrows)
+    np_float1 = np.random.rand(nrows).astype(np.float32)
+    np_float2 = np.random.normal(loc=50.0, scale=10.0, size=nrows).astype(np.float64)
+
+    return pd.DataFrame(
+        {
+            "int_col_1": np_int1,
+            "int_col_2": np_int2,
+            "bool_col": np_bool,
+            "float_col_1": np_float1,
+            "float_col_2": np_float2,
+        }
+    )
+
+
+@pytest.mark.parametrize(
+    ("write_engine"),
+    [
+        ("bigquery_load"),
+        ("bigquery_streaming"),
+        ("bigquery_write"),
+    ],
+)
+def test_read_pandas_large_df(session, large_pd_df, write_engine: str):
+    df = session.read_pandas(large_pd_df, write_engine=write_engine)
+    assert len(df.peek(5)) == 5
+    assert len(large_pd_df) == 1000000
+
+
 def test_close(session: bigframes.Session):
     # we will create two tables and confirm that they are deleted
     # when the session is closed