[SPARK-24528][SQL] Add support to read multiple sorted bucket files for data source v1#29625
c21 wants to merge 4 commits into apache:master
Conversation
cc @cloud-fan, @viirya, @imback82, and @sameeragarwal to take a look when you guys have time, thanks.
c21 left a comment:
Just highlighting the change from `FileScanRDD.compute()` to `BaseFileScanIterator`, to help review more easily.
    override def next(): Object

    private def readFile(file: PartitionedFile): Iterator[InternalRow] = {
This is changed compared to FileScanRDD.readCurrentFile: it takes the file as a parameter instead of relying on currentFile, because we need nextIterator() to call readFile() multiple times for different files in initializeHeapWithFirstRows().
Test build #128187 has finished for PR 29625 at commit
    context.killTaskIfInterrupted()
    (currentIterator != null && currentIterator.hasNext) || nextIterator()
This is changed compared to FileScanRDD.nextIterator: it does not call hasNext() here, because FileSortedBucketScanIterator needs to override hasNext().
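A minimal self-contained sketch of the pattern this comment describes (class and method names here are illustrative, not the PR's actual code): the base iterator checks `currentIterator.hasNext` directly, so a subclass can override `hasNext()` without that override being re-entered from `nextIterator()`.

```scala
// Illustrative sketch, not Spark's code: a base file-scan iterator whose
// nextIterator() deliberately avoids calling this.hasNext, so a subclass
// (like the PR's FileSortedBucketScanIterator) can override hasNext freely.
abstract class BaseScanIterator[T](files: Iterator[Iterator[T]]) extends Iterator[T] {
  protected var currentIterator: Iterator[T] = _

  override def hasNext: Boolean =
    (currentIterator != null && currentIterator.hasNext) || nextIterator()

  // Advance to the next non-empty file; returns true if a row is available.
  protected def nextIterator(): Boolean = {
    if (files.hasNext) {
      currentIterator = files.next()
      // Check the new iterator directly instead of calling hasNext(), so an
      // overridden hasNext() in a subclass is not invoked from here.
      currentIterator.hasNext || nextIterator()
    } else {
      false
    }
  }

  override def next(): T = currentIterator.next()
}
```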
Test build #128228 has finished for PR 29625 at commit
Test build #128229 has finished for PR 29625 at commit
retest this please
Test build #128264 has finished for PR 29625 at commit
After discussing with @cloud-fan, it would be better to have a rule that automatically decides whether to do a sorted bucket scan based on the query shape, and the same for bucket scan. So I will first do bucket scan in https://issues.apache.org/jira/browse/SPARK-24528, and redo this one after that PR is merged.
…cally

### What changes were proposed in this pull request?

This PR adds support for deciding bucketed table scan dynamically based on the actual query plan. Currently bucketing is enabled by default (`spark.sql.sources.bucketing.enabled`=true), so for all bucketed tables in the query plan, we use a bucketed table scan (all input files of a bucket are read by the same task). This has the drawback that if the bucketed table scan is not beneficial at all (no join/group-by/etc. in the query), we don't need to use it, as it restricts the number of tasks to the number of buckets and might hurt parallelism.

The feature adds a physical plan rule right after `EnsureRequirements`. The rule goes through the plan nodes. For all operators which have an "interesting partition" (i.e., require `ClusteredDistribution` or `HashClusteredDistribution`), it checks whether the sub-plan for the operator has an `Exchange` and a bucketed table scan (and only allows certain operators in the plan (i.e. `Scan/Filter/Project/Sort/PartialAgg/etc.`), see details in `DisableUnnecessaryBucketedScan.disableBucketWithInterestingPartition`). If yes, it disables the bucketed table scan in the sub-plan. In addition, it disables the bucketed table scan if there is an operator with an interesting partition along the sub-plan.

Why the algorithm works: if there is a shuffle between the bucketed table scan and the operator with an interesting partition, the bucketed table scan's partitioning will be destroyed by the shuffle operator in the middle, so the bucketed table scan is certainly not needed.

The idea of "interesting partition" is inspired by "interesting order" in "Access Path Selection in a Relational Database Management System" (http://www.inf.ed.ac.uk/teaching/courses/adbs/AccessPath.pdf), after discussion with cloud-fan.

### Why are the changes needed?

To avoid unnecessary bucketed scans in the query; this is also a prerequisite for #29625 (deciding bucketed sorted scan dynamically will be added later in that PR).

### Does this PR introduce _any_ user-facing change?

A new config `spark.sql.sources.bucketing.autoBucketedScan.enabled` is introduced, which is set to false by default (the rule is disabled by default as it can regress cached bucketed table queries, see discussion in #29804 (comment)). Users can opt in/out by enabling/disabling the config. As we found in production, some users rely on the assumption that the number of tasks equals the number of buckets when reading a bucketed table, to precisely control the number of tasks. This is a bad assumption, but it does happen on our side, so we leave a config here to allow them to opt out of the feature.

### How was this patch tested?

Added unit tests in `DisableUnnecessaryBucketedScanSuite.scala`.

Closes #29804 from c21/bucket-rule.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
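As a hypothetical usage sketch of opting into that rule (the config name comes from the PR text above; the session setup, table name, and query below are illustrative only):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: enabling the rule from #29804. The config name is from
// the PR description; the table and query here are made up for illustration.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")

// A scan-only query has no operator requiring a clustered distribution
// (no "interesting partition"), so the rule may disable the bucketed scan
// and let Spark use normal file splits instead of one task per bucket.
spark.sql("SELECT * FROM bucketed_t WHERE x > 0").explain()
```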
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
@c21 I wanted to ask if you were planning on continuing this PR now that https://github.com/apache/spark/pull/29804/files has been merged?
@rahij - yes I am. Will raise a PR soon, thanks.
@c21 Just wanted to check if you had any time to revive this PR. I'm happy to create a new PR off this branch, but I figured you know this code much better than I do in case we need to change/fix something.
@rahij - sure, I will take a look at palantir#730. Sorry, I was busy with something else; I will have a PR ready this week. Btw, I got feedback earlier for this PR that we want to enable/disable sorted bucketed scan automatically, similar to #29804, so I need more time to work on that, thanks.
Thank you, that makes sense. You don't have to take a look at palantir#730 - our current master branch is not up to date with apache/spark right now (but hopefully will be soon), so we can just cherry-pick whatever you get merged upstream once our branch is up to date.
@rahij - yes, sure. Sorry, I misread "revive" as "review" by mistake :)
What changes were proposed in this pull request?
This PR is to support reading sorted bucketed tables efficiently in the data source v1 read path. Previously, in `FileSourceScanExec` -> `FileScanRDD`, we read input files sequentially one by one. For a sorted bucketed table, which can potentially have multiple sorted files per bucket, the sort ordering for each bucket cannot be preserved if we read the sorted files one by one. This PR adds support for reading the sorted files together in a sort-merge way, to preserve the ordering for each bucket.

Specifically the code change is:

- Add a parameter `ScanMode` (which can be either `RowMode` (read row by row - non-vectorized), `BatchMode` (read batch by batch - vectorized), or `SortedBucketMode` (read rows in a sort-merge way - the sorted bucketed file case)) to `FileScanRDD`. `FileScanRDD.compute()` decides which iterator to use based on this `ScanMode`.
- Extract the existing iterator logic in `FileScanRDD` into `BaseFileScanIterator`, which holds the common logic for file scan. `BaseFileScanIterator` has 3 subclasses: `FileRowScanIterator` reads row by row for each file (`RowMode`), `FileBatchScanIterator` reads batch by batch for each file (`BatchMode`), and `FileSortedBucketScanIterator` reads row by row following the row ordering across multiple files (`SortedBucketMode`).
- `FileSortedBucketScanIterator`: a priority queue is used to output rows from multiple sorted files based on the table's sort columns.

The PR also separates the logic for the existing row scan and batch scan, so we no longer need the per-row `if (nextElement.isInstanceOf[ColumnarBatch]) {` check at line 100 of `FileScanRDD`, which was a long-standing TODO.

The whole feature is controlled by a new config `spark.sql.sources.bucketing.sortedScan.enabled`, which is disabled by default, as there can be a risk of reading more data per task and causing OOM (especially with vectorization, where we need to keep one batch of each file in memory at any time). In addition, we need to set `InputFileBlockHolder` for each row (as any row from any file can be output), so it can be costly.

Why are the changes needed?
Right now, for a sorted bucketed table, even though each individual file is sorted, the current file scan approach does not preserve the sort ordering across files in one bucket, so a sort is still needed before a sort-merge join or sort aggregate. If the table is big, an external sort can happen and cause extra CPU and IO overhead. This PR introduces a code path to read multiple sorted files in a sort-merge way, avoiding the later sort before the join or aggregate and saving CPU and IO resources.
In our internal fork, we added similar support (https://www.youtube.com/watch?v=brzInUisshY&feature=youtu.be&t=910), though the code path is different (we are reading/writing Hive tables).
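A minimal sketch of the sort-merge reading idea described above, using a priority queue over per-file sorted iterators (the generic types and the name `mergeSorted` are illustrative, not the PR's actual code):

```scala
import scala.collection.mutable

// Merge several individually sorted iterators (one per bucket file) into a
// single sorted iterator, the way FileSortedBucketScanIterator is described
// to work: a min-heap keyed on each iterator's current head element.
def mergeSorted[T](iters: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
  // Scala's PriorityQueue is a max-heap, so reverse the ordering on heads.
  val heap = mutable.PriorityQueue.empty[BufferedIterator[T]](
    Ordering.by((it: BufferedIterator[T]) => it.head).reverse)
  iters.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))
  new Iterator[T] {
    override def hasNext: Boolean = heap.nonEmpty
    override def next(): T = {
      val it = heap.dequeue()           // file with the smallest current row
      val row = it.next()
      if (it.hasNext) heap.enqueue(it)  // re-insert under its new head row
      row
    }
  }
}
```

For example, merging `Seq(Iterator(1, 4, 7), Iterator(2, 5, 8), Iterator(3, 6, 9))` yields 1 through 9 in order. Note that one open iterator per file stays live for the whole scan, which mirrors the memory-cost caveat in the PR description.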
Does this PR introduce any user-facing change?
Yes. A new user-facing config `spark.sql.sources.bucketing.sortedScan.enabled` is introduced to allow users to read sorted bucketed tables efficiently (e.g. no sort for a sort-merge join / sort aggregate on bucketed columns).

Example query plan with no `Sort` before `SortMergeJoin`:

How was this patch tested?
Added unit tests in `BucketedReadSuite.scala` and `ExplainSuite.scala`.