Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Fix parquet reads when a top-level column's final row spans more than one data page #2586

Conversation

desmondcheongzx
Copy link
Contributor

@desmondcheongzx desmondcheongzx commented Jul 31, 2024

This PR addresses issue #2537 by fixing parquet reads in the case where the final row of a top-level column spans more than one data page.

Previously, it was implicitly assumed that all nested values for a top-level row was stored in the same data page. So once we had a nested value for each top-level row, we assumed that it meant we had read the final data page for a column.

However, this assumption isn't guaranteed in parquet. In the event that the final row of a top-level column spans multiple data pages, we stop reading data pages for the column at the first page where we first encountered the row. This could either lead to data loss (we stopped reading nested data early) or cause Daft to error out (in the case where the nested fields are in a struct for example, one field having a different number of values from other fields causes Daft to throw an error).

To fix this, in addition to checking that we've seen every row before we stop reading data pages for a column, we also check the number of values we've read against the number of values we expect from the column's metadata.

Example

Consider the following set up:

  • We have a dataframe with the schema <nested list<struct<field1 string, field2 string>>>.
  • The strings in nested.field1 are large and unique enough that dictionary encoding falls back to plain encoding.
  • Each data page stores at most 1024 of nested.field1's strings.
  • We have two top-level rows in our dataframe, with the first row containing a list of length 1, and the second row containing a list of length 3072.

This gives us the following column layout within the parquet file:

Column: nested.list.item.field1
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  S _  1024    2.931 kB   2.931 MB  
  0-1    data  S R  1024    1.27 B     1.270 kB            0       "a" / "zzeirptdzfmtwpousxsenxqaz..."
  0-2    data  S _  1024    2.934 kB   2.934 MB            0       "aaddwlmkjpjkueymxhffkwvqk..." / "zzhwmmqrhngwpqbusvfdbrzqw..."
  0-3    data  S _  1024    2.934 kB   2.934 MB            0       "aakrkhbhhknfwffdlisrbymvf..." / "zwuuxcjwkdxkbkjrehcriurzt..."
  0-4    data  S _  1       2.945 kB   2.945 kB            0       "brweggkctrffxwkkfcpkegfzz..." / "brweggkctrffxwkkfcpkegfzz..."


Column: nested.list.item.field2
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  S _  2       5.00 B     10 B      
  0-1    data  S R  3073    0.01 B     22 B                0       "b" / "r"

Since the last row of nested.field contains 3072 values, it overflows page 0-1 and goes into pages 0-2, 0-3, and 0-4.

If we stop reading data pages once we've seen a value from every top-level row in a column, then we would only read column nested.field up to page 0-1, and miss the next three pages of data. Instead, we can use the column's metadata so that we know to expect 3072 values and continue processing data pages until we've seen all these values.

@desmondcheongzx desmondcheongzx added the bug Something isn't working label Jul 31, 2024
Copy link
Member

@samster25 samster25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great to me! Thanks!

@@ -184,7 +184,11 @@ pub fn next_dict<K: DictionaryKey, I: Pages, F: Fn(&DictPage) -> Box<dyn Array>>
init,
items,
None,
remaining,
(remaining, &mut 0), // TODO(issue#2537): Daft does not currently support Arrow's
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make more sense to panic here instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sweating a little remembering my kernel days with Greg KH telling off the email thread for using WARN instead of throwing an error.

But since we expect this code to be unreachable for now I guess it's fine. Changed this to panic

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LOL! Yeah I think it's better for now to be explict and have the user see the error rather than return weird results

@@ -355,10 +355,11 @@ pub(super) fn extend<'a, D: NestedDecoder<'a>>(
init: &[InitNested],
items: &mut VecDeque<(NestedState, D::DecodedState)>,
dict: Option<&'a D::Dictionary>,
remaining: &mut usize,
remaining: (&mut usize, &mut i64),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep both these types as usize?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, change num_values to usize at all places

@desmondcheongzx desmondcheongzx merged commit 491886f into Eventual-Inc:main Aug 1, 2024
42 checks passed
@desmondcheongzx desmondcheongzx deleted the fix-parquet-reads-for-cross-page-rows branch August 1, 2024 04:41
desmondcheongzx added a commit that referenced this pull request Aug 13, 2024
#2586 did not consider the case when `.show()` or `.limit()` was used.
In these cases, the result is typically truncated to a few rows of data.
With the fix in #2586, when calling `.show()` or `.limit()` on a parquet
file with a `list` column, our parquet reader would read every data page
until all values have been read despite only needing a few data pages.
This gives result Series that have mismatching lengths, e.g. one might
see an error: `daft.exceptions.DaftCoreException: DaftError::ValueError
While building a Table with Table::new_with_size, we found that the
Series lengths did not match. Series named: line_ids had length: 23054
vs the specified Table length: 8`.

This PR fixes the issue by zeroing out the number of values left to read
in a nested column once we know that we have seen all values in the
requested rows. To do this, we either have to read all values (in the
case where the user wants to read all rows), or we have to read a
parquet repetition level of `0` (which indicates the start of a new
record) after we have encountered the desired number of rows.
samster25 pushed a commit that referenced this pull request Aug 20, 2024
Another followup to #2586.

## Problem statement

#2586 incorrectly handles value reading and chunking. In that PR, only
local tests were used. Locally, chunk sizes of up to `128 * 1024` rows
are allowed, so chunk size exceeded the total number of rows to read.
However, non-local reads such as to S3 instead have a default chunk size
of `2048`. This results in a scenario where chunk size is less than the
total number of rows to read.

When this happens, if the row count of a data page aligns with the chunk
size, we continue reading the next data page to see if the last row
contains more leaf values. If the first value belongs to a new record,
then the number of rows seen would be incremented. It would then always
be the case that `rows read > additional rows to read (which is 0)`, and
the exit condition of `rows read == additional rows to read` is never
fulfilled, so we continue reading values into a chunk until the page
runs out of values. This could repeat for every subsequent data page.

The end result is that we can have columns with incorrectly sized chunks
that are incongruous with the chunk sizes of other columns, causing Daft
to error out.

**TLDR: chunk sizes were not being respected during parquet reads.**

## Solution

Instead of checking whether the `rows read == additional rows to read`
condition at the end of the loop where we iterate through a page's
values, we move the check to the start and `peek` at the value to decide
if we should continue iterating for the current chunk.

Additionally, we modify the change in #2643 so that the remaining number
of values to read are zeroed out iff the number of rows read is equal to
the total number of rows to read, and not when the number of rows read
is equal to the number of additional rows to read (which only applies to
the current chunk).

## Example

As an example, consider a parquet file with the schema `<nested
struct<field0 string, field1 string>`. Let `field0` be dictionary
encoded while `field1` uses fallback encoding. Given `4097` rows we
might get the following page layout:

```
Column: nested.field0
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  S _  2       6.00 B     12 B      
  0-1    data  S R  4097    0.00 B     13 B                0       "a" / "arr"


Column: nested.field1
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  S _  1024    2.931 kB   2.931 MB  
  0-1    data  S R  1024    1.26 B     1.261 kB            0       "a" / "zzrnqokwuddojvhlcrdnmtrad..."
  0-2    data  S _  1024    2.934 kB   2.934 MB            0       "aabhtzyyrmvztyiwyaafodbmh..." / "zyxodymgoooorpuarkpkiqjvi..."
  0-3    data  S _  1024    2.934 kB   2.934 MB            0       "aadupgntgjltmsrybltkimurs..." / "zyqwxllnhjdqrjtyeclpthwwy..."
  0-4    data  S _  1024    2.934 kB   2.934 MB            0       "aaazxwchmmahxhexenhbcssft..." / "zzlfnynbvwkertfrinofztjrk..."
  0-5    data  S _  1       2.939 kB   2.939 kB            0       "mmbzhmnbexeqknrnjftfiawsy..." / "mmbzhmnbexeqknrnjftfiawsy..."
```

Before this PR, after page `0-2` is read, we've read enough rows to fill
up a chunk of size `2048` (which is our default chunk size when reading
from S3). However, from #2586, we still read page `0-3` to check if the
row contains multiple leaf values. Before #2643, what happens is that we
see a repetition level of 0, so we increment the number of rows seen, so
`rows seen > additional rows to read for the page`, and we never fulfill
the strict `rows seen == additional rows to read` condition to stop
reading to a chunk. After #2586, we correctly note that the chunk is
full and exit, but we also consumed a value that belongs to the next
chunk, so we end up with insufficient values in the end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants