add support for reading multiple sorted files per bucket#730
Closed
Upstream SPARK-XXXXX ticket and PR link (if not applicable, explain)
This PR is a modified version of an unmerged upstream PR (which the author will reopen soon): apache#29625. However, since we are not fully caught up with the 3.0 branch and we need this feature internally, I have modified it to work on our branch with the fewest changes possible.
What changes were proposed in this pull request?
Quick background: when there are multiple files in a single bucket, Spark does not propagate the sort ordering to the `FileSourceScanExec` node. This means that if a parent operator requires a child ordering equal to the file ordering in the buckets, we still end up sorting every partition. This PR propagates the sort ordering and creates an RDD that produces rows by merging these sorted iterators.

The diff looks a bit large, but the actual changes are minimal:

- `FileScanRDD` used to contain all the logic to produce the next rows. In this PR, we pass it a `ScanMode` instead, which delegates to a different iterator when we need a sorted bucketed scan.
- `FileScanIterators` contains a `BaseFileScanIterator` and 3 subclasses: one for row-based scans, one for column-batch scans, and one for the sorted bucketed scan.
- `BaseFileScanIterator` is a literal copy-paste of what used to be in `FileScanRDD`, except for https://github.com/palantir/spark/pull/730/files#diff-c64b05200405088131067d856ed7d9d29290d47881018c7a7b0db4668ddda9d3R140-R143.
- The `next` methods implemented by `FileRowScanIterator` and `FileBatchScanIterator` are also exact copy-pastes from `FileScanRDD`, except that we have removed the if-else check there and split it into 2 different iterators, similar to the upstream PR. This is purely cleanup, and I can merge them back if you prefer.
- The `next` method in `FileSortedBucketScanIterator` is the core logic of this change, and is a literal copy-paste from the upstream PR. It holds a min-heap of the next element from each backing iterator and returns the head. This requires a higher memory footprint for the vectorized readers, since it holds the next batch from every backing iterator in memory.

Whatever conflicts this causes with our 3.0 branch, I can take responsibility for resolving. Once the upstream PR has merged and we are up to date with 3.0, I will revert this PR and cherry-pick the upstream one.
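For reviewers unfamiliar with the k-way merge pattern, the min-heap logic in `FileSortedBucketScanIterator` can be sketched roughly as follows. This is a simplified, generic Java illustration under stated assumptions, not the PR's actual Scala code; the class name, constructor shape, and `Comparator`-based ordering are all illustrative. Each heap entry pairs a buffered head element with the iterator it came from, so the overall output is sorted as long as each input iterator is.

```java
import java.util.AbstractMap;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

// Sketch (hypothetical, not the PR's code): merge k individually sorted
// iterators into one sorted stream using a min-heap of their head elements.
final class SortedBucketMergeIterator<T> implements Iterator<T> {
    // Each entry pairs the buffered head element with its source iterator.
    private final PriorityQueue<Map.Entry<T, Iterator<T>>> heap;

    SortedBucketMergeIterator(List<Iterator<T>> sortedIterators,
                              Comparator<T> ordering) {
        this.heap = new PriorityQueue<>(
                Math.max(1, sortedIterators.size()),
                (a, b) -> ordering.compare(a.getKey(), b.getKey()));
        // Seed the heap with the first element of each non-empty iterator.
        for (Iterator<T> it : sortedIterators) {
            if (it.hasNext()) {
                heap.add(new AbstractMap.SimpleEntry<>(it.next(), it));
            }
        }
    }

    @Override
    public boolean hasNext() {
        return !heap.isEmpty();
    }

    @Override
    public T next() {
        Map.Entry<T, Iterator<T>> head = heap.poll();
        if (head == null) {
            throw new NoSuchElementException();
        }
        // Refill the heap from the iterator that produced the smallest element.
        Iterator<T> source = head.getValue();
        if (source.hasNext()) {
            heap.add(new AbstractMap.SimpleEntry<>(source.next(), source));
        }
        return head.getKey();
    }
}
```

This also makes the memory trade-off mentioned above concrete: the merge buffers exactly one element per backing iterator, and for a vectorized reader that "element" is an entire column batch per file.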
How was this patch tested?
Unit tests. The feature is also hidden behind a flag, like the upstream PR, so we can enable it selectively before rolling it out more widely.
cc @mattsills