-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19426][SQL] Custom coalescer for Dataset #18861
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -596,14 +596,15 @@ object CollapseProject extends Rule[LogicalPlan] { | |
| object CollapseRepartition extends Rule[LogicalPlan] { | ||
| def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { | ||
| // Case 1: When a Repartition has a child of Repartition or RepartitionByExpression, | ||
| // 1) When the top node does not enable the shuffle (i.e., coalesce API), but the child | ||
| // enables the shuffle. Returns the child node if the last numPartitions is bigger; | ||
| // otherwise, keep unchanged. | ||
| // 1) When the top node does not enable the shuffle (i.e., coalesce with no user-specified | ||
| // strategy), but the child enables the shuffle. Returns the child node if the last | ||
| // numPartitions is bigger; otherwise, keep unchanged. | ||
| // 2) In the other cases, returns the top node with the child's child | ||
| case r @ Repartition(_, _, child: RepartitionOperation) => (r.shuffle, child.shuffle) match { | ||
| case (false, true) => if (r.numPartitions >= child.numPartitions) child else r | ||
| case _ => r.copy(child = child.child) | ||
| } | ||
| case r @ Repartition(_, _, child: RepartitionOperation, None) => | ||
|
||
| (r.shuffle, child.shuffle) match { | ||
| case (false, true) => if (r.numPartitions >= child.numPartitions) child else r | ||
| case _ => r.copy(child = child.child) | ||
| } | ||
| // Case 2: When a RepartitionByExpression has a child of Repartition or RepartitionByExpression | ||
| // we can remove the child. | ||
| case r @ RepartitionByExpression(_, child: RepartitionOperation, _) => | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,12 +17,12 @@ | |
|
|
||
| package org.apache.spark.sql.catalyst.plans.logical | ||
|
|
||
| import org.apache.spark.rdd.PartitionCoalescer | ||
| import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation | ||
| import org.apache.spark.sql.catalyst.catalog.CatalogTable | ||
| import org.apache.spark.sql.catalyst.expressions._ | ||
| import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression | ||
| import org.apache.spark.sql.catalyst.plans._ | ||
| import org.apache.spark.sql.catalyst.plans.logical.statsEstimation._ | ||
| import org.apache.spark.sql.types._ | ||
| import org.apache.spark.util.Utils | ||
| import org.apache.spark.util.random.RandomSampler | ||
|
|
@@ -746,8 +746,20 @@ abstract class RepartitionOperation extends UnaryNode { | |
| * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user | ||
| * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer | ||
| * of the output requires some specific ordering or distribution of the data. | ||
| * | ||
| * If `shuffle` = false (`coalesce` cases), this logical plan can have an user-specified strategy | ||
| * to coalesce input partitions. | ||
| * | ||
| * @param numPartitions How many partitions to use in the output RDD | ||
| * @param shuffle Whether to shuffle when repartitioning | ||
| * @param child the LogicalPlan | ||
| * @param coalescer Optional coalescer that an user specifies | ||
| */ | ||
| case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) | ||
| case class Repartition( | ||
| numPartitions: Int, | ||
| shuffle: Boolean, | ||
| child: LogicalPlan, | ||
| coalescer: Option[PartitionCoalescer] = None) | ||
| extends RepartitionOperation { | ||
| require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a new require here?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD | |
| import org.apache.spark.api.java.function._ | ||
| import org.apache.spark.api.python.{PythonRDD, SerDeUtil} | ||
| import org.apache.spark.broadcast.Broadcast | ||
| import org.apache.spark.rdd.RDD | ||
| import org.apache.spark.rdd.{PartitionCoalescer, RDD} | ||
| import org.apache.spark.sql.catalyst._ | ||
| import org.apache.spark.sql.catalyst.analysis._ | ||
| import org.apache.spark.sql.catalyst.catalog.CatalogRelation | ||
|
|
@@ -2661,6 +2661,30 @@ class Dataset[T] private[sql]( | |
| partitionExprs.map(_.expr), logicalPlan, sparkSession.sessionState.conf.numShufflePartitions) | ||
| } | ||
|
|
||
| /** | ||
| * Returns a new Dataset that an user-defined `PartitionCoalescer` reduces into fewer partitions. | ||
| * `userDefinedCoalescer` is the same with a coalescer used in the `RDD` coalesce function. | ||
| * | ||
| * If a larger number of partitions is requested, it will stay at the current | ||
| * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in | ||
| * a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not | ||
| * be a shuffle, instead each of the 100 new partitions will claim 10 of the current partitions. | ||
| * | ||
| * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, | ||
| * this may result in your computation taking place on fewer nodes than | ||
| * you like (e.g. one node in the case of numPartitions = 1). To avoid this, | ||
| * you can call repartition. This will add a shuffle step, but means the | ||
| * current upstream partitions will be executed in parallel (per whatever | ||
| * the current partitioning is). | ||
| * | ||
| * @group typedrel | ||
| * @since 2.3.0 | ||
| */ | ||
| def coalesce(numPartitions: Int, userDefinedCoalescer: Option[PartitionCoalescer]) | ||
| : Dataset[T] = withTypedPlan { | ||
|
||
| Repartition(numPartitions, shuffle = false, logicalPlan, userDefinedCoalescer) | ||
| } | ||
|
|
||
| /** | ||
| * Returns a new Dataset that has exactly `numPartitions` partitions, when the fewer partitions | ||
| * are requested. If a larger number of partitions is requested, it will stay at the current | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,14 +20,13 @@ package org.apache.spark.sql.execution | |
| import scala.concurrent.{ExecutionContext, Future} | ||
| import scala.concurrent.duration.Duration | ||
|
|
||
| import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext} | ||
| import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD} | ||
| import org.apache.spark.{InterruptibleIterator, TaskContext} | ||
| import org.apache.spark.rdd.{EmptyRDD, PartitionCoalescer, PartitionwiseSampledRDD, RDD} | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.expressions._ | ||
| import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} | ||
| import org.apache.spark.sql.catalyst.plans.physical._ | ||
| import org.apache.spark.sql.execution.metric.SQLMetrics | ||
| import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates | ||
| import org.apache.spark.sql.types.LongType | ||
| import org.apache.spark.util.ThreadUtils | ||
| import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} | ||
|
|
@@ -561,7 +560,7 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { | |
| * Physical plan for returning a new RDD that has exactly `numPartitions` partitions. | ||
| * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. | ||
| * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of | ||
| * the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions | ||
| * the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions | ||
| * is requested, it will stay at the current number of partitions. | ||
| * | ||
| * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, | ||
|
|
@@ -570,8 +569,16 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan { | |
| * you see ShuffleExchange. This will add a shuffle step, but means the | ||
| * current upstream partitions will be executed in parallel (per whatever | ||
| * the current partitioning is). | ||
| * | ||
| * If you want to define how to coalesce partitions, you can set a custom strategy | ||
| * to coalesce partitions in `coalescer`. | ||
| * | ||
| * @param numPartitions Number of partitions this coalescer tries to reduce partitions into | ||
| * @param child the SparkPlan | ||
| * @param coalescer Optional coalescer that an user specifies | ||
| */ | ||
| case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode { | ||
| case class CoalesceExec(numPartitions: Int, child: SparkPlan, coalescer: Option[PartitionCoalescer]) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add the parm description of
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok! |
||
| extends UnaryExecNode { | ||
| override def output: Seq[Attribute] = child.output | ||
|
|
||
| override def outputPartitioning: Partitioning = { | ||
|
|
@@ -580,7 +587,7 @@ case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecN | |
| } | ||
|
|
||
| protected override def doExecute(): RDD[InternalRow] = { | ||
| child.execute().coalesce(numPartitions, shuffle = false) | ||
| child.execute().coalesce(numPartitions, shuffle = false, coalescer) | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also add new test cases to
CollapseRepartitionSuitefor the changes in this rule.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok