From 25b2769bcafad12e3459af74442f1859335ad875 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 19 Dec 2019 18:20:41 +0900
Subject: [PATCH] [SPARK-29450][SS] Measure the number of output rows for
 streaming aggregation with append mode

### What changes were proposed in this pull request?

This patch adds the missing "number of output rows" metric for streaming aggregation with append mode. The other output modes already measure it correctly.

### Why are the changes needed?

Without the patch, the value of this metric is always 0.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

A unit test was added. Also manually tested with the query below:

> query

```
import spark.implicits._

spark.conf.set("spark.sql.shuffle.partitions", "5")

val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1000)
  .load()
  .withWatermark("timestamp", "5 seconds")
  .selectExpr("timestamp", "mod(value, 100) as mod", "value")
  .groupBy(window($"timestamp", "10 seconds"), $"mod")
  .agg(max("value").as("max_value"), min("value").as("min_value"), avg("value").as("avg_value"))

val query = df
  .writeStream
  .format("memory")
  .option("queryName", "test")
  .outputMode("append")
  .start()

query.awaitTermination()
```

> before the patch

![screenshot-before-SPARK-29450](https://user-images.githubusercontent.com/1317309/69023217-58d7bc80-0a01-11ea-8cac-40f1cced6d16.png)

> after the patch

![screenshot-after-SPARK-29450](https://user-images.githubusercontent.com/1317309/69023221-5c6b4380-0a01-11ea-8a66-7bf1b7d09fc7.png)

Closes #26104 from HeartSaVioR/SPARK-29450.

Authored-by: Jungtaek Lim (HeartSaVioR)
Signed-off-by: HyukjinKwon
---
 .../streaming/statefulOperators.scala         |  1 +
 .../streaming/StreamingAggregationSuite.scala | 65 +++++++++++++++++++-
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index c11af345b0248..41cb928c9e0a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -353,6 +353,7 @@ case class StateStoreSaveExec(
               finished = true
               null
             } else {
+              numOutputRows += 1
               removedValueRow
             }
           }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 1ae6ff3a90989..2c2ce23cc6217 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.streaming
 import java.io.File
 import java.util.{Locale, TimeZone}
 
+import scala.collection.mutable
+
 import org.apache.commons.io.FileUtils
 import org.scalatest.{Assertions, BeforeAndAfterAll}
 
@@ -189,7 +191,68 @@ class StreamingAggregationSuite extends StateStoreMetricsTest
     )
   }
 
-  testWithAllStateVersions("state metrics") {
+  testWithAllStateVersions("state metrics - append mode") {
+    val inputData = MemoryStream[Int]
+    val aggWithWatermark = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    implicit class RichStreamExecution(query: StreamExecution) {
+      // the last execution could be either an empty batch or an actual batch
+      def stateNodes: Seq[SparkPlan] = {
+        query.lastExecution.executedPlan.collect {
+          case p if p.isInstanceOf[StateStoreSaveExec] => p
+        }
+      }
+
+      def stateOperatorProgresses: Seq[StateOperatorProgress] = {
+        val operatorProgress = mutable.ArrayBuffer[StateOperatorProgress]()
+        var progress = query.recentProgress.last
+
+        operatorProgress ++= progress.stateOperators.map { op => op.copy(op.numRowsUpdated) }
+        if (progress.numInputRows == 0) {
+          // empty batch: merge metrics from the previous batch as well
+          progress = query.recentProgress.takeRight(2).head
+          operatorProgress.zipWithIndex.foreach { case (sop, index) =>
+            // "numRowsUpdated" should be merged, as it could be updated in both batches
+            // (for now it is only updated in the previous batch, but that could change).
+            // Other metrics represent the current status of state, so pick up the latest values.
+            val newOperatorProgress = sop.copy(
+              sop.numRowsUpdated + progress.stateOperators(index).numRowsUpdated)
+            operatorProgress(index) = newOperatorProgress
+          }
+        }
+
+        operatorProgress
+      }
+    }
+
+    testStream(aggWithWatermark)(
+      AddData(inputData, 15),
+      CheckAnswer(), // watermark = 5
+      AssertOnQuery { _.stateNodes.size === 1 },
+      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
+      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
+      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
+      AddData(inputData, 10, 12, 14),
+      CheckAnswer(), // watermark = 5
+      AssertOnQuery { _.stateNodes.size === 1 },
+      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
+      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
+      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
+      AddData(inputData, 25),
+      CheckAnswer((10, 3)), // watermark = 15
+      AssertOnQuery { _.stateNodes.size === 1 },
+      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 },
+      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
+      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 }
+    )
+  }
+
+  testWithAllStateVersions("state metrics - update/complete mode") {
     val inputData = MemoryStream[Int]
     val aggregated =