SortMergeJoinExec.scala
@@ -585,21 +585,26 @@ case class SortMergeJoinExec(

val iterator = ctx.freshName("iterator")
val numOutput = metricTerm(ctx, "numOutputRows")
val joinedRow = ctx.freshName("joined")
Member commented:
I think we only need to do this when there is CodegenFallback in the condition expressions.

Contributor Author commented:
The joined row should always be used for correctness. We don't know what code the expression will generate, so we should plan on always passing the correct input row. Setting left and right on a joined row is a cheap operation, so I'd rather do it correctly than rely on something brittle like isInstanceOf[CodegenFallback].
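(For context on the cost claim above: the sketch below is a simplified stand-in for the JoinedRow pattern, not the actual Spark class, and JoinedRowSketch is a made-up name. It shows that withLeft/withRight only swap object references, so setting them once per joined row is cheap.)

import org.apache.spark.sql.catalyst.InternalRow

// Simplified sketch: withLeft/withRight only update references, so calling
// them for every joined row adds negligible cost.
class JoinedRowSketch {
  private var row1: InternalRow = _
  private var row2: InternalRow = _
  def withLeft(left: InternalRow): JoinedRowSketch = { row1 = left; this }
  def withRight(right: InternalRow): JoinedRowSketch = { row2 = right; this }
  // A full implementation would delegate field accessors to row1 for the left
  // schema's ordinals and to row2 for the rest.
}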

Contributor Author (@rdblue) commented on Oct 25, 2017:
It ended up being a bit more complicated. There are two problems (in 2.0.0 and 2.1.1 at least). The first, which is what this fixes, is that the INPUT_ROW in the codegen context points to the wrong row. This is fixed and now has a test that fails if you comment out the line that sets INPUT_ROW.

The second problem is that the check for CodegenFallback fails to check whether the condition supports codegen in some plans. To get the test to fail, I had to add a projection to exercise the path where this happens. I'll add a second commit for this problem.
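(For reference, the kind of check being described looks roughly like the sketch below. The object and method names are illustrative only; this is not code from this PR.)

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback

object CodegenSupportCheck {
  // A condition only supports codegen if no expression in its tree falls back
  // to interpreted evaluation via CodegenFallback.
  def conditionSupportsCodegen(condition: Option[Expression]): Boolean =
    condition.forall(_.find(_.isInstanceOf[CodegenFallback]).isEmpty)
}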

Contributor Author commented:
The second problem was fixed in this commit: 6b6dd68

I still think that the codegen problem should be fixed. Detecting CodegenFallback is imperfect, but CodegenFallback will still generate code and run it. I think we should either remove codegen from CodegenFallback or add this fix to ensure that the generated code works, even if we don't expect to run it.
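(As a rough illustration of why INPUT_ROW matters: a CodegenFallback expression emits code that delegates to the interpreted eval() on whatever row INPUT_ROW names. The sketch below only approximates the shape of that generated code; references, i, and inputRow are placeholder names.)

object CodegenFallbackShape {
  // Approximate shape of the Java emitted for a CodegenFallback expression:
  // it calls eval() with ctx.INPUT_ROW as the argument, so INPUT_ROW must be
  // the joined row whenever the condition reads columns from both sides.
  val generated: String =
    """
      |Object result = ((Expression) references[i]).eval(inputRow);
      |boolean isNull = (result == null);
      |boolean value = isNull ? false : (Boolean) result;
    """.stripMargin
}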

val (beforeLoop, condCheck) = if (condition.isDefined) {
// Split the code of creating variables based on whether it's used by condition or not.
val loaded = ctx.freshName("loaded")
val (leftBefore, leftAfter) = splitVarsByCondition(left.output, leftVars)
val (rightBefore, rightAfter) = splitVarsByCondition(right.output, rightVars)
// Generate code for condition
// set INPUT_ROW to the joined row because it is the data for the condition
ctx.INPUT_ROW = joinedRow
ctx.currentVars = leftVars ++ rightVars
val cond = BindReferences.bindReference(condition.get, output).genCode(ctx)
// evaluate the columns those used by condition before loop
val before = s"""
|boolean $loaded = false;
|$joinedRow.withLeft($leftRow);
|$leftBefore
""".stripMargin

val checking = s"""
|$joinedRow.withRight($rightRow);
|$rightBefore
|${cond.code}
|if (${cond.isNull} || !${cond.value}) continue;
@@ -615,6 +620,7 @@
}

s"""
|JoinedRow $joinedRow = new JoinedRow();
|while (findNextInnerJoinRows($leftInput, $rightInput)) {
| ${beforeLoop.trim}
| scala.collection.Iterator<UnsafeRow> $iterator = $matches.generateIterator();

InnerJoinSuite.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.execution.joins

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.{And, BinaryExpression, Expression, Predicate}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.Join
@@ -124,7 +125,8 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
rightPlan: SparkPlan) = {
val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition,
leftPlan, rightPlan)
EnsureRequirements(spark.sessionState.conf).apply(sortMergeJoin)
EnsureRequirements(spark.sessionState.conf)
.apply(ProjectExec(sortMergeJoin.output, sortMergeJoin))
Member commented:
Why do we need to change this?

Contributor Author commented:
In 2.1.1, an extra project caused WholeStageCodegenExec to miss that the expression contains CodegenFallback. This is no longer the case. Like I said, there is no longer a good way to test what happens when CodegenFallback generates code. If there were, I'd use it here to test this case.

I guess I could add a test case for WholeStageCodegenExec to make sure the code is generated correctly.

}

test(s"$testName using BroadcastHashJoin (build=left)") {
@@ -228,6 +230,27 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
)
)

testInnerJoin(
Member commented:
It can still pass without the changes in this PR. What is the purpose of this test case?

Contributor Author commented:
This test fails in 2.1.1 and versions before 6b6dd68. I'm not sure how to exercise the code generated by CodegenFallback with that fix, but this test is valid for the 2.1.1 branch.

"inner join with CodegenFallback filter",
myUpperCaseData,
myLowerCaseData,
() => {
// add a second equality check that is implemented with a CodegenFallback
// this expression is in the test so that no one implements codegen for it
And(
(myUpperCaseData.col("N") === myLowerCaseData.col("n")).expr,
EqNoCodegen(
org.apache.spark.sql.functions.lower(myUpperCaseData.col("L")).expr,
myLowerCaseData.col("l").expr))
},
Seq(
(1, "A", 1, "a"),
(2, "B", 2, "b"),
(3, "C", 3, "c"),
(4, "D", 4, "d")
)
)

{
lazy val left = myTestData1.where("a = 1")
lazy val right = myTestData2.where("a = 1")
@@ -287,3 +310,10 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext {
(Row(2, 2), "L2", Row(2, 2), "R2")))
}
}

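// Test-only equality predicate that deliberately extends CodegenFallback, so a
// join condition containing it cannot be fully code-generated and must read its
// input rows through the interpreted eval path.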
case class EqNoCodegen(left: Expression, right: Expression) extends BinaryExpression
with CodegenFallback with Serializable with Predicate {
override protected def nullSafeEval(left: Any, right: Any): Boolean = {
left == right
}
}