[BUG] Fix Parquet reads with chunk sizing #2658

Merged 2 commits into Eventual-Inc:main on Aug 20, 2024

Conversation

@desmondcheongzx (Contributor) commented on Aug 14, 2024

Another follow-up to #2586.

Problem statement

#2586 incorrectly handles value reading and chunking. Only local tests were used in that PR. Locally, chunk sizes of up to 128 * 1024 rows are allowed, so the chunk size exceeded the total number of rows to read. However, non-local reads, such as reads from S3, default to a chunk size of 2048 instead. This creates a scenario where the chunk size is smaller than the total number of rows to read.

When this happens, if the row count of a data page aligns with the chunk size, we continue reading the next data page to check whether the last row contains more leaf values. If the first value belongs to a new record, the number of rows seen is incremented. Rows read then always exceeds the additional rows to read (which is 0), the exit condition of rows read == additional rows to read is never fulfilled, and we keep reading values into the chunk until the page runs out of values. This can repeat for every subsequent data page.
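To make the failure mode concrete, below is a minimal, self-contained Python simulation of the control flow just described (illustrative only, not Daft's actual reader code; the 1024-row pages, 2048-row chunk, and function name are hypothetical). A repetition level of 0 marks the start of a new record, and the chunk-full test is strict equality evaluated only after a value has been consumed:

def read_chunk_pre_fix(pages, chunk_rows):
    """Simulate the pre-fix chunking loop over one column's data pages.

    Each page is a list of repetition levels (0 = value starts a new record,
    >0 = value continues the previous record's nested data). The exit test is
    evaluated only after a value has been consumed, so when the chunk fills
    exactly at a page boundary, the next page is still opened (the last record
    might have more leaf values there), its first record pushes the count past
    the target, and the strict equality never stops the read again.
    """
    rows_read = 0
    values_in_chunk = 0
    # Simplified: the real reader only keeps reading past a full chunk when the
    # page boundary lines up with the chunk boundary, which is the case modeled here.
    for page in pages:
        additional_rows = chunk_rows - rows_read  # 0 once the chunk looks full
        rows_seen = 0
        for rep_level in page:
            if rep_level == 0:
                rows_seen += 1        # value starts a new record
            values_in_chunk += 1      # value is consumed *before* the exit test
            if rows_seen == additional_rows:
                break                 # strict equality is the only way out
        rows_read += rows_seen
    return rows_read, values_in_chunk

# Hypothetical layout: flat records, 1024 rows per data page, 2048-row chunks.
pages = [[0] * 1024, [0] * 1024, [0] * 1024, [0] * 1024, [0]]
print(read_chunk_pre_fix(pages, chunk_rows=2048))  # (4097, 4097) instead of (2048, 2048)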

The end result is that we can have columns with incorrectly sized chunks that are incongruous with the chunk sizes of other columns, causing Daft to error out.

TL;DR: chunk sizes were not being respected during Parquet reads.

Solution

Instead of checking the rows read == additional rows to read condition at the end of the loop that iterates through a page's values, we move the check to the start of the loop and peek at the next value to decide whether to keep iterating for the current chunk.

Additionally, we modify the change from #2643 so that the remaining number of values to read is zeroed out iff the number of rows read equals the total number of rows to read, rather than when it equals the number of additional rows to read (which only applies to the current chunk).
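Under the same simplified model (again illustrative Python rather than the actual reader, with hypothetical names), the corrected inner loop looks roughly like this; the peeked repetition level is what distinguishes trailing leaf values of the chunk's last record from the start of the next chunk. The companion fix to the #2643 change (zeroing out the remaining values only when the total row count is reached) is not modeled here.

def read_page_values_fixed(rep_levels, additional_rows):
    """Read one data page's values for the current chunk, checking first.

    The chunk-full test runs *before* a value is consumed: once
    `additional_rows` new records have been seen, a peeked repetition level
    of 0 means the value starts a record belonging to the next chunk and is
    left unconsumed, while values with repetition level > 0 are trailing leaf
    values of the current chunk's last record and are still read.
    """
    rows_seen = 0
    values_read = 0
    for rep_level in rep_levels:      # peek: nothing is counted until we decide
        if rows_seen == additional_rows and rep_level == 0:
            break                     # this value opens the next chunk's record
        if rep_level == 0:
            rows_seen += 1
        values_read += 1              # consume the value into the current chunk
    return rows_seen, values_read

# With the chunk already full (additional_rows == 0), trailing leaf values of
# the last record are still consumed, but a value starting a new record is not:
print(read_page_values_fixed([1, 1, 0, 1], additional_rows=0))  # (0, 2)
print(read_page_values_fixed([0, 0, 0], additional_rows=0))     # (0, 0)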

Example

As an example, consider a Parquet file with the schema nested struct<field0 string, field1 string>. Let field0 be dictionary encoded while field1 uses fallback encoding. Given 4097 rows, we might get the following page layout:

Column: nested.field0
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  S _  2       6.00 B     12 B      
  0-1    data  S R  4097    0.00 B     13 B                0       "a" / "arr"


Column: nested.field1
--------------------------------------------------------------------------------
  page   type  enc  count   avg size   size       rows     nulls   min / max
  0-D    dict  S _  1024    2.931 kB   2.931 MB  
  0-1    data  S R  1024    1.26 B     1.261 kB            0       "a" / "zzrnqokwuddojvhlcrdnmtrad..."
  0-2    data  S _  1024    2.934 kB   2.934 MB            0       "aabhtzyyrmvztyiwyaafodbmh..." / "zyxodymgoooorpuarkpkiqjvi..."
  0-3    data  S _  1024    2.934 kB   2.934 MB            0       "aadupgntgjltmsrybltkimurs..." / "zyqwxllnhjdqrjtyeclpthwwy..."
  0-4    data  S _  1024    2.934 kB   2.934 MB            0       "aaazxwchmmahxhexenhbcssft..." / "zzlfnynbvwkertfrinofztjrk..."
  0-5    data  S _  1       2.939 kB   2.939 kB            0       "mmbzhmnbexeqknrnjftfiawsy..." / "mmbzhmnbexeqknrnjftfiawsy..."

Before this PR, after page 0-2 is read, we have read enough rows to fill a chunk of size 2048 (our default chunk size when reading from S3). However, due to #2586, we still read page 0-3 to check whether the last row contains multiple leaf values. Before #2643, we would see a repetition level of 0 and increment the number of rows seen, so rows seen > additional rows to read for the page, and the strict rows seen == additional rows to read condition to stop reading into the chunk is never fulfilled. After #2643, we correctly note that the chunk is full and exit, but we have also consumed a value that belongs to the next chunk, so we end up with insufficient values in the end.
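To put numbers on the pre-#2643 branch of that walkthrough (an inference from the layout above, not reader output): pages 0-1 and 0-2 fill the 2048-row chunk exactly, the strict equality check then never fires again, and pages 0-3, 0-4, and 0-5 are drained into the same chunk, giving nested.field1 a first chunk of 1024 + 1024 + 1024 + 1024 + 1 = 4097 rows. Meanwhile nested.field0, whose single 4097-row data page is chunked normally, yields a first chunk of 2048 rows, which is exactly the kind of cross-column chunk mismatch that makes Daft error out.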

@desmondcheongzx force-pushed the fix-parquet-chunking branch 2 times, most recently from 693ffe9 to bb19dad on August 14, 2024 at 20:51
@desmondcheongzx changed the title from [WIP] Fix Parquet chunking to [BUG] Fix Parquet reads with chunk sizing on Aug 14, 2024
@github-actions bot added the bug (Something isn't working) label on Aug 14, 2024
Add tests

Add integration label

Fix typos; add 2048 test case

Harden parquet reader

Test various chunk sizes
@samster25 (Member) left a comment


Looks great!

@@ -19,6 +19,8 @@ def get_filesystem_from_path(path: str, **kwargs) -> fsspec.AbstractFileSystem:
return fs


# TODO(desmond): Expected some of these non-working cases to work after recent fixes, but that
# doesn't seem to be the case.
A reviewer (Member) replied:

We should be able to enable some of the struct tests after fixing the casting issues that we saw yesterday.

@samster25 merged commit fbce3ac into Eventual-Inc:main on Aug 20, 2024
42 checks passed
@desmondcheongzx deleted the fix-parquet-chunking branch on August 20, 2024 at 21:17