[SPARK-34863][SQL] Support complex types for Parquet vectorized reader #33695
Conversation
Thank you, @sunchao, for the great work. Do we have benchmark results?
"after the current batch." or "in the current batch."?
It is after the current batch. Suppose we have the following repetition levels:
0, 1, 0, 1, 0, 1, 0, 1, 0, 1
and suppose the batch size is 4: we'll need to see the 5th 0 (which is the first list of the next batch) before we know that we've completed 4 lists for the batch.
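A tiny standalone sketch of that example (illustrative only, not the reader code), counting level-0 entries to find the batch boundary:

```java
public class BatchBoundary {
  public static void main(String[] args) {
    int[] repetitionLevels = {0, 1, 0, 1, 0, 1, 0, 1, 0, 1};
    int batchSize = 4;
    int zerosSeen = 0;
    int i = 0;
    // A repetition level of 0 starts a new list, so the 4th list is only
    // known to be complete once the 5th 0 appears, i.e. after the batch.
    while (i < repetitionLevels.length) {
      if (repetitionLevels[i] == 0 && ++zerosSeen == batchSize + 1) {
        break; // the 5th 0, at index 8
      }
      i++;
    }
    System.out.println("batch of " + batchSize + " lists ends before index " + i);
  }
}
```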
Let me know if you have a better way to write the comments; it could be confusing.
I think this is ready for review (although I plan to beef up the test coverage a bit more). cc @viirya @dongjoon-hyun @cloud-fan @vkorukanti @sadikovi @michal-databricks @c21. Also let me know if you prefer to break this into sub-PRs.
It's great! Thank you, @sunchao.
sadikovi left a comment:
Thank you for opening a PR. Fantastic work! I left a few initial comments and will review it in more detail soon-ish.
Should we mention that this is only enabled when vectorised reader is enabled?
Yes, good point.
nit: Would it be possible to capitalise all of the javadoc comments in this file? I think it would look neater 😄. Thanks!
Done
Does the code handle situations when there are no repetition or definition levels for the type? Would it allocate vectors even when the column is null?
If the column is null (i.e., a missing column), the code will exit at line 74 above, so it won't allocate vectors for these. I'm not quite sure what you mean by "when there are no repetition or definition levels for the type"; can you elaborate?
I might have misunderstood the code, my question was about what happens when you read a primitive type that is non-null and non-repeated. Would the code still allocate the level vectors even if they are not necessarily needed? Again, it probably does not matter that much but maybe it is something we can clarify with a comment.
@sadikovi ah I see what you mean. Good point. Let me see if I can remove these in those cases.
Actually it's a bit tricky to remove these, although possible. Let me mark a TODO here and we can tackle it separately.
Yes, I also thought it might be tricky to fix. It is fine to keep as is, I guess, especially if performance is good anyway.
Interesting, would `isPrimitive == children.isEmpty()` hold?
I think it doesn't hold for empty structs.
Ah, good point!
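A quick sketch of that corner case using the parquet-mr schema API (assuming, as I believe, that the plain `GroupType` constructor, unlike the `Types` builder, accepts an empty field list):

```java
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Type;

public class EmptyStructCheck {
  public static void main(String[] args) {
    // An empty struct: a Parquet group type with no child fields.
    GroupType emptyStruct = new GroupType(Type.Repetition.OPTIONAL, "s");
    System.out.println(emptyStruct.isPrimitive());         // false
    System.out.println(emptyStruct.getFields().isEmpty()); // true
    // So isPrimitive == children.isEmpty() does not hold here.
  }
}
```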
nit: Let's remove the newline.
Done
Thanks @sadikovi for taking a look! Really appreciate it.
I'm going to break this into smaller PRs for easier review. Closing it now.
```java
private int readPage() {
  DataPage page = pageReader.readPage();
  if (page == null) {
```
Just curious, when/why does this case now happen? Can we run past a column in the middle?
Oops, sorry @agrawaldevesh, just saw your comment.
This could happen when we are reading a repeated column (e.g., an int list). In this case, we don't know whether the list is complete until we have no more pages to read.
I think newer versions of parquet-mr (since 1.11, I think) no longer allow a repeated list to span multiple pages, but it is allowed in older versions.
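A small standalone illustration (hypothetical page contents, not the actual reader code): with an old writer, one record's repetition levels can straddle a page boundary, so only the next level-0 entry, or running out of pages, marks the end of a record.

```java
import java.util.List;

public class RecordsAcrossPages {
  public static void main(String[] args) {
    // Repetition levels of an int-list column, split across two pages
    // by an old parquet-mr writer. A 0 starts a new record (list).
    List<int[]> pages = List.of(
        new int[] {0, 1, 1},   // record A starts, two more elements...
        new int[] {1, 0, 1});  // ...record A continues here! Then record B.
    int records = 0;
    for (int[] page : pages) {
      for (int level : page) {
        if (level == 0) records++; // a record boundary
      }
    }
    // Record B is only known to be complete once readPage() returns null.
    System.out.println("records = " + records); // 2
  }
}
```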
It's always great to put these nice details in code comments, as there are so many small tricks people don't know.
Good point. I'll add some comments in the new PR.
I've opened a new PR #34659 replacing this one. Would appreciate your review there!
What changes were proposed in this pull request?
This PR adds support for complex types (e.g., list, map, array) for Spark's vectorized Parquet reader. In particular, it introduces the following changes:

- Introduced a new class `ParquetType`, which binds a Spark type with its corresponding Parquet definition & repetition level. This is used when Spark assembles a vector of complex type for Parquet.
- Refactored `ParquetSchemaConverter` and added a new method `convertTypeInfo`, which converts a Parquet `MessageType` to a `ParquetType` above. The existing conversion logic in the class remains the same but now operates with `ParquetType` instead of `DataType`, and annotates the former with extra information such as definition & repetition level, column path, column descriptor, etc.
- Introduced a new class `ParquetColumn`, which encapsulates all the information needed when reading a Parquet column, including the `ParquetType` for the column, the repetition & definition levels (only allocated for a leaf node of a complex type), as well as the reader for the column. It also contains the logic for assembling nested columnar batches by interpreting Parquet repetition & definition levels.
- Changed `VectorizedParquetRecordReader` to initialize a list of `ParquetColumn` for the columns read.
- `VectorizedColumnReader` now also creates a reader for the repetition column. Depending on whether the maximum repetition level is 0, the batch read is split into two code paths, e.g., `readBatch` versus `readBatchNested`.
- Updated `VectorizedRleValuesReader`: for data types involving only structs or primitive types, it still goes through the old `readBatch` method, which now also saves definition levels into a vector for later assembly; for data types involving arrays or maps, a separate code path, `readBatchNested`, is introduced to handle repetition levels.
- Introduced a new config, `spark.sql.parquet.enableNestedColumnVectorizedReader`, to turn the feature on or off; by default it is true (see the usage sketch after this list).
- Changed `WritableColumnVector` to better support null structs. Currently it requires populating null entries in all child vectors when there is a null struct; this wastes space and also doesn't work well with the Parquet scan. This adds an extra field, `structOffsets`, which records the mapping from a row ID to the position of the row in the child vector, so that child vectors only need to store real (non-null) elements.
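As a usage sketch (hypothetical app name and path; `spark.sql.parquet.enableVectorizedReader` is the existing flag for the vectorized reader itself, which, per the review discussion above, must also be on):

```java
import org.apache.spark.sql.SparkSession;

public class NestedParquetReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("nested-parquet-read")   // hypothetical app name
        .master("local[*]")
        .getOrCreate();
    // The nested-column support only kicks in when the vectorized reader is enabled.
    spark.conf().set("spark.sql.parquet.enableVectorizedReader", "true");
    spark.conf().set("spark.sql.parquet.enableNestedColumnVectorizedReader", "true");
    // Hypothetical input path with a nested schema (e.g., array<struct<...>>).
    spark.read().parquet("/tmp/nested.parquet").show();
    spark.stop();
  }
}
```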
To test this, the PR introduced an interface `ParquetRowGroupReader` in `SpecificParquetRecordReaderBase` to abstract the Parquet file reading logic. The bulk of the tests are in `ParquetVectorizedSuite`, which covers different batch sizes & page sizes, column index, first row index, nulls, etc.
The `DataSourceReadBenchmark` is extended with two more cases: reading struct fields of primitive types, and reading an array of struct & a map field.

Why are the changes needed?
Whenever the read schema contains complex types, Spark currently falls back to the row-based reader in parquet-mr, which is much slower. As the benchmark shows, adding support to the vectorized reader gives a ~15x average speed-up when reading struct fields, and ~1.5x when reading an array of struct and a map.
Micro-benchmark of reading primitive fields from a struct, over 400M rows:
Does this PR introduce any user-facing change?
With the PR, Spark should now support reading complex types in its vectorized Parquet reader. A new config `spark.sql.parquet.enableNestedColumnVectorizedReader` is introduced to turn the feature on or off.

How was this patch tested?
Added new unit tests.