Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
val bos = new ByteArrayOutputStream()
val out = new DataOutputStream(codec.compressedOutputStream(bos))
while (iter.hasNext && (n < 0 || count < n)) {
// `iter.hasNext` may produce one row and buffer it, we should only call it when the limit is
// not hit.
while ((n < 0 || count < n) && iter.hasNext) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch this one!

val row = iter.next().asInstanceOf[UnsafeRow]
out.writeInt(row.getSizeInBytes)
row.writeToStream(out, buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val numOutput = metricTerm(ctx, "numOutputRows")

val initTerm = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "initRange")
val number = ctx.addMutableState(CodeGenerator.JAVA_LONG, "number")
val nextIndex = ctx.addMutableState(CodeGenerator.JAVA_LONG, "nextIndex")

val value = ctx.freshName("value")
val ev = ExprCode.forNonNullValue(JavaCode.variable(value, LongType))
Expand All @@ -397,7 +397,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
// within a batch, while the code in the outer loop is setting batch parameters and updating
// the metrics.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment should be updated too.


// Once number == batchEnd, it's time to progress to the next batch.
// Once nextIndex == batchEnd, it's time to progress to the next batch.
val batchEnd = ctx.addMutableState(CodeGenerator.JAVA_LONG, "batchEnd")

// How many values should still be generated by this range operator.
Expand All @@ -421,13 +421,13 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
|
| $BigInt st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
| if (st.compareTo($BigInt.valueOf(Long.MAX_VALUE)) > 0) {
| $number = Long.MAX_VALUE;
| $nextIndex = Long.MAX_VALUE;
| } else if (st.compareTo($BigInt.valueOf(Long.MIN_VALUE)) < 0) {
| $number = Long.MIN_VALUE;
| $nextIndex = Long.MIN_VALUE;
| } else {
| $number = st.longValue();
| $nextIndex = st.longValue();
| }
| $batchEnd = $number;
| $batchEnd = $nextIndex;
|
| $BigInt end = index.add($BigInt.ONE).multiply(numElement).divide(numSlice)
| .multiply(step).add(start);
Expand All @@ -440,7 +440,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
| }
|
| $BigInt startToEnd = $BigInt.valueOf(partitionEnd).subtract(
| $BigInt.valueOf($number));
| $BigInt.valueOf($nextIndex));
| $numElementsTodo = startToEnd.divide(step).longValue();
| if ($numElementsTodo < 0) {
| $numElementsTodo = 0;
Expand All @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
val localIdx = ctx.freshName("localIdx")
val localEnd = ctx.freshName("localEnd")
val range = ctx.freshName("range")
val shouldStop = if (parent.needStopCheck) {
s"if (shouldStop()) { $number = $value + ${step}L; return; }"

val processingLoop = if (parent.needStopCheck) {
// TODO (cloud-fan): do we really need to do the stop check within batch?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the motivation of bringing the discussion at #10989 (comment)

If it's OK to not interrupt the loop and buffer result rows for join, I think it's also OK here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we don't, then we would consume more rows than needed, don't we?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we just buffer more rows in BufferRowIterator.currentRows, it's only about performance IIUC.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmmh, but localIdx would become localEnd then, right? So the UTs you added would fail, or am I missing something?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that there is BroadcastHashJoin case doesn't mean it is generally ok to buffer more rows. If it is possible, we still should avoid it.

s"""
|int $localIdx = 0;
|for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
| long $value = $nextIndex;
| ${consume(ctx, Seq(ev))}
| $nextIndex += ${step}L;
|}
|$numOutput.add($localIdx);
|$inputMetrics.incRecordsRead($localIdx);
""".stripMargin
} else {
s"""
|for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
| long $value = ((long)$localIdx * ${step}L) + $nextIndex;
| ${consume(ctx, Seq(ev))}
|}
|$nextIndex = $batchEnd;
|$numOutput.add($localEnd);
|$inputMetrics.incRecordsRead($localEnd);
""".stripMargin
}

val loopCondition = if (parent.needStopCheck) {
"!shouldStop()"
} else {
"// shouldStop check is eliminated"
"true"
}

// An overview of the Range processing.
//
// For each partition, the Range task needs to produce records from partition start(inclusive)
// to end(exclusive). For better performance, we separate the partition range into batches, and
// use 2 loops to produce data. The outer while loop is used to iterate batches, and the inner
// for loop is used to iterate records inside a batch.
//
// `nextIndex` tracks the index of the next record that is going to be consumed, initialized
// with partition start. `batchEnd` tracks the end index of the current batch, initialized
// with `nextIndex`. In the outer loop, we first check if `batchEnd - nextIndex` is non-zero.
// Note that it can be negative, because range step can be negative. If `batchEnd - nextIndex`
// is non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next
// batch. If `batchEnd` reaches partition end, exit the outer loop. Since `batchEnd` is
// initialized with `nextIndex`, the first iteration of outer loop will not enter the inner
// loop but just update the `batchEnd`.
//
// The inner loop iterates from 0 to `localEnd`, which is calculated by
// `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
// the outer loop, and initialized with `nextIndex`, so `batchEnd - nextIndex` is always
// divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
// up being equal to `batchEnd` when the inner loop finishes.
//
// The inner loop can be interrupted, if the query has produced at least one result row, so that
// we don't buffer many result rows and waste memory. It's ok to interrupt the inner loop,
// because `nextIndex` is updated per loop iteration and remembers how far we have processed.

s"""
| // initialize Range
| if (!$initTerm) {
| $initTerm = true;
| $initRangeFuncName(partitionIndex);
| }
|
| while (true) {
| long $range = $batchEnd - $number;
| while ($loopCondition) {
| long $range = $batchEnd - $nextIndex;
| if ($range != 0L) {
| int $localEnd = (int)($range / ${step}L);
| for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
| long $value = ((long)$localIdx * ${step}L) + $number;
| ${consume(ctx, Seq(ev))}
| $shouldStop
| $processingLoop
| } else {
| long $nextBatchTodo;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you move these lines in the else?

@cloud-fan cloud-fan Oct 3, 2018

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we don't do return when we need to interrupt the loop. Move these lines to else, so that we won't hit this code path when loop is interrupted.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, but in this way we are looping 2 more times in the outer loop, because we either go in the if or in the else while previously we were doing both on the same iteration IIUC. I don't think it is a big issue but it may introduce a (very small probably) overhead compared to the previous case.

Since if IIUC in the first iteration we just go to the else branch now, since batchEnd is inited to nextIndex, do you think it is feasible to move this block before the inner loop? So we would solve both issues, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea!

| if ($numElementsTodo > ${batchSize}L) {
| $nextBatchTodo = ${batchSize}L;
| $numElementsTodo -= ${batchSize}L;
| } else {
| $nextBatchTodo = $numElementsTodo;
| $numElementsTodo = 0;
| if ($nextBatchTodo == 0) break;
| }
| $number = $batchEnd;
| $batchEnd += $nextBatchTodo * ${step}L;
| }
|
| $taskContext.killTaskIfInterrupted();
|
| long $nextBatchTodo;
| if ($numElementsTodo > ${batchSize}L) {
| $nextBatchTodo = ${batchSize}L;
| $numElementsTodo -= ${batchSize}L;
| } else {
| $nextBatchTodo = $numElementsTodo;
| $numElementsTodo = 0;
| if ($nextBatchTodo == 0) break;
| }
| $numOutput.add($nextBatchTodo);
| $inputMetrics.incRecordsRead($nextBatchTodo);
|
| $batchEnd += $nextBatchTodo * ${step}L;
| }
""".stripMargin
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.util.Random
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.execution.ui.SQLAppStatusStore
import org.apache.spark.sql.execution.{FilterExec, RangeExec, WholeStageCodegenExec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
Expand Down Expand Up @@ -517,4 +517,93 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
test("writing data out metrics with dynamic partition: parquet") {
testMetricsDynamicPartition("parquet", "parquet", "t1")
}

test("SPARK-25602: range metrics can be wrong if the result rows are not fully consumed") {
val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0)

withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
df.collect()
df.queryExecution.executedPlan.foreach {
case w: WholeStageCodegenExec =>
w.child.foreach {
case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a big issue, but if later we change things and these are not anymore here, we would not run the assert here. I would suggest to collect the FilterExec and the RangeExec and enforce that we collected 1 of both and then assert on them. What do you think?

Moreover, nit: would it be possible to dedup the code here? The tests are very similar with codegen on and off, only collecting the two exec nodes differs...

case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L)
case _ =>
}

case _ =>
}
}

withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
df.collect()
df.queryExecution.executedPlan.foreach {
case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L)
case _ =>
}
}

withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
df.queryExecution.executedPlan.foreach(_.resetMetrics())
// For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should
// produce 4 rows(0, 1, 2, 3).
df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
df.queryExecution.executedPlan.foreach {
case w: WholeStageCodegenExec =>
w.child.foreach {
// Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range
// should be 4 * 2.
case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L)
case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
case _ =>
}

case _ =>
}
}

withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
df.queryExecution.executedPlan.foreach(_.resetMetrics())
// For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should
// produce 4 rows(0, 1, 2, 3).
df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
df.queryExecution.executedPlan.foreach {
// Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range
// should be 4 * 2.
case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L)
case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
case _ =>
}
}

val df2 = df.limit(2)

withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
// Top-most limit will only run the first task, so totally the Filter produces 2 rows, and
// Range produces 4 rows(0, 1, 2, 3).
df2.collect()
df2.queryExecution.executedPlan.foreach {
case w: WholeStageCodegenExec =>
w.child.foreach {
case f: FilterExec => assert(f.metrics("numOutputRows").value == 2L)
case r: RangeExec => assert(r.metrics("numOutputRows").value == 4L)
case _ =>
}

case _ =>
}
}

withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
// Top-most limit will only run the first task, so totally the Filter produces 2 rows, and

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does first task mean first partition?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

// Range produces 4 rows(0, 1, 2, 3).
df2.collect()
df2.queryExecution.executedPlan.foreach {
case f: FilterExec => assert(f.metrics("numOutputRows").value == 2L)
case r: RangeExec => assert(r.metrics("numOutputRows").value == 4L)
case _ =>
}
}
}
}