Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {

// Construct a project list from plan's output, while the value is always NULL.
private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
plan.output.map{ a => Alias(Literal(null), a.name)(a.exprId) }
plan.output.map{ a => Alias(Cast(Literal(null), a.dataType), a.name)(a.exprId) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use CastSupport.cast


def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case p: Union if p.children.forall(isEmptyLocalRelation) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{IntegerType, StructType}

class PropagateEmptyRelationSuite extends PlanTest {
object Optimize extends RuleExecutor[LogicalPlan] {
Expand All @@ -37,7 +37,8 @@ class PropagateEmptyRelationSuite extends PlanTest {
ReplaceIntersectWithSemiJoin,
PushDownPredicate,
PruneFilters,
PropagateEmptyRelation) :: Nil
PropagateEmptyRelation,
CollapseProject) :: Nil
}

object OptimizeWithoutPropagateEmptyRelation extends RuleExecutor[LogicalPlan] {
Expand All @@ -48,7 +49,8 @@ class PropagateEmptyRelationSuite extends PlanTest {
ReplaceExceptWithAntiJoin,
ReplaceIntersectWithSemiJoin,
PushDownPredicate,
PruneFilters) :: Nil
PruneFilters,
CollapseProject) :: Nil
}

val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
Expand Down Expand Up @@ -79,18 +81,21 @@ class PropagateEmptyRelationSuite extends PlanTest {

(true, false, Inner, Some(LocalRelation('a.int, 'b.int))),
(true, false, Cross, Some(LocalRelation('a.int, 'b.int))),
(true, false, LeftOuter, Some(Project(Seq('a, Literal(null).as('b)), testRelation1).analyze)),
(true, false, LeftOuter,
Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)),
(true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
(true, false, FullOuter, Some(Project(Seq('a, Literal(null).as('b)), testRelation1).analyze)),
(true, false, FullOuter,
Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)),
(true, false, LeftAnti, Some(testRelation1)),
(true, false, LeftSemi, Some(LocalRelation('a.int))),

(false, true, Inner, Some(LocalRelation('a.int, 'b.int))),
(false, true, Cross, Some(LocalRelation('a.int, 'b.int))),
(false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
(false, true, RightOuter,
Some(Project(Seq(Literal(null).as('a), 'b), testRelation2).analyze)),
(false, true, FullOuter, Some(Project(Seq(Literal(null).as('a), 'b), testRelation2).analyze)),
Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze)),
(false, true, FullOuter,
Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze)),
(false, true, LeftAnti, Some(LocalRelation('a.int))),
(false, true, LeftSemi, Some(LocalRelation('a.int))),

Expand All @@ -107,7 +112,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
val query = testRelation1
.where(left)
.join(testRelation2.where(right), joinType = jt, condition = Some('a.attr == 'b.attr))
val optimized = Optimize.execute(query.analyze)
val optimized = Optimize.execute(query.analyze).analyze
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the subsequent analyze here is to fill in timezones on casts inserted by Optimize

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should fill the time zone in the optimizer rule

val correctAnswer =
answer.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(query.analyze))
comparePlans(optimized, correctAnswer)
Expand Down Expand Up @@ -209,4 +214,11 @@ class PropagateEmptyRelationSuite extends PlanTest {

comparePlans(optimized, correctAnswer)
}

test("propagate empty relation keeps the plan resolved") {
val query = testRelation1.join(
LocalRelation('a.int, 'b.int), UsingJoin(FullOuter, "a" :: Nil), None)
val optimized = Optimize.execute(query.analyze)
assert(optimized.resolved)
}
}