From cf147aadf964d01fa1f664a1d9053057ffd2c9a9 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Mon, 15 Jun 2020 06:31:44 +0900
Subject: [PATCH 1/6] [SPARK-24634][SS][FOLLOWUP] Rename the variable from
 "numLateInputs" to "numDroppedRowsByWatermark"

---
 docs/structured-streaming-programming-guide.md     |  9 ++++-----
 .../execution/streaming/statefulOperators.scala    | 12 ++++++------
 .../org/apache/spark/sql/streaming/progress.scala  |  4 ++--
 .../sql/streaming/EventTimeWatermarkSuite.scala    | 15 ++++++++-------
 .../sql/streaming/StateStoreMetricsTest.scala      | 10 ++++++----
 .../StreamingQueryStatusAndProgressSuite.scala     |  8 ++++----
 6 files changed, 30 insertions(+), 28 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 53b7a3a8e46f..18fb67279f1d 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1678,12 +1678,11 @@ emits late rows if the operator uses Append mode.
 Spark provides two ways to check the number of late rows on stateful operators which would help you identify the issue:
 
 1. On Spark UI: check the metrics in stateful operator nodes in query execution details page in SQL tab
-2. On Streaming Query Listener: check "numLateInputs" in "stateOperators" in QueryProcessEvent.
+2. On Streaming Query Listener: check "numDroppedRowsByWatermark" in "stateOperators" in QueryProcessEvent.
 
-Please note that the definition of "input" is relative: it doesn't always mean "input rows" for the operator.
-Streaming aggregation does pre-aggregate input rows and checks the late inputs against pre-aggregated inputs,
-hence the number is not same as the number of original input rows. You'd like to check the fact whether the value is zero
-or non-zero.
+Please note that "numDroppedRowsByWatermark" represents the number of rows "dropped" by the watermark, which is not always the same as the count of "late input rows" for the operator.
+It depends on the implementation of the operator - e.g. streaming aggregation pre-aggregates input rows and checks the late inputs against the pre-aggregated inputs,
+hence the number is not the same as the number of original input rows. You'd like to just check whether the value is zero or non-zero.
 
 There's a known workaround: split your streaming query into multiple queries per stateful operator, and
 ensure end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
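As context for item 2 in the guide text above, here is a minimal listener sketch, assuming an active `SparkSession` named `spark`; the field carries the name introduced by this patch, which patch 4 later in the series renames to `numRowsDroppedByWatermark`:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Minimal sketch, assuming an active SparkSession named `spark`. The field name
// matches this patch; patch 4 in this series renames it to numRowsDroppedByWatermark.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.stateOperators.zipWithIndex.foreach { case (op, i) =>
      // A non-zero value means this stateful operator discarded rows behind the watermark.
      if (op.numDroppedRowsByWatermark > 0) {
        println(s"operator $i dropped ${op.numDroppedRowsByWatermark} rows by watermark")
      }
    }
  }
})
```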
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 073266bd621d..52550b6d1943 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -77,8 +77,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numLateInputs" -> SQLMetrics.createMetric(sparkContext,
-      "number of inputs which are later than watermark ('inputs' are relative to operators)"),
+    "numDroppedRowsByWatermark" -> SQLMetrics.createMetric(sparkContext,
+      "number of rows which are dropped by watermark"),
     "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
     "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
     "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
@@ -102,7 +102,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
       numRowsTotal = longMetric("numTotalStateRows").value,
       numRowsUpdated = longMetric("numUpdatedStateRows").value,
       memoryUsedBytes = longMetric("stateMemory").value,
-      numLateInputs = longMetric("numLateInputs").value,
+      numDroppedRowsByWatermark = longMetric("numDroppedRowsByWatermark").value,
       javaConvertedCustomMetrics
     )
   }
@@ -137,10 +137,10 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
 
   protected def applyRemovingRowsOlderThanWatermark(
       iter: Iterator[InternalRow],
-      predicateFilterOutLateInput: BasePredicate): Iterator[InternalRow] = {
+      predicateDropRowByWatermark: BasePredicate): Iterator[InternalRow] = {
     iter.filterNot { row =>
-      val lateInput = predicateFilterOutLateInput.eval(row)
-      if (lateInput) longMetric("numLateInputs") += 1
+      val lateInput = predicateDropRowByWatermark.eval(row)
+      if (lateInput) longMetric("numDroppedRowsByWatermark") += 1
      lateInput
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 22bae76ef422..3265244808b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -43,7 +43,7 @@ class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
     val numRowsUpdated: Long,
     val memoryUsedBytes: Long,
-    val numLateInputs: Long,
+    val numDroppedRowsByWatermark: Long,
     val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
   ) extends Serializable {
 
@@ -63,7 +63,7 @@ class StateOperatorProgress private[sql](
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
     ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
-    ("numLateInputs" -> JInt(numLateInputs)) ~
+    ("numDroppedRowsByWatermark" -> JInt(numDroppedRowsByWatermark)) ~
     ("customMetrics" -> {
       if (!customMetrics.isEmpty) {
         val keys = customMetrics.keySet.asScala.toSeq.sorted
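The renamed `applyRemovingRowsOlderThanWatermark` above keeps the operator's count-and-drop shape: evaluate a predicate per row, bump the metric for every row it filters out. A self-contained sketch of that pattern, with tuples standing in for `InternalRow` and a local counter standing in for the operator's `longMetric(...)` (names illustrative):

```scala
// Count-and-drop, as in applyRemovingRowsOlderThanWatermark: filterNot keeps the
// rows the predicate rejects and counts each row it drops. A plain var stands in
// for the operator's SQLMetric.
def dropRowsOlderThanWatermark(
    rows: Iterator[(String, Long)], // (key, eventTimeMs) stands in for InternalRow
    watermarkMs: Long): Iterator[(String, Long)] = {
  var numRowsDroppedByWatermark = 0L
  rows.filterNot { case (_, eventTimeMs) =>
    val shouldDrop = eventTimeMs < watermarkMs
    if (shouldDrop) numRowsDroppedByWatermark += 1
    shouldDrop
  }
}
```

Note that, as in the real operator, the filter is lazy: the counter only reflects dropped rows once the returned iterator has been consumed.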
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 329196a5cfef..473223348e71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -298,11 +298,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer((10, 5)),
       assertNumStateRows(2),
-      assertNumLateInputs(0),
+      assertnumDroppedRowsByWatermark(0),
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
       assertNumStateRows(2),
-      assertNumLateInputs(1)
+      assertnumDroppedRowsByWatermark(1)
     )
   }
 
@@ -323,15 +323,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer((25, 1)),
       assertNumStateRows(2),
-      assertNumLateInputs(0),
+      assertnumDroppedRowsByWatermark(0),
       AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
       CheckNewAnswer((25, 2)),
       assertNumStateRows(2),
-      assertNumLateInputs(1),
+      assertnumDroppedRowsByWatermark(1),
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
       assertNumStateRows(2),
-      assertNumLateInputs(1)
+      assertnumDroppedRowsByWatermark(1)
     )
   }
 
@@ -788,7 +788,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     true
   }
 
-  private def assertNumLateInputs(numLateInputs: Long): AssertOnQuery = AssertOnQuery { q =>
+  private def assertnumDroppedRowsByWatermark(
+      numDroppedRowsByWatermark: Long): AssertOnQuery = AssertOnQuery { q =>
     q.processAllAvailable()
     val progressWithData = q.recentProgress.filterNot { p =>
       // filter out batches which are falling into one of types:
@@ -796,7 +797,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       // 2) empty input batch
       p.inputRowsPerSecond == 0
     }.lastOption.get
-    assert(progressWithData.stateOperators(0).numLateInputs === numLateInputs)
+    assert(progressWithData.stateOperators(0).numDroppedRowsByWatermark === numDroppedRowsByWatermark)
     true
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
index 640f5181aa52..e28a1b77d227 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
@@ -55,8 +55,8 @@ trait StateStoreMetricsTest extends StreamTest {
       val allNumUpdatedRowsSinceLastCheck =
         progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))
 
-      val allNumLateInputsSinceLastCheck =
-        progressesSinceLastCheck.map(_.stateOperators.map(_.numLateInputs))
+      val allnumDroppedRowsByWatermarkSinceLastCheck =
+        progressesSinceLastCheck.map(_.stateOperators.map(_.numDroppedRowsByWatermark))
 
       lazy val debugString = "recent progresses:\n" +
         progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
@@ -67,8 +67,10 @@ trait StateStoreMetricsTest extends StreamTest {
       val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
       assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")
 
-      val numLateInputs = arraySum(allNumLateInputsSinceLastCheck, numStateOperators)
-      assert(numLateInputs === lateInputs, s"incorrect late inputs, $debugString")
+      val numDroppedRowsByWatermark = arraySum(allnumDroppedRowsByWatermarkSinceLastCheck,
+        numStateOperators)
+      assert(numDroppedRowsByWatermark === lateInputs,
+        s"incorrect dropped rows by watermark, $debugString")
 
       lastCheckedRecentProgressIndex = recentProgress.length - 1
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 79028a6c442d..6ddc376c37a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -64,7 +64,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
         |    "numRowsTotal" : 0,
         |    "numRowsUpdated" : 1,
         |    "memoryUsedBytes" : 3,
-        |    "numLateInputs" : 0,
+        |    "numDroppedRowsByWatermark" : 0,
         |    "customMetrics" : {
         |      "loadedMapCacheHitCount" : 1,
         |      "loadedMapCacheMissCount" : 0,
@@ -115,7 +115,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
         |    "numRowsTotal" : 0,
         |    "numRowsUpdated" : 1,
         |    "memoryUsedBytes" : 2,
-        |    "numLateInputs" : 0
+        |    "numDroppedRowsByWatermark" : 0
         |  } ],
         |  "sources" : [ {
         |    "description" : "source",
@@ -323,7 +323,7 @@ object StreamingQueryStatusAndProgressSuite {
         "avg" -> "2016-12-05T20:54:20.827Z",
         "watermark" -> "2016-12-05T20:54:20.827Z").asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numLateInputs = 0,
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numDroppedRowsByWatermark = 0,
       customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
         "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
         .mapValues(long2Long).asJava)
@@ -355,7 +355,7 @@ object StreamingQueryStatusAndProgressSuite {
     // empty maps should be handled correctly
     eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numLateInputs = 0)),
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numDroppedRowsByWatermark = 0)),
     sources = Array(
       new SourceProgress(
         description = "source",
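A note on the EventTimeWatermarkSuite scenarios in the patch above: comments like "AddData(inputData, 25) // Advance watermark to 15 seconds" imply a 10-second watermark delay, since the watermark trails the maximum observed event time by that delay (25s - 10s = 15s), after which a row with event time 10s is dropped and counted. A sketch of that kind of query, under assumed shape (the suite's exact projection and window width may differ), inside a StreamTest suite with testImplicits in scope:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, window}

// Assumed shape of the query under test: integers are read as
// seconds-since-epoch event times.
val inputData = MemoryStream[Int]
val windowedCounts = inputData.toDF()
  .selectExpr("CAST(value AS timestamp) AS eventTime")
  .withWatermark("eventTime", "10 seconds") // watermark = max(eventTime) - 10s
  .groupBy(window(col("eventTime"), "5 seconds"))
  .count()
```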
From ca3b3de653a92090db33ca8282eea18b75ff2420 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Mon, 15 Jun 2020 06:54:51 +0900
Subject: [PATCH 2/6] Fix scalastyle

---
 .../apache/spark/sql/streaming/EventTimeWatermarkSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 473223348e71..95e8d249c5b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -797,7 +797,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       // 2) empty input batch
       p.inputRowsPerSecond == 0
     }.lastOption.get
-    assert(progressWithData.stateOperators(0).numDroppedRowsByWatermark === numDroppedRowsByWatermark)
+    assert(progressWithData.stateOperators(0).numDroppedRowsByWatermark
+      === numDroppedRowsByWatermark)
     true
   }

From a712b4dff959f55633ae543a4a70224c218a61eb Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Mon, 15 Jun 2020 07:30:23 +0900
Subject: [PATCH 3/6] nit

---
 .../sql/streaming/EventTimeWatermarkSuite.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 95e8d249c5b8..b5e2ab5ed4cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -298,11 +298,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer((10, 5)),
       assertNumStateRows(2),
-      assertnumDroppedRowsByWatermark(0),
+      assertNumDroppedRowsByWatermark(0),
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
       assertNumStateRows(2),
-      assertnumDroppedRowsByWatermark(1)
+      assertNumDroppedRowsByWatermark(1)
     )
   }
 
@@ -323,15 +323,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer((25, 1)),
       assertNumStateRows(2),
-      assertnumDroppedRowsByWatermark(0),
+      assertNumDroppedRowsByWatermark(0),
       AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
       CheckNewAnswer((25, 2)),
       assertNumStateRows(2),
-      assertnumDroppedRowsByWatermark(1),
+      assertNumDroppedRowsByWatermark(1),
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
       assertNumStateRows(2),
-      assertnumDroppedRowsByWatermark(1)
+      assertNumDroppedRowsByWatermark(1)
     )
   }
 
@@ -788,7 +788,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     true
   }
 
-  private def assertnumDroppedRowsByWatermark(
+  private def assertNumDroppedRowsByWatermark(
       numDroppedRowsByWatermark: Long): AssertOnQuery = AssertOnQuery { q =>
     q.processAllAvailable()
     val progressWithData = q.recentProgress.filterNot { p =>

From 75d12d31814cf774f35368bb487cf0f1ecea904d Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Mon, 15 Jun 2020 16:45:30 +0900
Subject: [PATCH 4/6] Rename numDroppedRowsByWatermark to
 numRowsDroppedByWatermark

---
 docs/structured-streaming-programming-guide.md     |  4 ++--
 .../execution/streaming/statefulOperators.scala    |  6 +++---
 .../org/apache/spark/sql/streaming/progress.scala  |  4 ++--
 .../sql/streaming/EventTimeWatermarkSuite.scala    | 18 +++++++++---------
 .../sql/streaming/StateStoreMetricsTest.scala      |  8 ++++----
 .../StreamingQueryStatusAndProgressSuite.scala     |  8 ++++----
 6 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 18fb67279f1d..a371f4f50f9f 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1678,9 +1678,9 @@ emits late rows if the operator uses Append mode.
 Spark provides two ways to check the number of late rows on stateful operators which would help you identify the issue:
 
 1. On Spark UI: check the metrics in stateful operator nodes in query execution details page in SQL tab
-2. On Streaming Query Listener: check "numDroppedRowsByWatermark" in "stateOperators" in QueryProcessEvent.
+2. On Streaming Query Listener: check "numRowsDroppedByWatermark" in "stateOperators" in QueryProcessEvent.
 
-Please note that "numDroppedRowsByWatermark" represents the number of rows "dropped" by the watermark, which is not always the same as the count of "late input rows" for the operator.
+Please note that "numRowsDroppedByWatermark" represents the number of rows "dropped" by the watermark, which is not always the same as the count of "late input rows" for the operator.
 It depends on the implementation of the operator - e.g. streaming aggregation pre-aggregates input rows and checks the late inputs against the pre-aggregated inputs,
 hence the number is not the same as the number of original input rows. You'd like to just check whether the value is zero or non-zero.
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 52550b6d1943..bf606d278243 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -77,7 +77,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numDroppedRowsByWatermark" -> SQLMetrics.createMetric(sparkContext,
+    "numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
       "number of rows which are dropped by watermark"),
     "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
     "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
@@ -102,7 +102,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
       numRowsTotal = longMetric("numTotalStateRows").value,
       numRowsUpdated = longMetric("numUpdatedStateRows").value,
       memoryUsedBytes = longMetric("stateMemory").value,
-      numDroppedRowsByWatermark = longMetric("numDroppedRowsByWatermark").value,
+      numRowsDroppedByWatermark = longMetric("numRowsDroppedByWatermark").value,
       javaConvertedCustomMetrics
     )
   }
@@ -140,7 +140,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
       predicateDropRowByWatermark: BasePredicate): Iterator[InternalRow] = {
     iter.filterNot { row =>
       val lateInput = predicateDropRowByWatermark.eval(row)
-      if (lateInput) longMetric("numDroppedRowsByWatermark") += 1
+      if (lateInput) longMetric("numRowsDroppedByWatermark") += 1
       lateInput
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 3265244808b5..67c4d968511a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -43,7 +43,7 @@ class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
     val numRowsUpdated: Long,
     val memoryUsedBytes: Long,
-    val numDroppedRowsByWatermark: Long,
+    val numRowsDroppedByWatermark: Long,
     val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
   ) extends Serializable {
 
@@ -63,7 +63,7 @@ class StateOperatorProgress private[sql](
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
     ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
-    ("numDroppedRowsByWatermark" -> JInt(numDroppedRowsByWatermark)) ~
+    ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
     ("customMetrics" -> {
       if (!customMetrics.isEmpty) {
         val keys = customMetrics.keySet.asScala.toSeq.sorted
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index b5e2ab5ed4cc..b5e313d2e107 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -298,11 +298,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer((10, 5)),
       assertNumStateRows(2),
-      assertNumDroppedRowsByWatermark(0),
+      assertNumRowsDroppedByWatermark(0),
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
       assertNumStateRows(2),
-      assertNumDroppedRowsByWatermark(1)
+      assertNumRowsDroppedByWatermark(1)
     )
   }
 
@@ -323,15 +323,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer((25, 1)),
       assertNumStateRows(2),
-      assertNumDroppedRowsByWatermark(0),
+      assertNumRowsDroppedByWatermark(0),
       AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
       CheckNewAnswer((25, 2)),
       assertNumStateRows(2),
-      assertNumDroppedRowsByWatermark(1),
+      assertNumRowsDroppedByWatermark(1),
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
       assertNumStateRows(2),
-      assertNumDroppedRowsByWatermark(1)
+      assertNumRowsDroppedByWatermark(1)
     )
   }
 
@@ -788,8 +788,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     true
   }
 
-  private def assertNumDroppedRowsByWatermark(
-      numDroppedRowsByWatermark: Long): AssertOnQuery = AssertOnQuery { q =>
+  private def assertNumRowsDroppedByWatermark(
+      numRowsDroppedByWatermark: Long): AssertOnQuery = AssertOnQuery { q =>
     q.processAllAvailable()
     val progressWithData = q.recentProgress.filterNot { p =>
       // filter out batches which are falling into one of types:
@@ -797,8 +797,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       // 2) empty input batch
       p.inputRowsPerSecond == 0
     }.lastOption.get
-    assert(progressWithData.stateOperators(0).numDroppedRowsByWatermark
-      === numDroppedRowsByWatermark)
+    assert(progressWithData.stateOperators(0).numRowsDroppedByWatermark
+      === numRowsDroppedByWatermark)
     true
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
index e28a1b77d227..662e2fae0d8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
@@ -55,8 +55,8 @@ trait StateStoreMetricsTest extends StreamTest {
       val allNumUpdatedRowsSinceLastCheck =
         progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))
 
-      val allnumDroppedRowsByWatermarkSinceLastCheck =
-        progressesSinceLastCheck.map(_.stateOperators.map(_.numDroppedRowsByWatermark))
+      val allNumRowsDroppedByWatermarkSinceLastCheck =
+        progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsDroppedByWatermark))
 
       lazy val debugString = "recent progresses:\n" +
         progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
@@ -67,9 +67,9 @@ trait StateStoreMetricsTest extends StreamTest {
       val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
       assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")
 
-      val numDroppedRowsByWatermark = arraySum(allnumDroppedRowsByWatermarkSinceLastCheck,
+      val numRowsDroppedByWatermark = arraySum(allNumRowsDroppedByWatermarkSinceLastCheck,
         numStateOperators)
-      assert(numDroppedRowsByWatermark === lateInputs,
+      assert(numRowsDroppedByWatermark === lateInputs,
         s"incorrect dropped rows by watermark, $debugString")
 
       lastCheckedRecentProgressIndex = recentProgress.length - 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 6ddc376c37a2..98e2342c78e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -64,7 +64,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
         |    "numRowsTotal" : 0,
         |    "numRowsUpdated" : 1,
         |    "memoryUsedBytes" : 3,
-        |    "numDroppedRowsByWatermark" : 0,
+        |    "numRowsDroppedByWatermark" : 0,
         |    "customMetrics" : {
         |      "loadedMapCacheHitCount" : 1,
         |      "loadedMapCacheMissCount" : 0,
@@ -115,7 +115,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
         |    "numRowsTotal" : 0,
         |    "numRowsUpdated" : 1,
         |    "memoryUsedBytes" : 2,
-        |    "numDroppedRowsByWatermark" : 0
+        |    "numRowsDroppedByWatermark" : 0
         |  } ],
         |  "sources" : [ {
         |    "description" : "source",
@@ -323,7 +323,7 @@ object StreamingQueryStatusAndProgressSuite {
         "avg" -> "2016-12-05T20:54:20.827Z",
         "watermark" -> "2016-12-05T20:54:20.827Z").asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numDroppedRowsByWatermark = 0,
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numRowsDroppedByWatermark = 0,
       customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
         "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
         .mapValues(long2Long).asJava)
@@ -355,7 +355,7 @@ object StreamingQueryStatusAndProgressSuite {
     // empty maps should be handled correctly
     eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numDroppedRowsByWatermark = 0)),
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numRowsDroppedByWatermark = 0)),
     sources = Array(
       new SourceProgress(
         description = "source",
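With the rename complete, the same field is also reachable ad hoc from a query's progress objects; this mirrors what the `assertNumRowsDroppedByWatermark` helper above does (assuming a running `StreamingQuery` named `query`):

```scala
// Take the most recent progress that actually processed input rows, then read
// the renamed field per stateful operator. Mirrors the test helper above.
val lastWithData = query.recentProgress
  .filterNot(_.inputRowsPerSecond == 0) // skip no-data and watermark-only batches
  .lastOption
lastWithData.foreach { p =>
  println(p.stateOperators.map(_.numRowsDroppedByWatermark).mkString(", "))
}
```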
From 2e984f3789c845edf95ba606b8627890700c61ba Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 16 Jun 2020 05:06:52 +0900
Subject: [PATCH 5/6] Fix missing spots

---
 .../execution/streaming/ProgressReporter.scala     |  6 +++++-
 .../execution/streaming/statefulOperators.scala    |  6 +++---
 .../org/apache/spark/sql/streaming/progress.scala  |  6 +++---
 .../sql/streaming/StateStoreMetricsTest.scala      | 15 +++++++++------
 .../streaming/StreamingDeduplicationSuite.scala    | 10 +++++-----
 .../spark/sql/streaming/StreamingJoinSuite.scala   | 10 +++++-----
 6 files changed, 30 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 55491d96f9b1..2c737206dd2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -222,7 +222,11 @@ trait ProgressReporter extends Logging {
     lastExecution.executedPlan.collect {
       case p if p.isInstanceOf[StateStoreWriter] =>
         val progress = p.asInstanceOf[StateStoreWriter].getProgress()
-        if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0, newNumLateInputs = 0)
+        if (hasExecuted) {
+          progress
+        } else {
+          progress.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0)
+        }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index bf606d278243..74daaf80b10e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -139,9 +139,9 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
       iter: Iterator[InternalRow],
       predicateDropRowByWatermark: BasePredicate): Iterator[InternalRow] = {
     iter.filterNot { row =>
-      val lateInput = predicateDropRowByWatermark.eval(row)
-      if (lateInput) longMetric("numRowsDroppedByWatermark") += 1
-      lateInput
+      val shouldDrop = predicateDropRowByWatermark.eval(row)
+      if (shouldDrop) longMetric("numRowsDroppedByWatermark") += 1
+      shouldDrop
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 67c4d968511a..482f2b4bf4ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -55,9 +55,9 @@ class StateOperatorProgress private[sql](
 
   private[sql] def copy(
       newNumRowsUpdated: Long,
-      newNumLateInputs: Long): StateOperatorProgress =
-    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, newNumLateInputs,
-      customMetrics)
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes,
+      newNumRowsDroppedByWatermark, customMetrics)
 
   private[sql] def jsonValue: JValue = {
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
index 662e2fae0d8a..be83f0ee776d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
@@ -32,9 +32,9 @@ trait StateStoreMetricsTest extends StreamTest {
   def assertNumStateRows(
       total: Seq[Long],
       updated: Seq[Long],
-      lateInputs: Seq[Long]): AssertOnQuery =
+      droppedByWatermark: Seq[Long]): AssertOnQuery =
     AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated" +
-      s", late inputs = $lateInputs") { q =>
+      s", rows dropped by watermark = $droppedByWatermark") { q =>
       // This assumes that the streaming query will not make any progress while the eventually
       // is being executed.
      eventually(timeout(streamingTimeout)) {
@@ -69,7 +69,7 @@ trait StateStoreMetricsTest extends StreamTest {
 
       val numRowsDroppedByWatermark = arraySum(allNumRowsDroppedByWatermarkSinceLastCheck,
         numStateOperators)
-      assert(numRowsDroppedByWatermark === lateInputs,
+      assert(numRowsDroppedByWatermark === droppedByWatermark,
         s"incorrect dropped rows by watermark, $debugString")
 
       lastCheckedRecentProgressIndex = recentProgress.length - 1
@@ -79,11 +79,14 @@ trait StateStoreMetricsTest extends StreamTest {
 
   def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery = {
     assert(total.length === updated.length)
-    assertNumStateRows(total, updated, lateInputs = (0 until total.length).map(_ => 0L))
+    assertNumStateRows(total, updated, droppedByWatermark = (0 until total.length).map(_ => 0L))
   }
 
-  def assertNumStateRows(total: Long, updated: Long, lateInput: Long = 0): AssertOnQuery = {
-    assertNumStateRows(Seq(total), Seq(updated), Seq(lateInput))
+  def assertNumStateRows(
+      total: Long,
+      updated: Long,
+      droppedByWatermark: Long = 0): AssertOnQuery = {
+    assertNumStateRows(Seq(total), Seq(updated), Seq(droppedByWatermark))
   }
 
   def arraySum(arraySeq: Seq[Array[Long]], arrayLength: Int): Seq[Long] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index ee0b980a9d87..fe10bbf7fad5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -54,13 +54,13 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
     testStream(result, Append)(
       AddData(inputData, "a" -> 1),
       CheckLastBatch("a" -> 1),
-      assertNumStateRows(total = 1, updated = 1, lateInput = 0),
+      assertNumStateRows(total = 1, updated = 1, droppedByWatermark = 0),
       AddData(inputData, "a" -> 2), // Dropped
       CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0, lateInput = 0),
+      assertNumStateRows(total = 1, updated = 0, droppedByWatermark = 0),
       AddData(inputData, "b" -> 1),
       CheckLastBatch("b" -> 1),
-      assertNumStateRows(total = 2, updated = 1, lateInput = 0)
+      assertNumStateRows(total = 2, updated = 1, droppedByWatermark = 0)
     )
   }
 
@@ -102,7 +102,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
 
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
-      assertNumStateRows(total = 1, updated = 0, lateInput = 1),
+      assertNumStateRows(total = 1, updated = 0, droppedByWatermark = 1),
 
       AddData(inputData, 45), // Advance watermark to 35 seconds, no-data-batch drops row 25
       CheckNewAnswer(45),
@@ -136,7 +136,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
 
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckLastBatch(),
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L), lateInputs = Seq(0L, 1L)),
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L), droppedByWatermark = Seq(0L, 1L)),
 
       AddData(inputData, 40), // Advance watermark to 30 seconds
       CheckLastBatch((15 -> 1), (25 -> 1)),
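One distinction the deduplication tests above rely on: a duplicate such as `"a" -> 2` is dropped by the dedup state with `droppedByWatermark = 0`, while only rows behind the watermark increment the counter. A sketch of the two query shapes involved, with column names assumed (the suite feeds a `MemoryStream[(String, Int)]`):

```scala
// Without a watermark: duplicates are dropped by the dedup state alone, so
// numRowsDroppedByWatermark stays 0 even though rows disappear from the output.
val dedup = input.toDF("key", "value").dropDuplicates("key")

// With a watermark: a row older than max(eventTime) - 10s is discarded before
// the dedup state is consulted, and it increments numRowsDroppedByWatermark.
val dedupWithWatermark = events // assumed: a streaming DataFrame with key/eventTime
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("key", "eventTime")
```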
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 1f6d0a994568..833fc24e8b66 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -166,7 +166,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       AddData(input1, 5),
       CheckNewAnswer(), // Same reason as above
-      assertNumStateRows(total = 2, updated = 0, lateInput = 1)
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
     )
   }
 
@@ -219,12 +219,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       // (1, 28) ==> passed filter, matched with left (1, 3) and (1, 5), added to state
       AddData(rightInput, (1, 20), (1, 21), (1, 28)),
       CheckNewAnswer((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)),
-      assertNumStateRows(total = 5, updated = 1, lateInput = 1),
+      assertNumStateRows(total = 5, updated = 1, droppedByWatermark = 1),
 
       // New data to left input with leftTime <= 20 should be filtered due to event time watermark
       AddData(leftInput, (1, 20), (1, 21)),
       CheckNewAnswer((1, 21, 28)),
-      assertNumStateRows(total = 6, updated = 1, lateInput = 1)
+      assertNumStateRows(total = 6, updated = 1, droppedByWatermark = 1)
     )
   }
 
@@ -293,7 +293,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
 
       AddData(leftInput, (1, 30), (1, 31)), // 30 should not be processed or added to state
       CheckNewAnswer((1, 31, 26), (1, 31, 30), (1, 31, 31)),
-      assertNumStateRows(total = 11, updated = 1, lateInput = 1), // only 31 added
+      assertNumStateRows(total = 11, updated = 1, droppedByWatermark = 1), // only 31 added
 
       // Advance the watermark
       AddData(rightInput, (1, 80)),
@@ -307,7 +307,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
 
       AddData(rightInput, (1, 46), (1, 50)), // 46 should not be processed or added to state
       CheckNewAnswer((1, 49, 50), (1, 50, 50)),
-      assertNumStateRows(total = 7, updated = 1, lateInput = 1) // 50 added
+      assertNumStateRows(total = 7, updated = 1, droppedByWatermark = 1) // 50 added
     )
   }

From c496eb5d62187cae5b8d41b57c89f47ad79c7c7c Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 16 Jun 2020 05:29:06 +0900
Subject: [PATCH 6/6] Fix style

---
 .../spark/sql/streaming/StreamingDeduplicationSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index fe10bbf7fad5..f6f150e7bbbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -136,7 +136,8 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
 
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckLastBatch(),
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L), droppedByWatermark = Seq(0L, 1L)),
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L),
+        droppedByWatermark = Seq(0L, 1L)),
 
       AddData(inputData, 40), // Advance watermark to 30 seconds
       CheckLastBatch((15 -> 1), (25 -> 1)),
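For completeness on the StreamingJoinSuite changes above: the join is a single stateful operator, so late rows on either input add to the same `numRowsDroppedByWatermark`. A sketch of the watermarked stream-stream join shape those tests exercise, with column names and intervals assumed:

```scala
import org.apache.spark.sql.functions.expr

// Inner stream-stream join with watermarks on both inputs and an event-time
// range condition. Once the watermark passes an event time (e.g. leftTime <= 20
// in the test comments), such rows are dropped before joining and counted in
// the join operator's numRowsDroppedByWatermark.
val joined = left
  .withWatermark("leftTime", "10 seconds")
  .join(
    right.withWatermark("rightTime", "10 seconds"),
    expr("leftKey = rightKey AND " +
      "rightTime >= leftTime AND rightTime <= leftTime + interval 20 seconds"),
    "inner")
```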