[SPARK-15453] [SQL] FileSourceScanExec to extract outputOrdering information
#14864
Conversation
…mize query plan
BEFORE
```
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
hc.sql("DROP TABLE table8").collect
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
hc.sql("DROP TABLE table9").collect
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
hc.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, (('a.j = 'b.j) && ('a.k = 'b.k))
:- 'UnresolvedRelation table8, a
+- 'UnresolvedRelation table9, b
== Analyzed Logical Plan ==
i: int, j: int, k: string, i: int, j: int, k: string
Project [i#119, j#120, k#121, i#122, j#123, k#124]
+- Join Inner, ((j#120 = j#123) && (k#121 = k#124))
:- SubqueryAlias a
: +- SubqueryAlias table8
: +- Relation[i#119,j#120,k#121] orc
+- SubqueryAlias b
+- SubqueryAlias table9
+- Relation[i#122,j#123,k#124] orc
== Optimized Logical Plan ==
Join Inner, ((j#120 = j#123) && (k#121 = k#124))
:- Filter (isnotnull(k#121) && isnotnull(j#120))
: +- Relation[i#119,j#120,k#121] orc
+- Filter (isnotnull(k#124) && isnotnull(j#123))
+- Relation[i#122,j#123,k#124] orc
== Physical Plan ==
*SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
:- *Sort [j#120 ASC, k#121 ASC], false, 0
: +- *Project [i#119, j#120, k#121]
: +- *Filter (isnotnull(k#121) && isnotnull(j#120))
: +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [j#123 ASC, k#124 ASC], false, 0
+- *Project [i#122, j#123, k#124]
+- *Filter (isnotnull(k#124) && isnotnull(j#123))
+- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
```
AFTER
```
== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, (('a.j = 'b.j) && ('a.k = 'b.k))
:- 'UnresolvedRelation `table8`, a
+- 'UnresolvedRelation `table9`, b
== Analyzed Logical Plan ==
i: int, j: int, k: string, i: int, j: int, k: string
Project [i#48, j#49, k#50, i#51, j#52, k#53]
+- Join Inner, ((j#49 = j#52) && (k#50 = k#53))
:- SubqueryAlias a
: +- SubqueryAlias table8
: +- Relation[i#48,j#49,k#50] orc
+- SubqueryAlias b
+- SubqueryAlias table9
+- Relation[i#51,j#52,k#53] orc
== Optimized Logical Plan ==
Join Inner, ((j#49 = j#52) && (k#50 = k#53))
:- Filter (isnotnull(k#50) && isnotnull(j#49))
: +- Relation[i#48,j#49,k#50] orc
+- Filter (isnotnull(j#52) && isnotnull(k#53))
+- Relation[i#51,j#52,k#53] orc
== Physical Plan ==
*SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
:- *Project [i#48, j#49, k#50]
: +- *Filter (isnotnull(k#50) && isnotnull(j#49))
: +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#51, j#52, k#53]
+- *Filter (isnotnull(j#52) && isnotnull(k#53))
+- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
```
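
A hedged follow-up sketch (not part of the PR; it assumes a `SparkSession` named `spark` with the two bucketed, sorted tables created as above): one way to confirm programmatically that no extra sort is planned below the join.

```scala
import org.apache.spark.sql.execution.SortExec

// Build the physical plan for the join and look for SortExec nodes; with the
// bucketed + sorted tables, the planner should not need any below the join.
val plan = spark.sql(
  "SELECT * FROM table8 a JOIN table9 b ON a.j = b.j AND a.k = b.k"
).queryExecution.executedPlan

val sortNodes = plan.collect { case s: SortExec => s }
println(s"Sort nodes in the plan: ${sortNodes.size}")  // expected to be 0 after this change
```
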
ok to test

Test build #64552 has finished for PR 14864 at commit

Jenkins, test this please. The last run had a JVM crash.

Test build #64592 has finished for PR 14864 at commit

cc @rxin, @cloud-fan for review

I'm not sure it's safe to do so. A bucket may have more than one file (this can happen if we append data to a bucketed table), so each individual file is sorted, but the bucket as a whole is NOT, because currently Spark SQL doesn't guarantee that records are sorted across all files of a bucket.

@cloud-fan : I have taken care of that case in the PR (see L175 to L185). The sort ordering will only be used when all the buckets have a single file. In subsequent PRs I plan to extend this so that it also benefits Hive tables, where there is a strict guarantee of a single file per bucket.

```scala
relation.location.listFiles(partitionFilters).flatMap(partition => partition.files)
val bucketToFilesGrouping =
  files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
```
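
The snippet above is the check under review; the following self-contained sketch (plain Scala collections with made-up file names, not Spark's actual classes) illustrates the gating it enables: the table's sortBy columns are only reported as the scan's output ordering when every bucket has at most one file.

```scala
// Rows are sorted within each file, but not across files of the same bucket,
// so the sort columns are only usable as output ordering when no bucket has
// more than one file.
def sortedOutput(sortCols: Seq[String], bucketToFiles: Map[Int, Seq[String]]): Seq[String] =
  if (bucketToFiles.values.forall(_.length <= 1)) sortCols else Seq.empty

sortedOutput(Seq("j", "k"), Map(0 -> Seq("f0.orc"), 1 -> Seq("f1.orc")))                   // Seq("j", "k")
sortedOutput(Seq("j", "k"), Map(0 -> Seq("f0.orc", "f0-append.orc"), 1 -> Seq("f1.orc")))  // Seq()
```
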
Listing files and grouping them by bucket id can be expensive if there are a lot of files. What's worse, we will do it again in `createBucketedReadRDD`.
Instead of doing this, I'd like to fix the sorting problem for bucketed tables first; then we don't need to scan file names to get the `outputOrdering`.

For the sorting problem, one way to fix it would be to do what Hive does: create a single file per bucket. Any other approach would leave multiple files per bucket, so one would have to globally sort them while reading. That would be sub-optimal, because tables tend to be "write once, read many", and spending more CPU once on the write path to generate a single file is the better trade-off.
When I came across this, I wondered why it was designed this way. I even posted about it to the dev group earlier today: http://apache-spark-developers-list.1001551.n3.nabble.com/Questions-about-bucketing-in-Spark-td18814.html
To give you some context, I am trying to drive adoption of Spark within Facebook. We have a lot of tables which would benefit from full bucketing support, so my high-level goal is to get Spark's bucketing on par with Hive's in terms of features and compatibility.

Yea, that's a good question. A single file per bucket looks more reasonable; it's more important to read a bucketed table fast than to write it fast. But how about data insertion? Does Hive support inserting into a bucketed table?

@cloud-fan : Open source Hive allows INSERTing data into a bucketed table, but that breaks the guarantee of one file per bucket. We could do better in two ways:
- Disallow operations which would break the bucketing guarantee, OR
- Always preserve bucketing across all operations, which would mean rewriting the entire table at times (e.g. INSERT INTO) and more complication in the code.

I think the latter is a better model for the longer term, but we could start with the first one and work towards it iteratively.

hi @tejasapatil , I think your second proposal (always preserve bucketing across all operations) makes sense, but it needs more discussion before we make that decision. For now, I think it's ok to use your current approach, i.e. only expose the sort ordering when each bucket has a single file. We can add a lazy val in `FileSourceScanExec` for the file listing so that we don't do it twice. What do you think?

@cloud-fan : Sounds good to me. I tried doing that but got a

mark it as

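
A generic Scala sketch (mine, not the Spark source) of the caching pattern being discussed: compute the expensive file listing in a lazy val so it runs at most once per scan node, and mark it `@transient` so the cached result is not serialized along with the node; whether `@transient` was the exact fix applied in this PR is an assumption here.

```scala
class ScanNode(val tablePath: String) extends Serializable {
  // Computed lazily on first use and cached per node; @transient keeps the
  // cached listing out of the serialized form of the node.
  @transient private lazy val listedFiles: Seq[String] = {
    println(s"listing files under $tablePath (expensive, runs at most once per node)")
    Seq(s"$tablePath/part-00000", s"$tablePath/part-00001")  // placeholder result
  }

  def fileCount: Int = listedFiles.size
}

val node = new ScanNode("/tmp/table8")  // hypothetical path
node.fileCount                          // first call triggers the listing
node.fileCount                          // later calls reuse the cached value
```
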
@cloud-fan : Thanks!! Did the change. Jenkins, test this please.

Test build #64898 has finished for PR 14864 at commit

should we follow the same way to handle the bucket columns? i.e.

```scala
val bucketColumns = spec.bucketColumnNames.flatMap { n =>
  output.find(_.name == n)
}
if (bucketColumns.size == spec.bucketColumnNames.size) {
```

If the required output doesn't contain the sort columns, should we just ignore the sorting, or throw an exception?

@cloud-fan : Sure. Did this change.
I am throwing an exception because the end user should know that there is something wrong with the table metadata and that they need to look into it.

c60afd6 to 568b742

```scala
def toAttribute(colName: String, columnType: String): Attribute =
  output.find(_.name == colName).getOrElse {
    throw new AnalysisException(s"Could not find $columnType column $colName for " +
```

My concern is that, if a table has 3 columns i, j, k, is bucketed by i and j, and is sorted by j and k, and we want to read i and j from this table, then the generated RDD should be bucketed, i.e. the number of partitions of this RDD should be equal to the number of buckets. For each RDD partition, can we treat it as sorted by j?
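
To make that scenario concrete, here is a hedged sketch (table name and values are made up; it assumes a `SparkSession` named `spark`) of a table bucketed by (i, j) and sorted by (j, k), read back with only i and j projected.

```scala
// Hypothetical table illustrating the question above: bucketed by (i, j),
// sorted by (j, k), but only i and j are read.
val df = spark.range(16).selectExpr(
  "cast(id % 4 as int) as i",
  "cast(id % 8 as int) as j",
  "cast(id as string) as k")

df.write
  .bucketBy(4, "i", "j")
  .sortBy("j", "k")
  .saveAsTable("spark_15453_scenario")   // hypothetical table name

// Both bucketing columns are read, so the scan can report hash partitioning;
// whether each partition may also be treated as sorted by j is the question here.
spark.table("spark_15453_scenario").select("i", "j").explain(true)
```
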
I see what you meant earlier. I have made changes to the PR to follow this (see the sketch after this list):

For bucketed columns, `HashPartitioning` would be used only when:
- ALL the bucketing columns are being read from the table

For sorted columns, sort ordering would be used when ALL of these criteria match:
1. `HashPartitioning` is being used
2. A prefix (or all) of the sort columns are being read from the table

The sort ordering would be over the prefix subset of sort columns being read from the table, e.g. assume (col0, col2, col3) are the columns read from the table:
- If the sort columns are (col0, col1), then the sort ordering would be considered as (col0)
- If the sort columns are (col1, col0), then the sort ordering would be empty, as per rule 2 above
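
A plain-Scala sketch (mine, not the PR's code) of the prefix rule above:

```scala
// Only the longest prefix of the table's sort columns that is also among the
// columns being read can be reported as the scan's output ordering.
def usableSortPrefix(sortCols: Seq[String], readCols: Set[String]): Seq[String] =
  sortCols.takeWhile(readCols.contains)

val read = Set("col0", "col2", "col3")       // columns read from the table
usableSortPrefix(Seq("col0", "col1"), read)  // Seq("col0")
usableSortPrefix(Seq("col1", "col0"), read)  // Seq() -- empty, per rule 2 above
```
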
Test build #64901 has finished for PR 14864 at commit

Test build #65136 has finished for PR 14864 at commit

```scala
  .map(i => (i, i * 2, i.toString))
  .toDF("i", "j", "k")
  .coalesce(1)
df.write.bucketBy(4, "j", "k").sortBy("j", "k").saveAsTable("SPARK_15453_table_a")
```

should we bucket the table by i, j and sort it by j, k? That would reflect the test name, where the join predicates are a subset of the sorted columns.

For an SMB (sort-merge-bucket) join to happen, bucketing columns == sort columns == join keys. My naming for the test case was wrong. I have deleted this test as I found a better place to add it.

LGTM except some minor comments about tests, thanks for working on it!

Test build #65155 has finished for PR 14864 at commit

Test build #65159 has finished for PR 14864 at commit

thanks, merging to master!

…ormation

## What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-15453

Extracting sort ordering information in `FileSourceScanExec` so that the planner can make use of it. My motivation to make this change was to get Sort Merge join on par with Hive's Sort-Merge-Bucket join when the source tables are bucketed + sorted.

Query:

```
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
context.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
```

Before:

```
== Physical Plan ==
*SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
:- *Sort [j#120 ASC, k#121 ASC], false, 0
:  +- *Project [i#119, j#120, k#121]
:     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
:        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [j#123 ASC, k#124 ASC], false, 0
   +- *Project [i#122, j#123, k#124]
      +- *Filter (isnotnull(k#124) && isnotnull(j#123))
         +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
```

After: (note that the `Sort` step is no longer there)

```
== Physical Plan ==
*SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
:- *Project [i#48, j#49, k#50]
:  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
:     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#51, j#52, k#53]
   +- *Filter (isnotnull(j#52) && isnotnull(k#53))
      +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
```

## How was this patch tested?

Added a test case in `JoinSuite`. Ran all other tests in `JoinSuite`.

Author: Tejas Patil <[email protected]>

Closes apache#14864 from tejasapatil/SPARK-15453_smb_optimization.

Was this patch merged in the Spark 2.0.1 release? If so, how can it be enforced/turned on?

This is an optimization and we usually don't backport optimizations.

…dering` information

### What changes were proposed in this pull request?

`outputOrdering` also depends on whether a bucket has more than one file. The test cases fail when we try to move them to sql/core. This PR fixes the test cases introduced in #14864 and adds a test case to verify [the related logic](https://github.com/tejasapatil/spark/blob/070c24994747c0479fb2520774ede27ff1cf8cac/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L197-L206).

### How was this patch tested?

N/A

Author: Xiao Li <[email protected]>

Closes #16994 from gatorsmile/bucketingTS.

What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-15453

Extracting sort ordering information in `FileSourceScanExec` so that the planner can make use of it. My motivation to make this change was to get Sort Merge join on par with Hive's Sort-Merge-Bucket join when the source tables are bucketed + sorted.

Query:

```
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
context.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
```

Before:

```
== Physical Plan ==
*SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
:- *Sort [j#120 ASC, k#121 ASC], false, 0
:  +- *Project [i#119, j#120, k#121]
:     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
:        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [j#123 ASC, k#124 ASC], false, 0
   +- *Project [i#122, j#123, k#124]
      +- *Filter (isnotnull(k#124) && isnotnull(j#123))
         +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
```

After: (note that the `Sort` step is no longer there)

```
== Physical Plan ==
*SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
:- *Project [i#48, j#49, k#50]
:  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
:     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#51, j#52, k#53]
   +- *Filter (isnotnull(j#52) && isnotnull(k#53))
      +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
```

How was this patch tested?

Added a test case in `JoinSuite`. Ran all other tests in `JoinSuite`.