-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-9066][SQL] Improve cartesian performance #7417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
0a62098
61d1a7e
23deb4b
eb9d155
8198648
1006d46
f0ce447
2bc0991
547242e
bca7a07
a168900
99bcde7
4310536
b2a0ae8
5ca1d26
04678d1
f1cebae
8a8658c
60f2102
e01c8f0
dd77444
d9aef91
a66f475
9812242
ce6ad25
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -213,10 +213,51 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { | |
| object CartesianProduct extends Strategy { | ||
| def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { | ||
| case logical.Join(left, right, _, None) => | ||
| execution.joins.CartesianProduct(planLater(left), planLater(right)) :: Nil | ||
| // For BroadcastCartesianProduct we will broadcast the small size plan, | ||
| // for CartesianProduct we will use the small size plan as cartesian left rdd. | ||
| if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) { | ||
| if (sqlContext.conf.autoBroadcastJoinThreshold > 0 && | ||
| right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) { | ||
| execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right), | ||
| joins.BuildRight) :: Nil | ||
| } else { | ||
| execution.joins.CartesianProduct(planLater(left), planLater(right), | ||
| joins.BuildLeft) :: Nil | ||
| } | ||
| } else { | ||
| if (sqlContext.conf.autoBroadcastJoinThreshold > 0 && | ||
| left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) { | ||
| execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right), | ||
| joins.BuildLeft) :: Nil | ||
| } else { | ||
| execution.joins.CartesianProduct(planLater(left), planLater(right), | ||
| joins.BuildRight) :: Nil | ||
| } | ||
| } | ||
| case logical.Join(left, right, Inner, Some(condition)) => | ||
| execution.Filter(condition, | ||
| execution.joins.CartesianProduct(planLater(left), planLater(right))) :: Nil | ||
| if (right.statistics.sizeInBytes <= left.statistics.sizeInBytes) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code is almost the same as the code above. I would put it in a method i.e. |
||
| if (sqlContext.conf.autoBroadcastJoinThreshold > 0 && | ||
| right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) { | ||
| execution.Filter(condition, | ||
| execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right), | ||
| joins.BuildRight)) :: Nil | ||
| } else { | ||
| execution.Filter(condition, | ||
| execution.joins.CartesianProduct(planLater(left), planLater(right), | ||
| joins.BuildLeft)) :: Nil | ||
| } | ||
| } else { | ||
| if (sqlContext.conf.autoBroadcastJoinThreshold > 0 && | ||
| left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) { | ||
| execution.Filter(condition, | ||
| execution.joins.BroadcastCartesianProduct(planLater(left), planLater(right), | ||
| joins.BuildLeft)) :: Nil | ||
| } else { | ||
| execution.Filter(condition, | ||
| execution.joins.CartesianProduct(planLater(left), planLater(right), | ||
| joins.BuildRight)) :: Nil | ||
| } | ||
| } | ||
| case _ => Nil | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.joins | ||
|
|
||
| import scala.concurrent._ | ||
| import scala.concurrent.duration._ | ||
|
|
||
| import org.apache.spark.annotation.DeveloperApi | ||
| import org.apache.spark.rdd.RDD | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.expressions.{Attribute, JoinedRow} | ||
| import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} | ||
| import org.apache.spark.util.ThreadUtils | ||
|
|
||
| /** | ||
| * :: DeveloperApi :: | ||
| */ | ||
| @DeveloperApi | ||
| case class BroadcastCartesianProduct( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is this different from a BroadcastNestedLoopJoin?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BroadcastNestedLoopJoin is only used for outer joins, right? This operator is used for cartesian products.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The inner join variant with (degenerate) condition All I am saying is that this is also a way to get a broadcasting cartesian join going, and it saves some lines of code. |
||
| left: SparkPlan, | ||
| right: SparkPlan, | ||
| buildSide: BuildSide) extends BinaryNode { | ||
| override def output: Seq[Attribute] = left.output ++ right.output | ||
|
|
||
| private val (streamed, broadcast) = buildSide match { | ||
| case BuildRight => (left, right) | ||
| case BuildLeft => (right, left) | ||
| } | ||
|
|
||
| private val timeout: Duration = { | ||
| val timeoutValue = sqlContext.conf.broadcastTimeout | ||
| if (timeoutValue < 0) { | ||
| Duration.Inf | ||
| } else { | ||
| timeoutValue.seconds | ||
| } | ||
| } | ||
|
|
||
| @transient | ||
| private val broadcastFuture = future { | ||
| val input = broadcast.execute().map(_.copy()).collect() | ||
| sparkContext.broadcast(input) | ||
| }(BroadcastCartesianProduct.broadcastCartesianProductExecutionContext) | ||
|
|
||
| protected override def doExecute(): RDD[InternalRow] = { | ||
| val leftResults = streamed.execute().map(_.copy()) | ||
| val rightResults = Await.result(broadcastFuture, timeout) | ||
|
|
||
| leftResults.mapPartitions { streamedIter => | ||
| for (x <- streamedIter; y <- rightResults.value) | ||
| yield { | ||
| val joinedRow = new JoinedRow | ||
| buildSide match { | ||
| case BuildRight => joinedRow(x, y) | ||
| case BuildLeft => joinedRow(y, x) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| object BroadcastCartesianProduct { | ||
| private val broadcastCartesianProductExecutionContext = ExecutionContext.fromExecutorService( | ||
| ThreadUtils.newDaemonCachedThreadPool("broadcast-cartesian-product", 128)) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,16 +27,27 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} | |
| * :: DeveloperApi :: | ||
| */ | ||
| @DeveloperApi | ||
| case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode { | ||
| case class CartesianProduct( | ||
| left: SparkPlan, | ||
| right: SparkPlan, | ||
| buildSide: BuildSide) extends BinaryNode { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not so sure if the change is necessary, as if either side of the table is small enough, we will resort to the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Sephiroth-Lin Can you explain the reason that we need
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yhuai buildSide is only used to tell which side is small, and we use that to decide whether we need to change the order. |
||
| override def output: Seq[Attribute] = left.output ++ right.output | ||
|
|
||
| private val (small, big) = buildSide match { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Has to be |
||
| case BuildRight => (left, right) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In other places, |
||
| case BuildLeft => (right, left) | ||
| } | ||
|
|
||
| protected override def doExecute(): RDD[InternalRow] = { | ||
| val leftResults = left.execute().map(_.copy()) | ||
| val rightResults = right.execute().map(_.copy()) | ||
| val leftResults = small.execute().map(_.copy()) | ||
| val rightResults = big.execute().map(_.copy()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we could use zipPartition() here, similar to ShuffleHashJoin, then we don't need to copy the bigger stream.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @davies Sorry, I am not very clear on this. Using zipPartition() we can get two iterators; do we then compute the cartesian product from these 2 iterators ourselves, instead of calling cartesian()?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, just realized that we can't use zipPartition here.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As commented by @zsxwing, the right side will be iterated multiple times by CartesianRDD, so it makes sense to have the small one on the right side. We could even load all the items on the right side into memory as an Array; we could do this if each partition of |
||
|
|
||
| leftResults.cartesian(rightResults).mapPartitions { iter => | ||
| val joinedRow = new JoinedRow | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Quick question. Why not use
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, using the partition size here is not accurate. Consider an RDD with 100 partitions where each partition has one record, and an RDD with 10 partitions where each partition has 100 million records: using the method above will cause more scans from HDFS.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @hvanhovell Yes, using sizeInBytes is better, but it also has a problem: if leftResults only has 1 record and this record's size is big, while rightResults has many records whose total size is small, then this scenario will cause worse performance. The best way would be to check the total number of records for the partition, but currently we cannot get it. |
||
| iter.map(r => joinedRow(r._1, r._2)) | ||
| buildSide match { | ||
| case BuildRight => iter.map(r => joinedRow(r._1, r._2)) | ||
| case BuildLeft => iter.map(r => joinedRow(r._2, r._1)) | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use
CanBroadcast extractor in order to determine whether a side is broadcastable; this will also consider the broadcast hint, and is probably more future-proof.