[BUG] Fix parquet reads with limit across row groups (#2751)
When reading local parquet files containing multiple row groups with a
limit applied, sometimes the resulting table does not respect the given
limit, causing errors such as `DaftError::ValueError While building a
Table with Table::new_with_size, we found that the Series lengths did
not match. Series named: col had length: 2048 vs the specified Table
length: 1034` to be thrown.

The issue was a small bug: each row group range being read would take the global limit passed into the parquet read, instead of the pre-computed row group limit, which accounts for how many rows have already been read by previous row groups. This caused the parquet reader to read more rows from a row group than intended.

To fix this, we pass the pre-computed row group limit properly to the
reader.

For example, consider a parquet file with the following layout:

```
Column: col
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  S _  1       5.00 B     5 B       
  0-1    data  S R  1024    0.01 B     11 B                0       "b" / "b"
  1-D    dict  S _  1       5.00 B     5 B       
  1-1    data  S R  1024    0.01 B     11 B                0       "b" / "b"
  2-D    dict  S _  1       5.00 B     5 B       
  2-1    data  S R  1024    0.01 B     11 B                0       "b" / "b"
  3-D    dict  S _  1       5.00 B     5 B       
  3-1    data  S R  1024    0.01 B     11 B                0       "b" / "b"
```

When applying a `.limit(1050)` over this parquet file, with the bug, we would read 1024 rows each from row groups 0 and 1 (data pages `0-1` and `1-1`). Row groups 2 and 3 are skipped because the pre-computed row ranges see that the first two row groups already cover the required `1050` rows. However, the pre-computed row ranges also know that we only need 26 rows (1050 - 1024) from row group 1, so the fix is simply to pass this per-row-group count into the reader instead of the global limit.
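
To make the arithmetic concrete, here is a minimal sketch in Rust of how such per-row-group counts can be pre-computed from a global limit. This is not the actual daft-parquet code; `compute_row_group_limits` and its signature are invented for illustration.

```rust
// Minimal sketch, not the daft-parquet implementation: pre-compute how many
// rows each row group should contribute toward a global limit. Row groups
// after the limit is satisfied get a count of 0 and can be skipped entirely.
fn compute_row_group_limits(row_group_sizes: &[usize], limit: usize) -> Vec<usize> {
    let mut remaining = limit;
    row_group_sizes
        .iter()
        .map(|&size| {
            let take = size.min(remaining);
            remaining -= take;
            take
        })
        .collect()
}

fn main() {
    // Four row groups of 1024 rows each, as in the layout above, with .limit(1050):
    let per_rg = compute_row_group_limits(&[1024, 1024, 1024, 1024], 1050);
    assert_eq!(per_rg, vec![1024, 26, 0, 0]);
    // The bug passed the global limit (1050) to every row group's column iterator,
    // so row group 1 yielded all 1024 of its rows instead of the 26 that were needed.
}
```

The actual fix is the one-line change in the diff below: the column iterator receives `Some(rg_range.num_rows)`, the pre-computed per-row-group count, instead of the global `num_rows` limit.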
desmondcheongzx authored Aug 31, 2024
1 parent 3a7a0b4 commit a97d871
Showing 2 changed files with 38 additions and 3 deletions.
src/daft-parquet/src/stream_reader.rs (1 addition, 1 deletion)

```diff
@@ -364,7 +364,7 @@ pub(crate) fn local_parquet_read_into_arrow(
                 rg,
                 schema.fields.clone(),
                 Some(chunk_size),
-                num_rows,
+                Some(rg_range.num_rows),
                 None,
             );
             let single_rg_column_iter = single_rg_column_iter?;
```
tests/io/test_parquet.py (37 additions, 2 deletions)

```diff
@@ -6,6 +6,7 @@
 import tempfile
 import uuid
 
+import numpy as np
 import pyarrow as pa
 import pyarrow.parquet as papq
 import pytest
@@ -159,8 +160,6 @@ def test_row_groups():
 @pytest.mark.integration()
 @pytest.mark.parametrize("chunk_size", [5, 1024, 2048, 4096])
 def test_parquet_rows_cross_page_boundaries(tmpdir, minio_io_config, chunk_size):
-    import numpy as np
-
     int64_min = -(2**63)
     int64_max = 2**63 - 1
 
@@ -346,3 +345,39 @@ def test_parquet_helper(data_and_type, use_daft_writer):
     # One column uses a single dictionary-encoded data page, and the other contains data pages with
     # 4096 values each.
     test_parquet_helper(get_string_data_and_type(8192, 300, 1), True)
+
+
+@pytest.mark.integration()
+def test_parquet_limits_across_row_groups(tmpdir, minio_io_config):
+    test_row_group_size = 1024
+    daft_execution_config = daft.context.get_context().daft_execution_config
+    default_row_group_size = daft_execution_config.parquet_target_row_group_size
+    int_array = np.full(shape=4096, fill_value=3, dtype=np.int32)
+    before = daft.from_pydict({"col": pa.array(int_array, type=pa.int32())})
+    file_path = f"{tmpdir}/{str(uuid.uuid4())}.parquet"
+    # Decrease the target row group size before writing the parquet file.
+    daft.set_execution_config(parquet_target_row_group_size=test_row_group_size)
+    before.write_parquet(file_path)
+    assert (
+        before.limit(test_row_group_size + 10).to_arrow()
+        == daft.read_parquet(file_path).limit(test_row_group_size + 10).to_arrow()
+    )
+    assert (
+        before.limit(test_row_group_size * 2).to_arrow()
+        == daft.read_parquet(file_path).limit(test_row_group_size * 2).to_arrow()
+    )
+
+    bucket_name = "my-bucket"
+    s3_path = f"s3://{bucket_name}/my-folder"
+    with minio_create_bucket(minio_io_config=minio_io_config, bucket_name=bucket_name):
+        before.write_parquet(s3_path, io_config=minio_io_config)
+        assert (
+            before.limit(test_row_group_size + 10).to_arrow()
+            == daft.read_parquet(s3_path, io_config=minio_io_config).limit(test_row_group_size + 10).to_arrow()
+        )
+        assert (
+            before.limit(test_row_group_size * 2).to_arrow()
+            == daft.read_parquet(s3_path, io_config=minio_io_config).limit(test_row_group_size * 2).to_arrow()
+        )
+    # Reset the target row group size.
+    daft.set_execution_config(parquet_target_row_group_size=default_row_group_size)
```
