Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
{ case s ~ p => Substring(s, p, Literal(Integer.MAX_VALUE)) }
| (SUBSTR | SUBSTRING) ~ "(" ~> expression ~ ("," ~> expression) ~ ("," ~> expression) <~ ")" ^^
{ case s ~ p ~ l => Substring(s, p, l) }
| COALESCE ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { case exprs => Coalesce(exprs) }
| COALESCE ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { case exprs =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap the whole case as is done below

expressions.Coalesce(exprs) }
| SQRT ~ "(" ~> expression <~ ")" ^^ { case exp => Sqrt(exp) }
| ABS ~ "(" ~> expression <~ ")" ^^ { case exp => Abs(exp) }
| ident ~ ("(" ~> repsep(expression, ",")) <~ ")" ^^
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import order


import scala.collection.immutable.HashSet
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -234,7 +236,7 @@ object NullPropagation extends Rule[LogicalPlan] {
case e @ Count(expr) if !expr.nullable => Count(Literal(1))

// For Coalesce, remove null literals.
case e @ Coalesce(children) =>
case e @ expressions.Coalesce(children) =>
val newChildren = children.filter {
case Literal(null, _) => false
case _ => true
Expand All @@ -244,7 +246,7 @@ object NullPropagation extends Rule[LogicalPlan] {
} else if (newChildren.length == 1) {
newChildren(0)
} else {
Coalesce(newChildren)
expressions.Coalesce(newChildren)
}

case e @ Substring(Literal(null, _), _, _) => Literal.create(null, e.dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

case class Coalesce(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) extends UnaryNode {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd consider calling this CoalescePartitions since coalesce is such a common SQL concept.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup repartition ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently Repartition also exists :( I'm reverting to CoalescePartitions.
On Apr 28, 2015 6:29 PM, "Reynold Xin" [email protected] wrote:

In
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
#5762 (comment):

@@ -310,6 +310,10 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

+case class Coalesce(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) extends UnaryNode {

yup repartition ...


Reply to this email directly or view it on GitHub
https://github.com/apache/spark/pull/5762/files#r29304752.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's going to be super confusing with existing repartition and CoalescePartitions.

How about

Repartition vs RepartitionByExpression?

override def output: Seq[Attribute] = child.output
}

/**
* A relation with one row. This is used in "SELECT ..." without a from clause.
*/
Expand Down
10 changes: 3 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, S
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, ResolvedStar}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
import org.apache.spark.sql.jdbc.JDBCWriteDetails
Expand Down Expand Up @@ -961,9 +962,7 @@ class DataFrame private[sql](
* @group rdd
*/
override def repartition(numPartitions: Int): DataFrame = {
sqlContext.createDataFrame(
queryExecution.toRdd.map(_.copy()).repartition(numPartitions),
schema, needsConversion = false)
logical.Coalesce(numPartitions, shuffle = true, logicalPlan)
}

/**
Expand All @@ -974,10 +973,7 @@ class DataFrame private[sql](
* @group rdd
*/
override def coalesce(numPartitions: Int): DataFrame = {
sqlContext.createDataFrame(
queryExecution.toRdd.coalesce(numPartitions),
schema,
needsConversion = false)
logical.Coalesce(numPartitions, shuffle = false, logicalPlan)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Distinct(child) =>
execution.Distinct(partial = false,
execution.Distinct(partial = true, planLater(child))) :: Nil

case logical.Coalesce(numPartitions, shuffle, child) =>
execution.Coalesce(numPartitions, shuffle, planLater(child)) :: Nil
case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,19 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
}
}

/**
* :: DeveloperApi ::
* Return a new RDD that has exactly `numPartitions` partitions.
*/
@DeveloperApi
case class Coalesce(numPartitions: Int, shuffle: Boolean, child: SparkPlan) extends UnaryNode {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rxin Should I rename this to Repartition? There are a lot of conflicts coming from catalyst and sql. In fact, the Coalesce function in catalyst fits it's usage, which is to combine (elements) in a mass or whole.. Here, we are basically repartitioning the dataset. Coalesce with a higher number of partitions sounds weird. Also it might be weird to have two different types of Coalesce. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea repartition sounds better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, or repartition sounds even better than CoalescePartitions

override def output: Seq[Attribute] = child.output

override def execute(): RDD[Row] = {
child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
}
}


/**
* :: DeveloperApi ::
Expand Down