diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index a246b47fe655..f0ab9568683f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -96,9 +96,12 @@ class SQLMetric(
 
   def +=(v: Long): Unit = add(v)
 
-  // _value may be uninitialized, in many cases being -1. We should not expose it to the user
-  // and instead return 0.
-  override def value: Long = if (isZero) 0 else _value
+  // We use -1 as the initial value of the SIZE and TIMING accumulators (0 is a valid metric
+  // value). We need to return it as is so that the SQL UI can filter out the invalid
+  // accumulator values in `SQLMetrics.stringValue` when calculating min, max, etc.
+  // However, users accessing the values in the physical plan programmatically still get -1.
+  // They may use `SQLMetric.isZero` before consuming this value.
+  override def value: Long = _value
 
   // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later
   override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 2c24cc7d570b..fec2e73500a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1746,7 +1746,7 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
       val allFilesNum = scan1.metrics("numFiles").value
       val allFilesSize = scan1.metrics("filesSize").value
       assert(scan1.metrics("numPartitions").value === numPartitions)
-      assert(scan1.metrics("pruningTime").value === 0)
+      assert(scan1.metrics("pruningTime").value === -1)
 
       // No dynamic partition pruning, so no static metrics
       // Only files from fid = 5 partition are scanned
@@ -1760,7 +1760,7 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
       assert(0 < partFilesNum && partFilesNum < allFilesNum)
       assert(0 < partFilesSize && partFilesSize < allFilesSize)
       assert(scan2.metrics("numPartitions").value === 1)
-      assert(scan2.metrics("pruningTime").value === 0)
+      assert(scan2.metrics("pruningTime").value === -1)
 
       // Dynamic partition pruning is used
       // Static metrics are as-if reading the whole fact table
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 93df399731d4..84c2a4763449 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2291,8 +2291,16 @@ class AdaptiveQueryExecSuite
       assert(aqeReads.length == 2)
       aqeReads.foreach { c =>
         val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
-        assert(stats.sizeInBytes >= 0)
-        assert(stats.rowCount.get >= 0)
+        val rowCount = stats.rowCount.get
+        val sizeInBytes = stats.sizeInBytes
+        assert(rowCount >= 0)
+        if (rowCount == 0) {
+          // For an empty relation, the query stage doesn't serialize any bytes,
+          // so the SQLMetric keeps its initial value.
+          assert(sizeInBytes == -1)
+        } else {
+          assert(sizeInBytes > 0)
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 45c775e6c463..e832e78ec3a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -815,7 +815,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 
     testMetricsInSparkPlanOperator(exchanges.head,
       Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
-    testMetricsInSparkPlanOperator(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
+    // `testData2.filter($"b" === 0)` is an empty relation.
+    // The exchange doesn't serialize any bytes,
+    // so the SQLMetric keeps its initial value.
+    testMetricsInSparkPlanOperator(exchanges(1),
+      Map("dataSize" -> -1, "shuffleRecordsWritten" -> 0))
   }
 
   test("Add numRows to metric of BroadcastExchangeExec") {
@@ -935,21 +939,46 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     assert(windowGroupLimit.get.metrics("numOutputRows").value == 2L)
   }
 
+  test("SPARK-49038: Correctly filter out invalid accumulator metrics") {
+    val metric1 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
+    val metric2 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
+    val metric3 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
+    val metric4 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
+    val metric5 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
+    val metric6 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
+    val metric7 = SQLMetrics.createTimingMetric(sparkContext, name = "m")
+
+    metric3.add(0)
+    metric4.add(2)
+    metric5.add(4)
+    metric6.add(5)
+    metric7.add(10)
+
+    val metricTypeTiming = "timing"
+    val values = Array(metric1.value, metric2.value,
+      metric3.value, metric4.value, metric5.value, metric6.value, metric7.value)
+    val maxMetrics = Array(metric7.value, 2, 0, 4) // maxValue, stageId, attemptId, taskId
+    val expectedOutput =
+      "total (min, med, max (stageId: taskId))\n21 ms (0 ms, 4 ms, 10 ms (stage 2.0: task 4))"
+
+    assert(SQLMetrics.stringValue(metricTypeTiming, values, maxMetrics) === expectedOutput)
+  }
+
   test("Creating metrics with initial values") {
-    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").value === 0)
-    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = -1).value === 0)
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").value === -1)
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = -1).value === -1)
     assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").isZero)
     assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = -1).isZero)
 
-    assert(SQLMetrics.createTimingMetric(sparkContext, name = "m").value === 0)
-    assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = -1).value === 0)
+    assert(SQLMetrics.createTimingMetric(sparkContext, name = "m").value === -1)
+    assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = -1).value === -1)
     assert(SQLMetrics.createTimingMetric(sparkContext, name = "m").isZero)
     assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = -1).isZero)
 
-    assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m").value === 0)
-    assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).value === 0)
+    assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m").value === -1)
+    assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).value === -1)
     assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m").isZero)
     assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).isZero)