From 454efab90098b7ea62803e9237290a7bb2da87a0 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 3 Oct 2018 13:22:07 +0800
Subject: [PATCH 1/5] range metrics can be wrong if the result rows are not
 fully consumed

---
 .../spark/sql/execution/SparkPlan.scala       |   4 +-
 .../execution/basicPhysicalOperators.scala    | 106 +++++++++++++-----
 .../execution/metric/SQLMetricsSuite.scala    |  91 ++++++++++++++-
 3 files changed, 168 insertions(+), 33 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index ab6031c436e9..9d9b020309d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -250,7 +250,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
     val bos = new ByteArrayOutputStream()
     val out = new DataOutputStream(codec.compressedOutputStream(bos))
-    while (iter.hasNext && (n < 0 || count < n)) {
+    // `iter.hasNext` may produce one row and buffer it, so we should only call it when the limit
+    // is not hit.
+    while ((n < 0 || count < n) && iter.hasNext) {
       val row = iter.next().asInstanceOf[UnsafeRow]
       out.writeInt(row.getSizeInBytes)
       row.writeToStream(out, buffer)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 222a1b8bc730..a873538a84d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -378,7 +378,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     val numOutput = metricTerm(ctx, "numOutputRows")

     val initTerm = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "initRange")
-    val number = ctx.addMutableState(CodeGenerator.JAVA_LONG, "number")
+    val nextIndex = ctx.addMutableState(CodeGenerator.JAVA_LONG, "nextIndex")

     val value = ctx.freshName("value")
     val ev = ExprCode.forNonNullValue(JavaCode.variable(value, LongType))
@@ -397,7 +397,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     // within a batch, while the code in the outer loop is setting batch parameters and updating
     // the metrics.

-    // Once number == batchEnd, it's time to progress to the next batch.
+    // Once nextIndex == batchEnd, it's time to progress to the next batch.
     val batchEnd = ctx.addMutableState(CodeGenerator.JAVA_LONG, "batchEnd")

     // How many values should still be generated by this range operator.
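The reordered condition in the SparkPlan hunk above is the heart of this first commit: for pipelined iterators, `hasNext` is not a passive check, it may have to run the producer and buffer a row just to answer. The following minimal look-ahead iterator is a hypothetical sketch (not Spark's actual generated iterator class) that reproduces the off-by-one which inflates the metrics:

    // Hypothetical look-ahead iterator: hasNext must produce the next element
    // to answer, so every call that returns true has already done one row of work.
    class LookAheadIterator[T](source: Iterator[T]) extends Iterator[T] {
      private var buffered: Option[T] = None
      var produced = 0 // stands in for the numOutputRows metric

      override def hasNext: Boolean = {
        if (buffered.isEmpty && source.hasNext) {
          buffered = Some(source.next()) // the row is produced right here
          produced += 1
        }
        buffered.isDefined
      }

      override def next(): T = {
        if (!hasNext) throw new java.util.NoSuchElementException("end of iterator")
        val row = buffered.get
        buffered = None
        row
      }
    }

    val n = 3 // the limit
    var count = 0
    val oldOrder = new LookAheadIterator(Iterator(1, 2, 3, 4, 5))
    // Old operand order: after the 3rd row, hasNext is evaluated once more and
    // buffers a 4th row the limit never returns, so the metric reads 4, not 3.
    while (oldOrder.hasNext && (n < 0 || count < n)) { oldOrder.next(); count += 1 }
    assert(oldOrder.produced == 4)

    count = 0
    val newOrder = new LookAheadIterator(Iterator(1, 2, 3, 4, 5))
    // Fixed order: the limit check short-circuits, hasNext is never called once
    // the limit is hit, and nothing beyond the limit is produced or counted.
    while ((n < 0 || count < n) && newOrder.hasNext) { newOrder.next(); count += 1 }
    assert(newOrder.produced == 3)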
@@ -421,13 +421,13 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         |
         |   $BigInt st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
         |   if (st.compareTo($BigInt.valueOf(Long.MAX_VALUE)) > 0) {
-        |     $number = Long.MAX_VALUE;
+        |     $nextIndex = Long.MAX_VALUE;
         |   } else if (st.compareTo($BigInt.valueOf(Long.MIN_VALUE)) < 0) {
-        |     $number = Long.MIN_VALUE;
+        |     $nextIndex = Long.MIN_VALUE;
         |   } else {
-        |     $number = st.longValue();
+        |     $nextIndex = st.longValue();
         |   }
-        |   $batchEnd = $number;
+        |   $batchEnd = $nextIndex;
         |
         |   $BigInt end = index.add($BigInt.ONE).multiply(numElement).divide(numSlice)
         |     .multiply(step).add(start);
@@ -440,7 +440,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         |   }
         |
         |   $BigInt startToEnd = $BigInt.valueOf(partitionEnd).subtract(
-        |     $BigInt.valueOf($number));
+        |     $BigInt.valueOf($nextIndex));
         |   $numElementsTodo = startToEnd.divide(step).longValue();
         |   if ($numElementsTodo < 0) {
         |     $numElementsTodo = 0;
@@ -453,11 +453,63 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)

     val localIdx = ctx.freshName("localIdx")
     val localEnd = ctx.freshName("localEnd")
     val range = ctx.freshName("range")
-    val shouldStop = if (parent.needStopCheck) {
-      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+
+    val processingLoop = if (parent.needStopCheck) {
+      // TODO (cloud-fan): do we really need to do the stop check within batch?
+      s"""
+        |int $localIdx = 0;
+        |for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
+        |  long $value = $nextIndex;
+        |  ${consume(ctx, Seq(ev))}
+        |  $nextIndex += ${step}L;
+        |}
+        |$numOutput.add($localIdx);
+        |$inputMetrics.incRecordsRead($localIdx);
+      """.stripMargin
+    } else {
+      s"""
+        |for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+        |  long $value = ((long)$localIdx * ${step}L) + $nextIndex;
+        |  ${consume(ctx, Seq(ev))}
+        |}
+        |$nextIndex = $batchEnd;
+        |$numOutput.add($localEnd);
+        |$inputMetrics.incRecordsRead($localEnd);
+      """.stripMargin
+    }
+
+    val loopCondition = if (parent.needStopCheck) {
+      "!shouldStop()"
     } else {
-      "// shouldStop check is eliminated"
+      "true"
     }
+
+    // An overview of the Range processing.
+    //
+    // For each partition, the Range task needs to produce records from partition start (inclusive)
+    // to end (exclusive). For better performance, we separate the partition range into batches, and
+    // use 2 loops to produce data. The outer while loop is used to iterate over batches, and the
+    // inner for loop is used to iterate over records inside a batch.
+    //
+    // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
+    // with partition start. `batchEnd` tracks the end index of the current batch, initialized
+    // with `nextIndex`. In the outer loop, we first check if `batchEnd - nextIndex` is non-zero.
+    // Note that it can be negative, because the range step can be negative. If `batchEnd - nextIndex`
+    // is non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next
+    // batch. If `batchEnd` reaches partition end, we exit the outer loop. Since `batchEnd` is
+    // initialized with `nextIndex`, the first iteration of the outer loop will not enter the inner
+    // loop but just update `batchEnd`.
+    //
+    // The inner loop iterates from 0 to `localEnd`, which is calculated by
+    // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
+    // the outer loop, and initialized with `nextIndex`, `batchEnd - nextIndex` is always
+    // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
+    // up being equal to `batchEnd` when the inner loop finishes.
+    //
+    // The inner loop can be interrupted if the query has produced at least one result row, so that
+    // we don't buffer many result rows and waste memory. It's ok to interrupt the inner loop,
+    // because `nextIndex` is updated per loop iteration and remembers how far we have processed.
+
     s"""
       | // initialize Range
       | if (!$initTerm) {
       |   $initTerm = true;
       |   $initRangeFuncName(partitionIndex);
       | }
       |
-      | while (true) {
-      |   long $range = $batchEnd - $number;
+      | while ($loopCondition) {
+      |   long $range = $batchEnd - $nextIndex;
       |   if ($range != 0L) {
       |     int $localEnd = (int)($range / ${step}L);
-      |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
-      |       long $value = ((long)$localIdx * ${step}L) + $number;
-      |       ${consume(ctx, Seq(ev))}
-      |       $shouldStop
+      |     $processingLoop
+      |   } else {
+      |     long $nextBatchTodo;
+      |     if ($numElementsTodo > ${batchSize}L) {
+      |       $nextBatchTodo = ${batchSize}L;
+      |       $numElementsTodo -= ${batchSize}L;
+      |     } else {
+      |       $nextBatchTodo = $numElementsTodo;
+      |       $numElementsTodo = 0;
+      |       if ($nextBatchTodo == 0) break;
       |     }
-      |     $number = $batchEnd;
+      |     $batchEnd += $nextBatchTodo * ${step}L;
       |   }
       |
       |   $taskContext.killTaskIfInterrupted();
-      |
-      |   long $nextBatchTodo;
-      |   if ($numElementsTodo > ${batchSize}L) {
-      |     $nextBatchTodo = ${batchSize}L;
-      |     $numElementsTodo -= ${batchSize}L;
-      |   } else {
-      |     $nextBatchTodo = $numElementsTodo;
-      |     $numElementsTodo = 0;
-      |     if ($nextBatchTodo == 0) break;
-      |   }
-      |   $numOutput.add($nextBatchTodo);
-      |   $inputMetrics.incRecordsRead($nextBatchTodo);
-      |
-      |   $batchEnd += $nextBatchTodo * ${step}L;
       | }
     """.stripMargin
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index d45eb0c27a6b..f8b9a8ad7e79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -24,7 +24,7 @@ import scala.util.Random
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.execution.ui.SQLAppStatusStore
+import org.apache.spark.sql.execution.{FilterExec, RangeExec, WholeStageCodegenExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -517,4 +517,93 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   test("writing data out metrics with dynamic partition: parquet") {
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }
+
+  test("SPARK-25602: range metrics can be wrong if the result rows are not fully consumed") {
+    val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0)
+
+    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+      df.collect()
+      df.queryExecution.executedPlan.foreach {
+        case w: WholeStageCodegenExec =>
+          w.child.foreach {
+            case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L)
+            case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L)
assert(r.metrics("numOutputRows").value == 30L) + case _ => + } + + case _ => + } + } + + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + df.collect() + df.queryExecution.executedPlan.foreach { + case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L) + case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L) + case _ => + } + } + + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { + df.queryExecution.executedPlan.foreach(_.resetMetrics()) + // For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should + // produce 4 rows(0, 1, 2, 3). + df.queryExecution.toRdd.mapPartitions(_.take(2)).collect() + df.queryExecution.executedPlan.foreach { + case w: WholeStageCodegenExec => + w.child.foreach { + // Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range + // should be 4 * 2. + case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L) + case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L) + case _ => + } + + case _ => + } + } + + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + df.queryExecution.executedPlan.foreach(_.resetMetrics()) + // For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should + // produce 4 rows(0, 1, 2, 3). + df.queryExecution.toRdd.mapPartitions(_.take(2)).collect() + df.queryExecution.executedPlan.foreach { + // Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range + // should be 4 * 2. + case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L) + case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L) + case _ => + } + } + + val df2 = df.limit(2) + + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { + // Top-most limit will only run the first task, so totally the Filter produces 2 rows, and + // Range produces 4 rows(0, 1, 2, 3). + df2.collect() + df2.queryExecution.executedPlan.foreach { + case w: WholeStageCodegenExec => + w.child.foreach { + case f: FilterExec => assert(f.metrics("numOutputRows").value == 2L) + case r: RangeExec => assert(r.metrics("numOutputRows").value == 4L) + case _ => + } + + case _ => + } + } + + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + // Top-most limit will only run the first task, so totally the Filter produces 2 rows, and + // Range produces 4 rows(0, 1, 2, 3). + df2.collect() + df2.queryExecution.executedPlan.foreach { + case f: FilterExec => assert(f.metrics("numOutputRows").value == 2L) + case r: RangeExec => assert(r.metrics("numOutputRows").value == 4L) + case _ => + } + } + } } From 1c94d13895f4537111518e5b26075395bcc250b0 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 4 Oct 2018 10:15:56 +0800 Subject: [PATCH 2/5] address comments --- .../execution/basicPhysicalOperators.scala | 26 ++-- .../execution/metric/SQLMetricsSuite.scala | 120 ++++++------------ 2 files changed, 53 insertions(+), 93 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index a873538a84d5..a8adc9e72417 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -394,8 +394,8 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) // operator produces elements in batches. 
     // operator produces elements in batches. After a batch is complete, the metrics are updated
     // and a new batch is started.
     // In the implementation below, the code in the inner loop is producing all the values
-    // within a batch, while the code in the outer loop is setting batch parameters and updating
-    // the metrics.
+    // within a batch and updating metrics afterwards, while the code in the outer loop is setting
+    // batch parameters.

     // Once nextIndex == batchEnd, it's time to progress to the next batch.
     val batchEnd = ctx.addMutableState(CodeGenerator.JAVA_LONG, "batchEnd")

     // How many values should still be generated by this range operator.
@@ -452,7 +452,6 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)

     val localIdx = ctx.freshName("localIdx")
     val localEnd = ctx.freshName("localEnd")
-    val range = ctx.freshName("range")

     val processingLoop = if (parent.needStopCheck) {
       // TODO (cloud-fan): do we really need to do the stop check within batch?
@@ -493,12 +492,11 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     //
     // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
     // with partition start. `batchEnd` tracks the end index of the current batch, initialized
-    // with `nextIndex`. In the outer loop, we first check if `batchEnd - nextIndex` is non-zero.
-    // Note that it can be negative, because the range step can be negative. If `batchEnd - nextIndex`
-    // is non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next
-    // batch. If `batchEnd` reaches partition end, we exit the outer loop. Since `batchEnd` is
-    // initialized with `nextIndex`, the first iteration of the outer loop will not enter the inner
-    // loop but just update `batchEnd`.
+    // with `nextIndex`. In the outer loop, we first check if `nextIndex == batchEnd`. If it's
+    // true, it means the current batch is fully consumed, and we will update `batchEnd` to process
+    // the next batch. If `batchEnd` reaches partition end, we exit the outer loop. Then we enter
+    // the inner loop. Note that, when we enter the inner loop, `nextIndex` must be different from
+    // `batchEnd`; otherwise the outer loop would already have exited.
     //
     // The inner loop iterates from 0 to `localEnd`, which is calculated by
     // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
@@ -507,7 +505,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     // up being equal to `batchEnd` when the inner loop finishes.
     //
     // The inner loop can be interrupted if the query has produced at least one result row, so that
-    // we don't buffer many result rows and waste memory. It's ok to interrupt the inner loop,
+    // we don't buffer too many result rows and waste memory. It's ok to interrupt the inner loop,
     // because `nextIndex` is updated per loop iteration and remembers how far we have processed.

     s"""
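The overview comment this commit rewrites is easier to follow next to runnable code. The following plain-Scala model is an illustrative sketch only (the real operator emits Java source through codegen, and the element-count arithmetic is simplified here); it mirrors the outer batch loop and the interruptible inner loop that the comment describes:

    // Plain-Scala model of the generated two-loop structure: `consume` stands in
    // for the downstream operator and `shouldStop` for the buffer-has-rows check.
    def produceRange(
        start: Long,
        step: Long,
        elementsInPartition: Long,
        batchSize: Long,
        consume: Long => Unit,
        shouldStop: () => Boolean): Unit = {
      var nextIndex = start        // index of the next record to produce
      var batchEnd = start         // end of the current batch, initially == nextIndex
      var numElementsTodo = elementsInPartition

      while (!shouldStop()) {
        if (nextIndex == batchEnd) {
          // Current batch fully consumed: schedule the next one, or stop at the end.
          val nextBatchTodo = math.min(numElementsTodo, batchSize)
          numElementsTodo -= nextBatchTodo
          if (nextBatchTodo == 0) return
          batchEnd += nextBatchTodo * step
        }
        // Inner loop: nextIndex advances per record, so an interruption here is
        // safe; the next call resumes exactly where we left off.
        val localEnd = ((batchEnd - nextIndex) / step).toInt
        var localIdx = 0
        while (localIdx < localEnd && !shouldStop()) {
          consume(nextIndex)
          nextIndex += step
          localIdx += 1
        }
      }
    }

Because all progress state (`nextIndex`, `batchEnd`, `numElementsTodo`) lives outside the loops, interrupting at either `shouldStop` check never loses or double-produces a row, which is the property the rewritten comment is arguing for.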
s""" @@ -518,11 +516,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) | } | | while ($loopCondition) { - | long $range = $batchEnd - $nextIndex; - | if ($range != 0L) { - | int $localEnd = (int)($range / ${step}L); - | $processingLoop - | } else { + | if ($nextIndex == $batchEnd) { | long $nextBatchTodo; | if ($numElementsTodo > ${batchSize}L) { | $nextBatchTodo = ${batchSize}L; @@ -535,6 +529,8 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) | $batchEnd += $nextBatchTodo * ${step}L; | } | + | int $localEnd = (int)(($batchEnd - $nextIndex) / ${step}L); + | $processingLoop | $taskContext.killTaskIfInterrupted(); | } """.stripMargin diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index f8b9a8ad7e79..e214b04f2437 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -24,7 +24,7 @@ import scala.util.Random import org.apache.spark.SparkFunSuite import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation -import org.apache.spark.sql.execution.{FilterExec, RangeExec, WholeStageCodegenExec} +import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan, WholeStageCodegenExec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -519,90 +519,54 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared } test("SPARK-25602: range metrics can be wrong if the result rows are not fully consumed") { - val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0) - - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { - df.collect() - df.queryExecution.executedPlan.foreach { - case w: WholeStageCodegenExec => - w.child.foreach { - case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L) - case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L) - case _ => - } - - case _ => - } - } - - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { - df.collect() - df.queryExecution.executedPlan.foreach { - case f: FilterExec => assert(f.metrics("numOutputRows").value == 10L) - case r: RangeExec => assert(r.metrics("numOutputRows").value == 30L) + def checkFilterAndRangeMetrics( + df: DataFrame, + filterNumOutputs: Int, + rangeNumOutputs: Int): Unit = { + var filter: FilterExec = null + var range: RangeExec = null + val collectFilterAndRange: SparkPlan => Unit = { + case f: FilterExec => + assert(filter == null, "the query should only have one Filter") + filter = f + case r: RangeExec => + assert(range == null, "the query should only have one Range") + range = r case _ => } - } - - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { - df.queryExecution.executedPlan.foreach(_.resetMetrics()) - // For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should - // produce 4 rows(0, 1, 2, 3). - df.queryExecution.toRdd.mapPartitions(_.take(2)).collect() - df.queryExecution.executedPlan.foreach { - case w: WholeStageCodegenExec => - w.child.foreach { - // Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range - // should be 4 * 2. 
-            case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L)
-            case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
-            case _ =>
-          }
-
-        case _ =>
+      if (SQLConf.get.wholeStageEnabled) {
+        df.queryExecution.executedPlan.foreach {
+          case w: WholeStageCodegenExec =>
+            w.child.foreach(collectFilterAndRange)
+          case _ =>
+        }
+      } else {
+        df.queryExecution.executedPlan.foreach(collectFilterAndRange)
       }
-    }

-    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
-      df.queryExecution.executedPlan.foreach(_.resetMetrics())
-      // For each partition, we get 2 rows. Then the Filter should produce 2 rows, and Range should
-      // produce 4 rows (0, 1, 2, 3).
-      df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
-      df.queryExecution.executedPlan.foreach {
-        // Range has 2 partitions, so the expected metrics for filter should be 2 * 2, for range
-        // should be 4 * 2.
-        case f: FilterExec => assert(f.metrics("numOutputRows").value == 4L)
-        case r: RangeExec => assert(r.metrics("numOutputRows").value == 8L)
-        case _ =>
-      }
+      assert(filter != null && range != null, "the query doesn't have Filter and Range")
+      assert(filter.metrics("numOutputRows").value == filterNumOutputs)
+      assert(range.metrics("numOutputRows").value == rangeNumOutputs)
     }

+    val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0)
     val df2 = df.limit(2)
-
-    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
-      // Top-most limit will only run the first task, so in total the Filter produces 2 rows, and
-      // Range produces 4 rows (0, 1, 2, 3).
-      df2.collect()
-      df2.queryExecution.executedPlan.foreach {
-        case w: WholeStageCodegenExec =>
-          w.child.foreach {
-            case f: FilterExec => assert(f.metrics("numOutputRows").value == 2L)
-            case r: RangeExec => assert(r.metrics("numOutputRows").value == 4L)
-            case _ =>
-          }
-
-        case _ =>
-      }
-    }
-
-    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
-      // Top-most limit will only run the first task, so in total the Filter produces 2 rows, and
-      // Range produces 4 rows (0, 1, 2, 3).
-      df2.collect()
-      df2.queryExecution.executedPlan.foreach {
-        case f: FilterExec => assert(f.metrics("numOutputRows").value == 2L)
-        case r: RangeExec => assert(r.metrics("numOutputRows").value == 4L)
-        case _ =>
+    Seq(true, false).foreach { wholeStageEnabled =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageEnabled.toString) {
+        df.collect()
+        checkFilterAndRangeMetrics(df, filterNumOutputs = 10, rangeNumOutputs = 30)
+
+        df.queryExecution.executedPlan.foreach(_.resetMetrics())
+        // For each partition, we get 2 rows. Then the Filter should produce 2 rows per-partition,
+        // and Range should produce 4 rows ([0, 1, 2, 3] and [15, 16, 17, 18]) per-partition. Since
+        // the Range has 2 partitions, in total Filter produces 4 rows and Range produces 8 rows.
+        df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
+        checkFilterAndRangeMetrics(df, filterNumOutputs = 4, rangeNumOutputs = 8)
+
+        // Top-most limit will call `CollectLimitExec.executeCollect`, which will only run the first
+        // task, so in total the Filter produces 2 rows, and Range produces 4 rows ([0, 1, 2, 3]).
+        df2.collect()
+        checkFilterAndRangeMetrics(df2, filterNumOutputs = 2, rangeNumOutputs = 4)
       }
     }
   }

From bf0e891b93b2c634a890847902c7f8c74e3537b8 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 4 Oct 2018 12:26:34 +0800
Subject: [PATCH 3/5] different idea

---
 .../sql/execution/basicPhysicalOperators.scala |  9 +++------
 .../sql/execution/metric/SQLMetricsSuite.scala | 14 +++++++-------
 2 files changed, 10 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index a8adc9e72417..f854b692fe71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -456,14 +456,11 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     val processingLoop = if (parent.needStopCheck) {
       // TODO (cloud-fan): do we really need to do the stop check within batch?
       s"""
-        |int $localIdx = 0;
-        |for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
+        |for (int $localIdx = 0; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
         |  long $value = $nextIndex;
         |  ${consume(ctx, Seq(ev))}
         |  $nextIndex += ${step}L;
         |}
-        |$numOutput.add($localIdx);
-        |$inputMetrics.incRecordsRead($localIdx);
       """.stripMargin
     } else {
       s"""
@@ -472,8 +469,6 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         |  long $value = ((long)$localIdx * ${step}L) + $nextIndex;
         |  ${consume(ctx, Seq(ev))}
         |}
         |$nextIndex = $batchEnd;
-        |$numOutput.add($localEnd);
-        |$inputMetrics.incRecordsRead($localEnd);
       """.stripMargin
     }
@@ -526,6 +521,8 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
       |       $numElementsTodo = 0;
       |       if ($nextBatchTodo == 0) break;
       |     }
+      |     $numOutput.add($nextBatchTodo);
+      |     $inputMetrics.incRecordsRead($nextBatchTodo);
       |     $batchEnd += $nextBatchTodo * ${step}L;
       |   }
       |
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index e214b04f2437..9fc5db5e4bf8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -549,24 +549,24 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       assert(range.metrics("numOutputRows").value == rangeNumOutputs)
     }

-    val df = spark.range(0, 30, 1, 2).toDF().filter('id % 3 === 0)
+    val df = spark.range(0, 3000, 1, 2).toDF().filter('id % 3 === 0)
     val df2 = df.limit(2)
     Seq(true, false).foreach { wholeStageEnabled =>
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageEnabled.toString) {
         df.collect()
-        checkFilterAndRangeMetrics(df, filterNumOutputs = 10, rangeNumOutputs = 30)
+        checkFilterAndRangeMetrics(df, filterNumOutputs = 1000, rangeNumOutputs = 3000)

         df.queryExecution.executedPlan.foreach(_.resetMetrics())
         // For each partition, we get 2 rows. Then the Filter should produce 2 rows per-partition,
-        // and Range should produce 4 rows ([0, 1, 2, 3] and [15, 16, 17, 18]) per-partition. Since
-        // the Range has 2 partitions, in total Filter produces 4 rows and Range produces 8 rows.
+        // and Range should produce 1000 rows (one batch) per-partition. In total, Filter produces
+        // 4 rows, and Range produces 2000 rows.
         df.queryExecution.toRdd.mapPartitions(_.take(2)).collect()
-        checkFilterAndRangeMetrics(df, filterNumOutputs = 4, rangeNumOutputs = 8)
+        checkFilterAndRangeMetrics(df, filterNumOutputs = 4, rangeNumOutputs = 2000)

         // Top-most limit will call `CollectLimitExec.executeCollect`, which will only run the first
-        // task, so in total the Filter produces 2 rows, and Range produces 4 rows ([0, 1, 2, 3]).
+        // task, so in total the Filter produces 2 rows, and Range produces 1000 rows (one batch).
         df2.collect()
-        checkFilterAndRangeMetrics(df2, filterNumOutputs = 2, rangeNumOutputs = 4)
+        checkFilterAndRangeMetrics(df2, filterNumOutputs = 2, rangeNumOutputs = 1000)
       }
     }
   }

From 8751000e8572cb179b41e0c7eef5f895a4debbf4 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 4 Oct 2018 12:31:29 +0800
Subject: [PATCH 4/5] revert range change

---
 .../execution/basicPhysicalOperators.scala | 109 ++++++------------
 1 file changed, 36 insertions(+), 73 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index f854b692fe71..222a1b8bc730 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -378,7 +378,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     val numOutput = metricTerm(ctx, "numOutputRows")

     val initTerm = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "initRange")
-    val nextIndex = ctx.addMutableState(CodeGenerator.JAVA_LONG, "nextIndex")
+    val number = ctx.addMutableState(CodeGenerator.JAVA_LONG, "number")

     val value = ctx.freshName("value")
     val ev = ExprCode.forNonNullValue(JavaCode.variable(value, LongType))
@@ -394,10 +394,10 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     // operator produces elements in batches. After a batch is complete, the metrics are updated
     // and a new batch is started.
     // In the implementation below, the code in the inner loop is producing all the values
-    // within a batch and updating metrics afterwards, while the code in the outer loop is setting
-    // batch parameters.
+    // within a batch, while the code in the outer loop is setting batch parameters and updating
+    // the metrics.

-    // Once nextIndex == batchEnd, it's time to progress to the next batch.
+    // Once number == batchEnd, it's time to progress to the next batch.
     val batchEnd = ctx.addMutableState(CodeGenerator.JAVA_LONG, "batchEnd")

     // How many values should still be generated by this range operator.
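This revert restores the original accounting, where `numOutput.add(nextBatchTodo)` runs when a batch is scheduled rather than per consumed row; the updated test expectations above (1000 and 2000 instead of exact row counts) encode exactly that. A toy model of this batch-granularity accounting, with a hypothetical helper and assuming the 1000-element batch size the test comments mention:

    // Rows reported by Range's numOutputRows when only `rowsConsumed` rows of a
    // partition are actually pulled: a whole batch is counted as soon as it is
    // scheduled, so partial consumption rounds up to the batch size.
    def rowsReported(rowsConsumed: Long, batchSize: Long, partitionRows: Long): Long = {
      if (rowsConsumed <= 0) 0L
      else {
        val batchesStarted = (rowsConsumed + batchSize - 1) / batchSize // ceiling
        math.min(batchesStarted * batchSize, partitionRows)
      }
    }

    // range(0, 3000, 1, 2) gives 2 partitions of 1500 ids each. Taking 2 filtered
    // rows pulls 4 range rows per partition, touching only the first batch, so
    // each partition reports 1000 and the test's rangeNumOutputs is 2 * 1000 = 2000.
    assert(rowsReported(rowsConsumed = 4, batchSize = 1000, partitionRows = 1500) == 1000L)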
@@ -421,13 +421,13 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         |
         |   $BigInt st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
         |   if (st.compareTo($BigInt.valueOf(Long.MAX_VALUE)) > 0) {
-        |     $nextIndex = Long.MAX_VALUE;
+        |     $number = Long.MAX_VALUE;
         |   } else if (st.compareTo($BigInt.valueOf(Long.MIN_VALUE)) < 0) {
-        |     $nextIndex = Long.MIN_VALUE;
+        |     $number = Long.MIN_VALUE;
         |   } else {
-        |     $nextIndex = st.longValue();
+        |     $number = st.longValue();
         |   }
-        |   $batchEnd = $nextIndex;
+        |   $batchEnd = $number;
         |
         |   $BigInt end = index.add($BigInt.ONE).multiply(numElement).divide(numSlice)
         |     .multiply(step).add(start);
@@ -440,7 +440,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         |   }
         |
         |   $BigInt startToEnd = $BigInt.valueOf(partitionEnd).subtract(
-        |     $BigInt.valueOf($nextIndex));
+        |     $BigInt.valueOf($number));
         |   $numElementsTodo = startToEnd.divide(step).longValue();
         |   if ($numElementsTodo < 0) {
         |     $numElementsTodo = 0;
@@ -452,57 +452,12 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)

     val localIdx = ctx.freshName("localIdx")
     val localEnd = ctx.freshName("localEnd")
-
-    val processingLoop = if (parent.needStopCheck) {
-      // TODO (cloud-fan): do we really need to do the stop check within batch?
-      s"""
-        |for (int $localIdx = 0; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
-        |  long $value = $nextIndex;
-        |  ${consume(ctx, Seq(ev))}
-        |  $nextIndex += ${step}L;
-        |}
-      """.stripMargin
+    val range = ctx.freshName("range")
+    val shouldStop = if (parent.needStopCheck) {
+      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
     } else {
-      s"""
-        |for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
-        |  long $value = ((long)$localIdx * ${step}L) + $nextIndex;
-        |  ${consume(ctx, Seq(ev))}
-        |}
-        |$nextIndex = $batchEnd;
-      """.stripMargin
+      "// shouldStop check is eliminated"
     }
-
-    val loopCondition = if (parent.needStopCheck) {
-      "!shouldStop()"
-    } else {
-      "true"
-    }
-
-    // An overview of the Range processing.
-    //
-    // For each partition, the Range task needs to produce records from partition start (inclusive)
-    // to end (exclusive). For better performance, we separate the partition range into batches, and
-    // use 2 loops to produce data. The outer while loop is used to iterate over batches, and the
-    // inner for loop is used to iterate over records inside a batch.
-    //
-    // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
-    // with partition start. `batchEnd` tracks the end index of the current batch, initialized
-    // with `nextIndex`. In the outer loop, we first check if `nextIndex == batchEnd`. If it's
-    // true, it means the current batch is fully consumed, and we will update `batchEnd` to process
-    // the next batch. If `batchEnd` reaches partition end, we exit the outer loop. Then we enter
-    // the inner loop. Note that, when we enter the inner loop, `nextIndex` must be different from
-    // `batchEnd`; otherwise the outer loop would already have exited.
-    //
-    // The inner loop iterates from 0 to `localEnd`, which is calculated by
-    // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
-    // the outer loop, and initialized with `nextIndex`, `batchEnd - nextIndex` is always
-    // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
-    // up being equal to `batchEnd` when the inner loop finishes.
-    //
-    // The inner loop can be interrupted if the query has produced at least one result row, so that
-    // we don't buffer too many result rows and waste memory. It's ok to interrupt the inner loop,
-    // because `nextIndex` is updated per loop iteration and remembers how far we have processed.
-
     s"""
       | // initialize Range
       | if (!$initTerm) {
       |   $initTerm = true;
       |   $initRangeFuncName(partitionIndex);
       | }
       |
-      | while ($loopCondition) {
-      |   if ($nextIndex == $batchEnd) {
-      |     long $nextBatchTodo;
-      |     if ($numElementsTodo > ${batchSize}L) {
-      |       $nextBatchTodo = ${batchSize}L;
-      |       $numElementsTodo -= ${batchSize}L;
-      |     } else {
-      |       $nextBatchTodo = $numElementsTodo;
-      |       $numElementsTodo = 0;
-      |       if ($nextBatchTodo == 0) break;
+      | while (true) {
+      |   long $range = $batchEnd - $number;
+      |   if ($range != 0L) {
+      |     int $localEnd = (int)($range / ${step}L);
+      |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
+      |       long $value = ((long)$localIdx * ${step}L) + $number;
+      |       ${consume(ctx, Seq(ev))}
+      |       $shouldStop
       |     }
-      |     $numOutput.add($nextBatchTodo);
-      |     $inputMetrics.incRecordsRead($nextBatchTodo);
-      |     $batchEnd += $nextBatchTodo * ${step}L;
+      |     $number = $batchEnd;
       |   }
       |
-      |   int $localEnd = (int)(($batchEnd - $nextIndex) / ${step}L);
-      |   $processingLoop
       |   $taskContext.killTaskIfInterrupted();
+      |
+      |   long $nextBatchTodo;
+      |   if ($numElementsTodo > ${batchSize}L) {
+      |     $nextBatchTodo = ${batchSize}L;
+      |     $numElementsTodo -= ${batchSize}L;
+      |   } else {
+      |     $nextBatchTodo = $numElementsTodo;
+      |     $numElementsTodo = 0;
+      |     if ($nextBatchTodo == 0) break;
+      |   }
+      |   $numOutput.add($nextBatchTodo);
+      |   $inputMetrics.incRecordsRead($nextBatchTodo);
+      |
+      |   $batchEnd += $nextBatchTodo * ${step}L;
       | }
     """.stripMargin
   }

From 3b9b41f29e4819d63097645ed81d42b6fcde0b5d Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Thu, 4 Oct 2018 12:36:26 +0800
Subject: [PATCH 5/5] rename test

---
 .../org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 9fc5db5e4bf8..085a44548848 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -518,7 +518,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     testMetricsDynamicPartition("parquet", "parquet", "t1")
   }

-  test("SPARK-25602: range metrics can be wrong if the result rows are not fully consumed") {
+  test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
     def checkFilterAndRangeMetrics(
         df: DataFrame,
         filterNumOutputs: Int,
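After the revert, the net change of the series is the `getByteArrayRdd` loop from the first hunk plus the hardened test, which the final commit's new name makes explicit. Restated outside Spark as a hedged sketch (the `serializeUpTo` helper is hypothetical; the real loop lives inside `SparkPlan.getByteArrayRdd`), the whole fix comes down to the operand order:

    // Drain at most n elements (n < 0 means "no limit"), checking the limit
    // BEFORE hasNext so a satisfied limit never forces the source to produce
    // and buffer one extra element.
    def serializeUpTo[T](iter: Iterator[T], n: Long)(write: T => Unit): Long = {
      var count = 0L
      while ((n < 0 || count < n) && iter.hasNext) {
        write(iter.next())
        count += 1
      }
      count
    }

With a look-ahead source such as the generated range iterator, flipping the operands back would again trigger production past the limit, and since Range counts whole scheduled batches, that single extra `hasNext` can overstate `numOutputRows` by up to a full batch.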