Commits (27 commits; changes shown from 12 commits)
05274e7
Decouple consume functions of physical operators in whole-stage codegen.
viirya Aug 13, 2017
e0e7a6e
shouldStop is called outside consume().
viirya Aug 13, 2017
413707d
Fix the condition and the case of using continue in consume.
viirya Aug 13, 2017
0bb8c0e
More comment.
viirya Aug 13, 2017
6d600d5
Fix aggregation.
viirya Aug 13, 2017
502139a
Also deal with sort case.
viirya Aug 13, 2017
5fe3762
Fix broadcasthash join.
viirya Aug 14, 2017
4bef567
Add more comments.
viirya Aug 14, 2017
1694c9b
Fix the cases where operators set up its produce framework.
viirya Aug 14, 2017
8f3b984
Fix Expand.
viirya Aug 14, 2017
c04da15
Fix BroadcastHashJoin.
viirya Aug 15, 2017
9540195
Rename variables.
viirya Aug 17, 2017
1101b2c
Don't create consume function if the number of arguments are more tha…
viirya Sep 1, 2017
ff77bfe
Merge remote-tracking branch 'upstream/master' into SPARK-21717
viirya Sep 26, 2017
e36ec3c
Remove the part of "continue" processing.
viirya Sep 26, 2017
edb73d6
Merge remote-tracking branch 'upstream/master' into SPARK-21717
viirya Oct 6, 2017
601c225
Fix test.
viirya Oct 7, 2017
476994f
More accurate calculation of valid method parameter length.
viirya Oct 11, 2017
bdc1146
Address comment.
viirya Oct 12, 2017
58eaf00
Address comments.
viirya Jan 24, 2018
2f2d1fd
Merge remote-tracking branch 'upstream/master' into SPARK-21717
viirya Jan 24, 2018
9f0d1da
Copy variables used for creating unsaferow.
viirya Jan 24, 2018
79d0106
Revert variables copying.
viirya Jan 24, 2018
6384aec
Add final to constants.
viirya Jan 24, 2018
0c4173e
Address comments.
viirya Jan 25, 2018
c859d53
Add tests.
viirya Jan 25, 2018
11946e7
Refactor isValidParamLength a bit.
viirya Jan 25, 2018
@@ -89,6 +89,8 @@ case class ExpandExec(
child.asInstanceOf[CodegenSupport].inputRDDs()
}

override protected def doConsumeInChainOfFunc: Boolean = false
Member:

I might not be 100% sure about your intention, but I find this a little confusing because ExpandExec consume functions can be chained in the generated code, right?

Member Author (viirya):

The doConsume produces something like:

   |for (int $i = 0; $i < ${projections.length}; $i ++) {
   |  switch ($i) {
   |    ${cases.mkString("\n").trim}
   |  }
   |  $numOutput.add(1);
   |  ${consume(ctx, outputColumns)}
   |}

So the consume logic of its parent node is actually wrapped in a local for-loop. It has the same effect as not chaining the next consume.

Member:

yea, probably we need to describe more about the exceptional cases where we can't use this optimization, like HashAggregateExec in https://github.com/apache/spark/pull/18931/files#diff-28cb12941b992ff680c277c651b59aa0R204

Member Author (viirya):

The good news is that the just-merged #19324 simplifies the usage of continue in codegen. I'm now testing whether I can remove this tricky continue handling with it.


protected override def doProduce(ctx: CodegenContext): String = {
child.asInstanceOf[CodegenSupport].produce(ctx, this)
}
@@ -177,6 +177,8 @@ case class SortExec(
""".stripMargin.trim
}

override protected def doConsumeInChainOfFunc: Boolean = false
Member:

I think it's better to put the reason why we explicitly set this to false in each plan, like https://github.com/apache/spark/pull/18931/files#diff-28cb12941b992ff680c277c651b59aa0R204

btw, can we have a better name for this? e.g., canPipeline or something, because IIUC this optimisation can be applied to pipelining operators (pipelining is a database term: https://link.springer.com/referenceworkentry/10.1007%2F978-0-387-39940-9_872).

Member Author (viirya):

I revised this variable name a few times, but didn't find a good name to convey its meaning.

Member:

yea, other guys might give good suggestions on the naming...


protected override val shouldStopRequired = false

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
@@ -149,13 +149,143 @@ trait CodegenSupport extends SparkPlan {

ctx.freshNamePrefix = parent.variablePrefix
val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)

// Under certain conditions, we can put the logic to consume the rows of this operator into
Member (kiszk, Aug 13, 2017):

Could you elaborate on the "certain conditions" in the comment if you have time?

Member Author (viirya):

Added more comments to elaborate the idea.

// another function, so that a generated function does not grow too long to be optimized by JIT.
// The conditions:
// 1. The parent uses all variables in output. We can't defer variable evaluation when consuming
Member:

What's a concrete example where this case prevents consume functions from being split?

Member Author (viirya):

E.g., a ProjectExec node doesn't necessarily evaluate all its output variables before continuing with the doConsume of its parent node. It can defer the evaluation until the variables are needed in the parent node's consume logic.

Once a variable's evaluation is deferred, if we then create a consume function, the variable will be evaluated inside that function. But then any references to this variable are out of scope.

Member:

thanks for the kind explanation!
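A tiny illustration of the scoping problem described above (a hedged sketch with hypothetical variable names, not code from this PR):

import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode

// A deferred output column: `code` holds Java statements that have not been
// emitted yet, while `value`/`isNull` name the variables those statements define.
val col = ExprCode(
  "int project_value = project_input + 1;", // code, deferred until first use
  "false",                                  // isNull
  "project_value")                          // value

// If col.code is only emitted inside a separate generated doConsume function,
// any reference to project_value left behind in the caller (the main produce
// loop) no longer compiles, because the local variable now lives in the
// callee's scope. Requiring the parent to use all output variables lets us
// evaluate them up front and pass them in as plain parameters instead.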

// in another function.
// 2. The output variables are not empty. If it's empty, we don't bother to do that.
// 3. We don't use the row variable. The construction of the row uses deferred variable evaluation. We
Member:

ditto; I want to know the concrete example, too.

Member Author (viirya):

The same reason as above. The variables used to evaluate the row can be out of scope because row construction is deferred until the row is actually used.

// can't do it.
val requireAllOutput = output.forall(parent.usedInputs.contains(_))
val consumeFunc =
if (row == null && outputVars.nonEmpty && requireAllOutput) {
constructDoConsumeFunction(ctx, inputVars)
Member:

Do we always split consume functions? Don't we need to check whether this consume function is too long?

Member Author (viirya):

I was thinking of checking it. But whole-stage codegen is a non-breaking process in which produce/consume calls are embedded together. There is no break point at which to check the function length here.

Actually, I think always splitting consume functions should have no negative effect. From the benchmarking numbers, it looks like there is no harm to normal queries.

Contributor:

we should pass row to this function; if it's non-null, we can save a projection.

Contributor:

maybe we should create a method for generating rowVar, so that we can use it in both consume and constructDoConsumeFunction

Member Author (viirya):

Good point.
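A rough sketch of what such a shared helper might look like (hypothetical name prepareRowVar; it assumes it lives in CodegenSupport so that output, BoundReference and GenerateUnsafeProjection are in scope, and it omits the evaluation of input variables that the real code would also need):

// Sketch: build the UnsafeRow ExprCode in one place so both the inlined
// consume path and constructDoConsumeFunction can reuse it.
protected def prepareRowVar(
    ctx: CodegenContext,
    row: String,
    colVars: Seq[ExprCode]): ExprCode = {
  if (row != null) {
    // A row is already available from the child; reuse it, no projection needed.
    ExprCode("", "false", row)
  } else if (colVars.nonEmpty) {
    // Project the column variables into an UnsafeRow.
    val colExprs = output.zipWithIndex.map { case (attr, i) =>
      BoundReference(i, attr.dataType, attr.nullable)
    }
    ctx.currentVars = colVars
    val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
    ExprCode(ev.code, "false", ev.value)
  } else {
    // There are no output columns at all.
    ExprCode("", "false", "unsafeRow")
  }
}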

} else {
parent.doConsume(ctx, inputVars, rowVar)
}
s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
|${parent.doConsume(ctx, inputVars, rowVar)}
|$consumeFunc
""".stripMargin
}

/**
* To prevent a concatenated function from growing too long to be optimized by JIT, instead of inlining,
* we may put the consume logic of the parent operator into a function and set this flag to `true`.
* The parent operator can then know whether its consume logic is inlined or in a separate function.
*/
private var doConsumeInFunc: Boolean = false

/**
* Returning true means that the consume logic of this operator, or of at least one child operator in
* the chain, is separated into a function. If this is `true`, this operator shouldn't use a `continue`
* statement to continue on the next row, because its generated code isn't enclosed in the main while-loop.
*
* For example, we have generated code for a query plan like:
* Op1Exec
* Op2Exec
* Op3Exec
*
* If we put the consume code of Op2Exec into a separate function, the generated code looks like:
* while (...) {
* ... // logic of Op3Exec.
* Op2Exec_doConsume(...);
* }
* private boolean Op2Exec_doConsume(...) {
* ... // logic of Op2Exec to consume rows.
* }
* In this case, `doConsumeInChainOfFunc` of Op2Exec will be `true`.
*
* Note that some operators, like `HashAggregateExec`, don't chain previous consume functions but
* begin with their own produce framework. Such operators should override `doConsumeInChainOfFunc`
* to return `false`.
*/
protected def doConsumeInChainOfFunc: Boolean = {
val codegenChildren = children.map(_.asInstanceOf[CodegenSupport])
doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc)
}

/**
* The actual Java statement this operator should use if it needs to continue on the next row
* in its `doConsume` code.
*
* while (...) {
* ... // logic of Op3Exec.
* Op2Exec_doConsume(...);
* }
* private boolean Op2Exec_doConsume(...) {
* ... // logic of Op2Exec to consume rows.
* continue; // Wrong. We can't use continue with the while-loop.
* }
* In the above code, we can't use `continue` in `Op2Exec_doConsume`.
*
* Instead, we do something like:
* while (...) {
* ... // logic of Op3Exec.
* boolean continueForLoop = Op2Exec_doConsume(...);
* if (continueForLoop) continue;
* }
* private boolean Op2Exec_doConsume(...) {
* ... // logic of Op2Exec to consume rows.
* return true; // When we need to continue, we return true.
* }
*/
protected def continueStatementInDoConsume: String = if (doConsumeInChainOfFunc) {
"return true;";
Member:

Remove ;

Member Author (viirya):

Thanks. I'll fix it.

} else {
"continue;"
}

/**
* To prevent a concatenated function from growing too long to be optimized by JIT, we can separate the
* parent's `doConsume` code of a `CodegenSupport` operator into a function to call.
*/
protected def constructDoConsumeFunction(
Member:

private?

ctx: CodegenContext,
inputVars: Seq[ExprCode]): String = {
val (callingParams, arguList, inputVarsInFunc) =
Member:

It would be good to add a fallback path to the original codegen (i.e. without creating a consume function) if the number of arguments is more than 254 (255 minus one for a non-static method) (e.g. #19082).
If the total number of arguments is more than 255, janino will fail to compile.

Member Author (viirya):

Good suggestion. I'll follow it.
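A hedged sketch of the suggested guard (the later commits in this PR mention an isValidParamLength check, but the code below is only an illustration and may differ from the actual implementation). The JVM allows at most 255 parameter slots per method, one of which is taken by the implicit this of a non-static method, and long/double parameters occupy two slots each:

import org.apache.spark.sql.types.{DataType, DoubleType, LongType}

// Sketch: return false when the generated doConsume method would exceed the
// JVM/janino parameter limit, so the caller can fall back to inlined codegen.
// The caller should pass the types of all parameters, including the boolean
// isNull flags added for the columns.
def isValidParamLength(paramTypes: Seq[DataType]): Boolean = {
  val slots = paramTypes.map {
    case LongType | DoubleType => 2 // longs and doubles take two parameter slots
    case _ => 1
  }.sum
  // 255 slots in total, minus one for the implicit `this` of a non-static method.
  slots <= 254
}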

Contributor:

I feel it's cleaner to return paramNames, paramTypes, paramVars, then we can simply do

void $doConsume(paramTypes.zip(paramNames).map(i => i._1 + " " + i._2).mkString(", "))

and

doConsumeFuncName(paramNames.mkString(", "))

inside constructConsumeParameters we can just create 3 mutable collections and go through variables to fill these collections.

Member Author (viirya):

Sounds cleaner. I need to change it a little because the arguments and parameters are not the same. Some variables can't be parameterized, e.g., constants or statements.
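A hedged sketch of the reorganized helper along the suggested lines (not the final code in this PR). It keeps separate argument and parameter lists because, as noted above, the call-site values and the fresh parameter names differ, and it omits the handling of variables that can't be parameterized (constants, statements):

import scala.collection.mutable

private def constructConsumeParameters(
    ctx: CodegenContext,
    attributes: Seq[Attribute],
    variables: Seq[ExprCode]): (Seq[String], Seq[String], Seq[ExprCode]) = {
  val arguments = mutable.ArrayBuffer.empty[String]    // expressions passed at the call site
  val parameters = mutable.ArrayBuffer.empty[String]   // "type name" entries for the signature
  val paramVars = mutable.ArrayBuffer.empty[ExprCode]  // ExprCode visible inside the function
  variables.zipWithIndex.foreach { case (ev, i) =>
    val paramName = ctx.freshName(s"expr_$i")
    val paramIsNull = ctx.freshName(s"exprIsNull_$i")
    arguments += ev.value
    arguments += ev.isNull
    parameters += s"${ctx.javaType(attributes(i).dataType)} $paramName"
    parameters += s"boolean $paramIsNull"
    paramVars += ExprCode("", paramIsNull, paramName)
  }
  (arguments, parameters, paramVars)
}

// The caller can then build the signature and the call site with simple mkString:
//   s"private boolean $doConsume(${parameters.mkString(", ")}) throws java.io.IOException { ... }"
//   s"$doConsumeFuncName(${arguments.mkString(", ")});"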

constructConsumeParameters(ctx, output, inputVars)
parent.doConsumeInFunc = true
val rowVar = ExprCode("", "false", "unsafeRow")
val doConsume = ctx.freshName("doConsume")
Contributor:

shall we put the operator name in this function name?

Member Author (viirya, Jan 25, 2018):

The freshName here will add variablePrefix before doConsume, so it already has the operator name, e.g., agg_doConsume.

val doConsumeFuncName = ctx.addNewFunction(doConsume,
s"""
| private boolean $doConsume($arguList) throws java.io.IOException {
| ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
| return false;
| }
""".stripMargin)

s"""
| boolean continueForLoop = $doConsumeFuncName($callingParams);
| if (continueForLoop) $continueStatementInDoConsume
""".stripMargin
}

/**
* Returns the source code for calling the consume function, the argument list of the consume function,
* and also the `ExprCode`s for the argument list.
*/
protected def constructConsumeParameters(
Member:

private?

ctx: CodegenContext,
attributes: Seq[Attribute],
variables: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
val params = variables.zipWithIndex.map { case (ev, i) =>
val callingParam = ev.value + ", " + ev.isNull
val arguName = ctx.freshName(s"expr_$i")
val arguIsNull = ctx.freshName(s"exprIsNull_$i")
(callingParam,
ctx.javaType(attributes(i).dataType) + " " + arguName + ", boolean " + arguIsNull,
Member:

How about s"${ctx.javaType(attributes(i).dataType)} $arguName, boolean $arguIsNull"?

ExprCode("", arguIsNull, arguName))
}.unzip3
(params._1.mkString(", "),
params._2.mkString(", "),
params._3)
Contributor:

the above 3 lines can be one line?

Member Author (viirya):

Done.
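For reference, the three lines above collapsed into one would simply read:

(params._1.mkString(", "), params._2.mkString(", "), params._3)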

}

/**
* Returns source code to evaluate all the variables, and clear their code, to prevent
* them from being evaluated twice.
@@ -252,6 +382,8 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
child.execute() :: Nil
}

override protected def doConsumeInChainOfFunc: Boolean = false

override def doProduce(ctx: CodegenContext): String = {
val input = ctx.freshName("input")
// Right now, InputAdapter is only used when there is one input RDD.
@@ -163,6 +163,8 @@ case class HashAggregateExec(
// The variables used as aggregation buffer. Only used for aggregation without keys.
private var bufVars: Seq[ExprCode] = _

override protected def doConsumeInChainOfFunc: Boolean = false

private def doProduceWithoutKeys(ctx: CodegenContext): String = {
val initAgg = ctx.freshName("initAgg")
ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
@@ -149,7 +149,7 @@ case class FilterExec(condition: Expression, child: SparkPlan)
s"""
|$evaluated
|${ev.code}
|if (${nullCheck}!${ev.value}) continue;
|if (${nullCheck}!${ev.value}) $continueStatementInDoConsume
""".stripMargin
}

@@ -317,7 +317,7 @@ case class SampleExec(
""".stripMargin.trim)

s"""
| if ($sampler.sample() == 0) continue;
| if ($sampler.sample() == 0) $continueStatementInDoConsume
| $numOutput.add(1);
| ${consume(ctx, input)}
""".stripMargin.trim
@@ -187,9 +187,15 @@ case class BroadcastHashJoinExec(
private def getJoinCondition(
ctx: CodegenContext,
input: Seq[ExprCode],
uniqueKeyCodePath: Boolean,
anti: Boolean = false): (String, String, Seq[ExprCode]) = {
val matched = ctx.freshName("matched")
val buildVars = genBuildSideVars(ctx, matched)
val continueStatement = if (uniqueKeyCodePath) {
continueStatementInDoConsume
} else {
"continue;"
}
val checkCondition = if (condition.isDefined) {
val expr = condition.get
// evaluate the variables from build side that used by condition
@@ -198,18 +204,18 @@
ctx.currentVars = input ++ buildVars
val ev =
BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx)
val skipRow = if (!anti) {
val skipRow = if (!(anti && uniqueKeyCodePath)) {
s"${ev.isNull} || !${ev.value}"
} else {
s"!${ev.isNull} && ${ev.value}"
}
s"""
|$eval
|${ev.code}
|if ($skipRow) continue;
|if ($skipRow) $continueStatement
""".stripMargin
} else if (anti) {
"continue;"
} else if (anti && uniqueKeyCodePath) {
continueStatement
} else {
""
}
@@ -221,21 +227,22 @@
*/
private def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = {
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val uniqueKeyCodePath = broadcastRelation.value.keyIsUnique
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
val (matched, checkCondition, buildVars) = getJoinCondition(ctx, input)
val (matched, checkCondition, buildVars) = getJoinCondition(ctx, input, uniqueKeyCodePath)
val numOutput = metricTerm(ctx, "numOutputRows")

val resultVars = buildSide match {
case BuildLeft => buildVars ++ input
case BuildRight => input ++ buildVars
}
if (broadcastRelation.value.keyIsUnique) {
if (uniqueKeyCodePath) {
s"""
|// generate join key for stream side
|${keyEv.code}
|// find matches from HashedRelation
|UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value});
|if ($matched == null) continue;
|if ($matched == null) $continueStatementInDoConsume
|$checkCondition
|$numOutput.add(1);
|${consume(ctx, resultVars)}
Expand All @@ -250,7 +257,7 @@ case class BroadcastHashJoinExec(
|${keyEv.code}
|// find matches from HashRelation
|$iteratorCls $matches = $anyNull ? null : ($iteratorCls)$relationTerm.get(${keyEv.value});
|if ($matches == null) continue;
|if ($matches == null) $continueStatementInDoConsume
|while ($matches.hasNext()) {
| UnsafeRow $matched = (UnsafeRow) $matches.next();
| $checkCondition
@@ -342,16 +349,17 @@
*/
private def codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String = {
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val uniqueKeyCodePath = broadcastRelation.value.keyIsUnique
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
val (matched, checkCondition, _) = getJoinCondition(ctx, input)
val (matched, checkCondition, _) = getJoinCondition(ctx, input, uniqueKeyCodePath)
val numOutput = metricTerm(ctx, "numOutputRows")
if (broadcastRelation.value.keyIsUnique) {
if (uniqueKeyCodePath) {
s"""
|// generate join key for stream side
|${keyEv.code}
|// find matches from HashedRelation
|UnsafeRow $matched = $anyNull ? null: (UnsafeRow)$relationTerm.getValue(${keyEv.value});
|if ($matched == null) continue;
|if ($matched == null) $continueStatementInDoConsume
|$checkCondition
|$numOutput.add(1);
|${consume(ctx, input)}
@@ -365,14 +373,14 @@
|${keyEv.code}
|// find matches from HashRelation
|$iteratorCls $matches = $anyNull ? null : ($iteratorCls)$relationTerm.get(${keyEv.value});
|if ($matches == null) continue;
|if ($matches == null) $continueStatementInDoConsume
|boolean $found = false;
|while (!$found && $matches.hasNext()) {
| UnsafeRow $matched = (UnsafeRow) $matches.next();
| $checkCondition
| $found = true;
|}
|if (!$found) continue;
|if (!$found) $continueStatementInDoConsume
|$numOutput.add(1);
|${consume(ctx, input)}
""".stripMargin
@@ -386,7 +394,7 @@
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val uniqueKeyCodePath = broadcastRelation.value.keyIsUnique
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
val (matched, checkCondition, _) = getJoinCondition(ctx, input, uniqueKeyCodePath)
val (matched, checkCondition, _) = getJoinCondition(ctx, input, uniqueKeyCodePath, true)
val numOutput = metricTerm(ctx, "numOutputRows")

if (uniqueKeyCodePath) {
@@ -424,7 +432,7 @@
| $checkCondition
| $found = true;
| }
| if ($found) continue;
| if ($found) $continueStatementInDoConsume
| }
|}
|$numOutput.add(1);
@@ -547,6 +547,8 @@ case class SortMergeJoinExec(
}
}

override protected def doConsumeInChainOfFunc: Boolean = false

override def doProduce(ctx: CodegenContext): String = {
ctx.copyResult = true
val leftInput = ctx.freshName("leftInput")