Skip to content

Commit 7437322

Browse files
viirya authored and GitHub Enterprise committed
[SPARK-41442][SQL] Only update SQLMetric value if merging with valid metric (apache#1643)
### What changes were proposed in this pull request? This patch updates how `SQLMetric` merges two invalid instances where the value is both -1. ### Why are the changes needed? We use -1 as initial value of `SQLMetric`, and change it to 0 while merging with other `SQLMetric` instances. A `SQLMetric` will be treated as invalid and filtered out later. While we are developing with Spark, it is trouble behavior that two invalid `SQLMetric` instances merge to a valid `SQLMetric` because merging will set the value to 0. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. Closes apache#38969 from viirya/minor_sql_metrics. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent c0bddf6 commit 7437322

File tree

3 files changed

+17
-4
lines changed

3 files changed

+17
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,10 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
5555

5656
override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
5757
case o: SQLMetric =>
58-
if (_value < 0) _value = 0
59-
if (o.value > 0) _value += o.value
58+
if (o.value > 0) {
59+
if (_value < 0) _value = 0
60+
_value += o.value
61+
}
6062
case _ => throw QueryExecutionErrors.cannotMergeClassWithOtherClassError(
6163
this.getClass.getName, other.getClass.getName)
6264
}

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1929,8 +1929,15 @@ class AdaptiveQueryExecSuite
19291929
assert(aqeReads.length == 2)
19301930
aqeReads.foreach { c =>
19311931
val stats = c.child.asInstanceOf[QueryStageExec].getRuntimeStatistics
1932-
assert(stats.sizeInBytes >= 0)
1932+
val rowCount = stats.rowCount.get
19331933
assert(stats.rowCount.get >= 0)
1934+
if (rowCount == 0) {
1935+
// For empty relation, the query stage doesn't serialize any bytes.
1936+
// The SQLMetric keeps initial value.
1937+
assert(stats.sizeInBytes == -1)
1938+
} else {
1939+
assert(stats.sizeInBytes > 0)
1940+
}
19341941
}
19351942
}
19361943
}

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -774,7 +774,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
774774

775775
testMetricsInSparkPlanOperator(exchanges.head,
776776
Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
777-
testMetricsInSparkPlanOperator(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
777+
// `testData2.filter($"b" === 0)` is an empty relation.
778+
// The exchange doesn't serialize any bytes.
779+
// The SQLMetric keeps initial value.
780+
testMetricsInSparkPlanOperator(exchanges(1),
781+
Map("dataSize" -> -1, "shuffleRecordsWritten" -> 0))
778782
}
779783

780784
test("Add numRows to metric of BroadcastExchangeExec") {

0 commit comments

Comments (0)