diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 1776d23607f7..d834cc536713 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1674,6 +1674,11 @@ Any of the stateful operation(s) after any of below stateful operations can have
 As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark assumes that the state function emits late rows if the operator uses Append mode.
 
+Spark provides two ways to check the number of late rows on stateful operators, which can help you identify the issue:
+
+1. On the Spark UI: check the metrics of the "CountLateRows" node on the query execution details page in the SQL tab.
+2. On the StreamingQueryListener: check "numLateInputRows" in "stateOperators" in QueryProgressEvent.
+
 There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
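As an illustration of the listener approach, here is a minimal sketch, assuming an active `SparkSession` named `spark` and this patch applied (`numLateInputRows` is the field introduced below; the rest is the existing listener API):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Report the new per-operator counter on every progress update.
val lateRowsListener = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.stateOperators.zipWithIndex.foreach { case (op, i) =>
      if (op.numLateInputRows > 0) {
        println(s"state operator $i saw ${op.numLateInputRows} input rows behind the watermark")
      }
    }
  }
}

spark.streams.addListener(lateRowsListener)
```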
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f28ae5676a72..a582acdcd230 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -38,7 +38,11 @@ object MimaExcludes {
   lazy val v31excludes = v30excludes ++ Seq(
     // [SPARK-31077] Remove ChiSqSelector dependency on mllib.ChiSqSelectorModel
     // private constructor
-    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.feature.ChiSqSelectorModel.this")
+    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.feature.ChiSqSelectorModel.this"),
+
+    // [SPARK-24634][SQL] Add a new metric regarding number of rows later than watermark
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StateOperatorProgress.$default$4"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StateOperatorProgress.$default$4")
   )
 
   // Exclude rules for 3.0.x
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index bd2684d92a1d..58f51d2584c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -429,7 +429,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           aggregateExpressions.map(expr => expr.asInstanceOf[AggregateExpression]),
           rewrittenResultExpressions,
           stateVersion,
-          planLater(child))
+          CountLateRowsExec(None, planLater(child)))
 
       case _ => Nil
     }
@@ -441,7 +441,7 @@ object StreamingDeduplicationStrategy extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case Deduplicate(keys, child) if child.isStreaming =>
-        StreamingDeduplicateExec(keys, planLater(child)) :: Nil
+        StreamingDeduplicateExec(keys, CountLateRowsExec(None, planLater(child))) :: Nil
 
       case _ => Nil
     }
@@ -492,7 +492,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         val stateVersion = conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION)
 
         new StreamingSymmetricHashJoinExec(leftKeys, rightKeys, joinType, condition,
-          stateVersion, planLater(left), planLater(right)) :: Nil
+          stateVersion, CountLateRowsExec(None, planLater(left)),
+          CountLateRowsExec(None, planLater(right))) :: Nil
 
       case Join(left, right, _, _, _) if left.isStreaming && right.isStreaming =>
         throw new AnalysisException(
@@ -625,10 +626,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case FlatMapGroupsWithState(
           func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr, stateEnc, outputMode, _,
           timeout, child) =>
+        val stateVersion = conf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)
         val execPlan = FlatMapGroupsWithStateExec(
          func, keyDeser, valueDeser, groupAttr, dataAttr, outputAttr, None, stateEnc, stateVersion,
-          outputMode, timeout, batchTimestampMs = None, eventTimeWatermark = None, planLater(child))
+          outputMode, timeout, batchTimestampMs = None, eventTimeWatermark = None,
+          CountLateRowsExec(None, planLater(child)))
+
         execPlan :: Nil
 
       case _ => Nil
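To make the planner wiring above concrete, a minimal end-to-end sketch (assumptions: this patch applied, local mode, and the built-in rate source; nothing here is specific to this PR except the expectation about the planned node):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

object LateRowsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("late-rows-demo").getOrCreate()

    // The rate source emits a monotonically increasing `timestamp` column to watermark on.
    val counts = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()
      .withWatermark("timestamp", "10 seconds")
      .groupBy(window(col("timestamp"), "5 seconds"))
      .count()

    // Once started, the SQL tab's query execution details should show the
    // "CountLateRows" node planned beneath the stateful aggregation operators.
    val query = counts.writeStream
      .format("console")
      .outputMode("update")
      .start()

    query.awaitTermination(30000)
    spark.stop()
  }
}
```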
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 09ae7692ec51..7d293680ad29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -132,6 +132,9 @@ class IncrementalExecution(
     }
 
     override def apply(plan: SparkPlan): SparkPlan = plan transform {
+      case m: CountLateRowsExec =>
+        m.copy(eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
+
       case StateStoreSaveExec(keys, None, None, None, stateFormatVersion,
              UnaryExecNode(agg,
                StateStoreRestoreExec(_, None, _, child))) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 0dff1c2fe576..a70146c89669 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -222,7 +222,7 @@ trait ProgressReporter extends Logging {
     lastExecution.executedPlan.collect {
       case p if p.isInstanceOf[StateStoreWriter] =>
         val progress = p.asInstanceOf[StateStoreWriter].getProgress()
-        if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0)
+        if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0, newNumLateInputRows = 0)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 1bec924ba219..5709665f17db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
@@ -37,6 +37,84 @@ import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CompletionIterator, NextIterator, Utils}
 
+case class CountLateRowsExec(
+    eventTimeWatermark: Option[Long] = None,
+    child: SparkPlan)
+  extends UnaryExecNode with WatermarkSupport with CodegenSupport {
+
+  // No need to determine key expressions here.
+  override def keyExpressions: Seq[Attribute] = Seq.empty
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override lazy val metrics = Map(
+    "numLateRows" -> SQLMetrics.createMetric(sparkContext,
+      "number of input rows later than watermark plus allowed delay"))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val numLateRows = longMetric("numLateRows")
+    child.execute().mapPartitionsWithIndex { (_, iter) =>
+      watermarkPredicateForData match {
+        case Some(pred) =>
+          iter.map { row =>
+            val r = pred.eval(row)
+            if (r) numLateRows += 1
+            row
+          }
+
+        case None => iter
+      }
+    }
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    child.asInstanceOf[CodegenSupport].inputRDDs()
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
+    val numLateRows = metricTerm(ctx, "numLateRows")
+
+    val generated = watermarkExpression match {
+      case Some(expr) =>
+        val bound = BindReferences.bindReference(expr, child.output)
+        val evaluated = evaluateRequiredVariables(child.output, input, expr.references)
+
+        // Generate the code for the predicate.
+        val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx)
+        val nullCheck = if (bound.nullable) {
+          s"${ev.isNull} || "
+        } else {
+          s""
+        }
+
+        s"""
+           |$evaluated
+           |${ev.code}
+           |if (${nullCheck}${ev.value}) {
+           |  $numLateRows.add(1);
+           |}
+         """.stripMargin
+
+      case None => ""
+    }
+
+    // Note: wrap in "do { } while(false);", so the generated checks can jump out with "continue;"
+    s"""
+       |do {
+       |  $generated
+       |  ${consume(ctx, input)}
+       |} while(false);
+     """.stripMargin
+  }
+}
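The interpreted path above is a pure counting pass-through; the following non-Spark sketch (illustrative values only) shows the same shape, including the fact that the counter only advances as the iterator is consumed downstream:

```scala
// Simulate event-time values (ms) flowing through the operator, with a fixed watermark.
val watermarkMs = 15000L
val eventTimes = Iterator(10000L, 25000L, 9000L)

var numLateRows = 0L
val passedThrough = eventTimes.map { ts =>
  if (ts < watermarkMs) numLateRows += 1 // the analogue of pred.eval(row) above
  ts                                     // rows are never filtered here, only counted
}

passedThrough.foreach(_ => ()) // force consumption; the metric updates lazily
assert(numLateRows == 2L)
```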
 
 /** Used to identify the state store for a given operator. */
 case class StatefulOperatorStateInfo(
@@ -96,11 +174,16 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
     val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
       new java.util.HashMap(customMetrics.mapValues(long2Long).asJava)
 
+    val numLateInputRows = self.children.flatMap(_.collectFirst {
+      case d: CountLateRowsExec => d
+    }).map(_.metrics("numLateRows").value).sum
+
     new StateOperatorProgress(
       numRowsTotal = longMetric("numTotalStateRows").value,
       numRowsUpdated = longMetric("numUpdatedStateRows").value,
+      numLateInputRows = numLateInputRows,
       memoryUsedBytes = longMetric("stateMemory").value,
-      javaConvertedCustomMetrics
+      customMetrics = javaConvertedCustomMetrics
     )
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 13b506b60a12..7cba601f9c91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
 class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
     val numRowsUpdated: Long,
+    val numLateInputRows: Long,
     val memoryUsedBytes: Long,
     val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
   ) extends Serializable {
@@ -52,12 +53,14 @@ class StateOperatorProgress private[sql](
   /** The pretty (i.e. indented) JSON representation of this progress. */
   def prettyJson: String = pretty(render(jsonValue))
 
-  private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, customMetrics)
+  private[sql] def copy(newNumRowsUpdated: Long, newNumLateInputRows: Long): StateOperatorProgress =
+    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, newNumLateInputRows, memoryUsedBytes,
+      customMetrics)
 
   private[sql] def jsonValue: JValue = {
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+    ("numLateInputRows" -> JInt(numLateInputRows)) ~
     ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
     ("customMetrics" -> {
       if (!customMetrics.isEmpty) {
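A small usage sketch of the extended progress API (the `reportLateRows` helper is hypothetical; it assumes a started `StreamingQuery` with this patch applied):

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Print the new counter for each stateful operator; `lastProgress` is null
// until the first progress update, hence the Option wrapper.
def reportLateRows(query: StreamingQuery): Unit = {
  Option(query.lastProgress).foreach { progress =>
    progress.stateOperators.zipWithIndex.foreach { case (op, i) =>
      println(s"state operator $i: numLateInputRows = ${op.numLateInputRows}")
    }
    // The JSON representation now carries "numLateInputRows" as well.
    println(progress.prettyJson)
  }
}
```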
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index f29a6c7f7707..6882c51b627b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -298,9 +298,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer((10, 5)),
       assertNumStateRows(2),
+      assertNumLateRows(0),
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
-      assertNumStateRows(2)
+      assertNumStateRows(2),
+      assertNumLateRows(1)
     )
   }
 
@@ -321,12 +323,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer((25, 1)),
       assertNumStateRows(2),
+      assertNumLateRows(0),
       AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
       CheckNewAnswer((25, 2)),
       assertNumStateRows(2),
+      assertNumLateRows(1),
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
-      assertNumStateRows(2)
+      assertNumStateRows(2),
+      assertNumLateRows(1)
     )
   }
 
@@ -377,8 +382,10 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     testStream(df)(
       AddData(inputData, 10, 11, 12, 13, 14, 15),
       CheckAnswer(),
+      assertNumLateRows(0),
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckAnswer((10, 5)),
+      assertNumLateRows(0),
       StopStream,
       AssertOnQuery { q => // purge commit and clear the sink
         val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
@@ -389,12 +396,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       StartStream(),
       AddData(inputData, 10, 27, 30), // Advance watermark to 20 seconds, 10 should be ignored
       CheckAnswer((15, 1)),
+      assertNumLateRows(1),
       StopStream,
       StartStream(),
       AddData(inputData, 17), // Watermark should still be 20 seconds, 17 should be ignored
       CheckAnswer((15, 1)),
+      assertNumLateRows(1),
       AddData(inputData, 40), // Advance watermark to 30 seconds, emit first data 25
-      CheckNewAnswer((25, 2))
+      CheckNewAnswer((25, 2)),
+      assertNumLateRows(0)
     )
   }
 
@@ -486,18 +496,24 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       .agg(count("*") as 'count)
       .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
 
-    // No eviction when asked to compute complete results.
+    // No state eviction when asked to compute complete results.
+    // It still counts late input rows, though.
     testStream(windowedAggregation, OutputMode.Complete)(
       AddData(inputData, 10, 11, 12),
       CheckAnswer((10, 3)),
+      assertNumLateRows(0),
       AddData(inputData, 25),
       CheckAnswer((10, 3), (25, 1)),
+      assertNumLateRows(0),
       AddData(inputData, 25),
       CheckAnswer((10, 3), (25, 2)),
+      assertNumLateRows(0),
       AddData(inputData, 10),
       CheckAnswer((10, 4), (25, 2)),
+      assertNumLateRows(1),
       AddData(inputData, 25),
-      CheckAnswer((10, 4), (25, 3))
+      CheckAnswer((10, 4), (25, 3)),
+      assertNumLateRows(0)
     )
   }
 
@@ -756,6 +772,13 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     true
   }
 
+  private def assertNumLateRows(numLateRows: Long): AssertOnQuery = AssertOnQuery { q =>
+    q.processAllAvailable()
+    val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
+    assert(progressWithData.stateOperators(0).numLateInputRows === numLateRows)
+    true
+  }
+
   /** Assert event stats generated on that last batch with data in it */
   private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
     Execute("AssertEventStats") { q =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index b04f8b0d4d17..6a02f23de7f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -626,20 +626,20 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
     testStream(result, Update)(
       AddData(inputData, "a"),
       CheckNewAnswer(("a", "1")),
-      assertNumStateRows(total = 1, updated = 1),
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0),
       AddData(inputData, "a", "b"),
       CheckNewAnswer(("a", "2"), ("b", "1")),
-      assertNumStateRows(total = 2, updated = 2),
+      assertNumStateRows(total = 2, updated = 2, lateInput = 0),
 
       StopStream,
       StartStream(),
 
       AddData(inputData, "a", "b"), // should remove state for "a" and not return anything for a
       CheckNewAnswer(("b", "2")),
-      assertNumStateRows(total = 1, updated = 2),
+      assertNumStateRows(total = 1, updated = 2, lateInput = 0),
 
       StopStream,
       StartStream(),
 
       AddData(inputData, "a", "c"), // should recreate state for "a" and return count as 1 and
       CheckNewAnswer(("a", "1"), ("c", "1")),
-      assertNumStateRows(total = 3, updated = 2)
+      assertNumStateRows(total = 3, updated = 2, lateInput = 0)
     )
   }
 
@@ -768,17 +768,17 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
       AddData(inputData, "a"),
       AdvanceManualClock(1 * 1000),
       CheckNewAnswer(("a", "1")),
-      assertNumStateRows(total = 1, updated = 1),
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0),
 
       AddData(inputData, "b"),
       AdvanceManualClock(1 * 1000),
       CheckNewAnswer(("b", "1")),
-      assertNumStateRows(total = 2, updated = 1),
+      assertNumStateRows(total = 2, updated = 1, lateInput = 0),
 
       AddData(inputData, "b"),
       AdvanceManualClock(10 * 1000),
       CheckNewAnswer(("a", "-1"), ("b", "2")),
-      assertNumStateRows(total = 1, updated = 2),
+      assertNumStateRows(total = 1, updated = 2, lateInput = 0),
 
       StopStream,
       StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
@@ -786,7 +786,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
       AddData(inputData, "c"),
       AdvanceManualClock(11 * 1000),
       CheckNewAnswer(("b", "-1"), ("c", "1")),
-      assertNumStateRows(total = 1, updated = 2),
+      assertNumStateRows(total = 1, updated = 2, lateInput = 0),
 
       AdvanceManualClock(12 * 1000),
       AssertOnQuery { _ => clock.getTimeMillis() == 35000 },
@@ -798,7 +798,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
         }
       },
       CheckNewAnswer(("c", "-1")),
-      assertNumStateRows(total = 0, updated = 1)
+      assertNumStateRows(total = 0, updated = 1, lateInput = 0)
     )
   }
 
@@ -978,20 +978,20 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
     testStream(result, Update)(
       AddData(inputData, "a"),
       CheckNewAnswer(("a", "1")),
-      assertNumStateRows(total = 1, updated = 1),
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0),
       AddData(inputData, "a", "b"),
       CheckNewAnswer(("a", "2"), ("b", "1")),
-      assertNumStateRows(total = 2, updated = 2),
+      assertNumStateRows(total = 2, updated = 2, lateInput = 0),
 
       StopStream,
       StartStream(),
 
       AddData(inputData, "a", "b"), // should remove state for "a" and return count as -1
       CheckNewAnswer(("a", "-1"), ("b", "2")),
-      assertNumStateRows(total = 1, updated = 2),
+      assertNumStateRows(total = 1, updated = 2, lateInput = 0),
 
       StopStream,
       StartStream(),
 
       AddData(inputData, "a", "c"), // should recreate state for "a" and return count as 1
       CheckNewAnswer(("a", "1"), ("c", "1")),
-      assertNumStateRows(total = 3, updated = 2)
+      assertNumStateRows(total = 3, updated = 2, lateInput = 0)
     )
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
index fb5d13d09fb0..fe87c7623f5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
@@ -29,8 +29,12 @@ trait StateStoreMetricsTest extends StreamTest {
     lastCheckedRecentProgressIndex = -1
   }
 
-  def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery =
-    AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated") { q =>
+  def assertNumStateRows(
+      total: Seq[Long],
+      updated: Seq[Long],
+      lateInputRows: Seq[Long]): AssertOnQuery =
+    AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated" +
+      s", late input rows = $lateInputRows") { q =>
       // This assumes that the streaming query will not make any progress while the eventually
       // is being executed.
       eventually(timeout(streamingTimeout)) {
@@ -60,13 +64,17 @@ trait StateStoreMetricsTest extends StreamTest {
         val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
         assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")
 
+        val numLateInputRows = recentProgress.filter(_.numInputRows > 0).last
+          .stateOperators.map(_.numLateInputRows)
+        assert(numLateInputRows === lateInputRows, s"incorrect late input rows, $debugString")
+
         lastCheckedRecentProgressIndex = recentProgress.length - 1
       }
       true
     }
 
-  def assertNumStateRows(total: Long, updated: Long): AssertOnQuery =
-    assertNumStateRows(Seq(total), Seq(updated))
+  def assertNumStateRows(total: Long, updated: Long, lateInput: Long): AssertOnQuery =
+    assertNumStateRows(Seq(total), Seq(updated), Seq(lateInput))
 
   def arraySum(arraySeq: Seq[Array[Long]], arrayLength: Int): Seq[Long] = {
     if (arraySeq.isEmpty) return Seq.fill(arrayLength)(0L)
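For reference, a usage sketch of the widened helper (a fragment, not runnable on its own: it assumes a suite extending `StateStoreMetricsTest` with a `MemoryStream[Int]` named `inputData`, a 10-second watermark, and `dropDuplicates`, as in `StreamingDeduplicationSuite` below):

```scala
testStream(result, Append)(
  AddData(inputData, 25), // advances the watermark to 15 seconds
  CheckNewAnswer(25),
  assertNumStateRows(total = 1, updated = 1, lateInput = 0),
  AddData(inputData, 10), // behind the watermark: dropped, and now counted as late input
  CheckNewAnswer(),
  assertNumStateRows(total = 1, updated = 0, lateInput = 1)
)
```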
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 85e1b85b84d2..5f4d0755b7b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -502,10 +502,12 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
       expectShuffling: Boolean,
       expectedPartition: Int): Boolean = {
     val executedPlan = se.lastExecution.executedPlan
-    val restore = executedPlan
-      .collect { case ss: StateStoreRestoreExec => ss }
-      .head
-    restore.child match {
+    val nodeToCheck = executedPlan
+      .collect {
+        case StateStoreRestoreExec(_, _, _, s: CountLateRowsExec) => s.child
+        case ss: StateStoreRestoreExec => ss.child
+      }.head
+    nodeToCheck match {
       case node: UnaryExecNode =>
         assert(node.outputPartitioning.numPartitions === expectedPartition,
           "Didn't get the expected number of partitions.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index f63778aef5a7..54025e754cbd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -37,13 +37,13 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
     testStream(result, Append)(
       AddData(inputData, "a"),
       CheckLastBatch("a"),
-      assertNumStateRows(total = 1, updated = 1),
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0),
       AddData(inputData, "a"),
       CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0),
+      assertNumStateRows(total = 1, updated = 0, lateInput = 0),
       AddData(inputData, "b"),
       CheckLastBatch("b"),
-      assertNumStateRows(total = 2, updated = 1)
+      assertNumStateRows(total = 2, updated = 1, lateInput = 0)
     )
   }
 
@@ -54,13 +54,13 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
     testStream(result, Append)(
       AddData(inputData, "a" -> 1),
       CheckLastBatch("a" -> 1),
-      assertNumStateRows(total = 1, updated = 1),
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0),
       AddData(inputData, "a" -> 2), // Dropped
       CheckLastBatch(),
-      assertNumStateRows(total = 1, updated = 0),
+      assertNumStateRows(total = 1, updated = 0, lateInput = 0),
       AddData(inputData, "b" -> 1),
       CheckLastBatch("b" -> 1),
-      assertNumStateRows(total = 2, updated = 1)
+      assertNumStateRows(total = 2, updated = 1, lateInput = 0)
     )
   }
 
@@ -71,15 +71,15 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
     testStream(result, Append)(
       AddData(inputData, "a" -> 1),
       CheckLastBatch("a" -> 1),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
 
       AddData(inputData, "a" -> 2), // Dropped from the second `dropDuplicates`
       CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L), lateInputRows = Seq(0L, 0L)),
 
       AddData(inputData, "b" -> 1),
       CheckLastBatch("b" -> 1),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
     )
   }
 
@@ -94,19 +94,19 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
     testStream(result, Append)(
       AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
       CheckAnswer(10 to 15: _*),
-      assertNumStateRows(total = 6, updated = 6),
+      assertNumStateRows(total = 6, updated = 6, lateInput = 0),
 
       AddData(inputData, 25), // Advance watermark to 15 secs, no-data-batch drops rows <= 15
       CheckNewAnswer(25),
-      assertNumStateRows(total = 1, updated = 1),
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0),
 
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
-      assertNumStateRows(total = 1, updated = 0),
+      assertNumStateRows(total = 1, updated = 0, lateInput = 1),
 
       AddData(inputData, 45), // Advance watermark to 35 seconds, no-data-batch drops row 25
       CheckNewAnswer(45),
-      assertNumStateRows(total = 1, updated = 1)
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0)
     )
   }
 
@@ -126,23 +126,23 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       CheckLastBatch(),
       // states in aggregate in [10, 14), [15, 20) (2 windows)
       // states in deduplicate is 10 to 15
-      assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L)),
+      assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L), lateInputRows = Seq(0L, 0L)),
 
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckLastBatch((10 -> 5)), // 5 items (10 to 14) after deduplicate, emitted with no-data-batch
       // states in aggregate in [15, 20) and [25, 30); no-data-batch removed [10, 14)
       // states in deduplicate is 25, no-data-batch removed 10 to 14
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
 
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckLastBatch(),
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)),
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 1L)),
 
       AddData(inputData, 40), // Advance watermark to 30 seconds
       CheckLastBatch((15 -> 1), (25 -> 1)),
       // states in aggregate is [40, 45); no-data-batch removed [15, 20) and [25, 30)
       // states in deduplicate is 40; no-data-batch removed 25
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L))
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
     )
   }
 
@@ -158,16 +158,16 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
     testStream(result, Update)(
       AddData(inputData, "a" -> 1),
       CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
       AddData(inputData, "a" -> 1), // Dropped
       CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 0L)),
       AddData(inputData, "a" -> 2),
       CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
       AddData(inputData, "b" -> 1),
       CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
     )
   }
 
@@ -183,16 +183,16 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
     testStream(result, Complete)(
       AddData(inputData, "a" -> 1),
       CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
       AddData(inputData, "a" -> 1), // Dropped
       CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 0L)),
       AddData(inputData, "a" -> 2),
       CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
       AddData(inputData, "b" -> 1),
       CheckLastBatch("a" -> 3L, "b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
     )
   }
 
@@ -273,13 +273,13 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       StartStream(additionalConfs = Map(flagKey -> flag.toString)),
       AddData(inputData, 10, 11, 12, 13, 14, 15),
       CheckAnswer(10, 11, 12, 13, 14, 15),
-      assertNumStateRows(total = 6, updated = 6),
+      assertNumStateRows(total = 6, updated = 6, lateInput = 0),
 
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer(25),
       { // State should have been cleaned if flag is set, otherwise should not have been cleaned
-        if (flag) assertNumStateRows(total = 1, updated = 1)
-        else assertNumStateRows(total = 7, updated = 1)
+        if (flag) assertNumStateRows(total = 1, updated = 1, lateInput = 0)
+        else assertNumStateRows(total = 7, updated = 1, lateInput = 0)
       },
       AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L)
     )
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 3f218c9cb7fd..2949e8cf0958 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -142,31 +142,32 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
     testStream(joined)(
       AddData(input1, 1),
       CheckAnswer(),
-      assertNumStateRows(total = 1, updated = 1),
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0),
       AddData(input2, 1),
       CheckAnswer((1, 10, 2, 3)),
-      assertNumStateRows(total = 2, updated = 1),
+      assertNumStateRows(total = 2, updated = 1, lateInput = 0),
 
       StopStream,
       StartStream(),
 
       AddData(input1, 25),
       CheckNewAnswer(), // watermark = 15, no-data-batch should remove 2 rows having window=[0,10]
-      assertNumStateRows(total = 1, updated = 1),
+      assertNumStateRows(total = 1, updated = 1, lateInput = 0),
 
       AddData(input2, 25),
       CheckNewAnswer((25, 30, 50, 75)),
-      assertNumStateRows(total = 2, updated = 1),
+      assertNumStateRows(total = 2, updated = 1, lateInput = 0),
 
       StopStream,
       StartStream(),
 
       AddData(input2, 1),
       CheckNewAnswer(), // Should not join as < 15 removed
-      assertNumStateRows(total = 2, updated = 0), // row not add as 1 < state key watermark = 15
-
+      // row not add as 1 < state key watermark = 15
+      // note that input row is not discarded as right side doesn't have watermark
+      assertNumStateRows(total = 2, updated = 0, lateInput = 0),
       AddData(input1, 5),
       CheckNewAnswer(), // Same reason as above
-      assertNumStateRows(total = 2, updated = 0)
+      assertNumStateRows(total = 2, updated = 0, lateInput = 1)
     )
   }
 
@@ -195,12 +196,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckNewAnswer((1, 5, 11)),
       AddData(rightInput, (1, 10)),
       CheckNewAnswer(), // no match as leftTime 5 is not < rightTime 10 - 5
-      assertNumStateRows(total = 3, updated = 3),
+      assertNumStateRows(total = 3, updated = 3, lateInput = 0),
 
       // Increase event time watermark to 20s by adding data with time = 30s on both inputs
       AddData(leftInput, (1, 3), (1, 30)),
       CheckNewAnswer((1, 3, 10), (1, 3, 11)),
-      assertNumStateRows(total = 5, updated = 2),
+      assertNumStateRows(total = 5, updated = 2, lateInput = 0),
 
       AddData(rightInput, (0, 30)),
       CheckNewAnswer(),
@@ -208,7 +209,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       // so left side going to only receive data where leftTime > 20
       // right side state constraint: 20 < leftTime < rightTime - 5 ==> rightTime > 25
       // right state where rightTime <= 25 will be cleared, (1, 11) and (1, 10) removed
-      assertNumStateRows(total = 4, updated = 1),
+      assertNumStateRows(total = 4, updated = 1, lateInput = 0),
 
       // New data to right input should match with left side (1, 3) and (1, 5), as left state should
       // not be cleared. But rows rightTime <= 20 should be filtered due to event time watermark and
@@ -219,12 +220,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       // (1, 28) ==> passed filter, matched with left (1, 3) and (1, 5), added to state
       AddData(rightInput, (1, 20), (1, 21), (1, 28)),
       CheckNewAnswer((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)),
-      assertNumStateRows(total = 5, updated = 1),
+      assertNumStateRows(total = 5, updated = 1, lateInput = 1),
 
       // New data to left input with leftTime <= 20 should be filtered due to event time watermark
       AddData(leftInput, (1, 20), (1, 21)),
       CheckNewAnswer((1, 21, 28)),
-      assertNumStateRows(total = 6, updated = 1)
+      assertNumStateRows(total = 6, updated = 1, lateInput = 1)
     )
   }
 
@@ -276,7 +277,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckAnswer(),
       AddData(rightInput, (1, 14), (1, 15), (1, 25), (1, 26), (1, 30), (1, 31)),
       CheckNewAnswer((1, 20, 15), (1, 20, 25), (1, 20, 26), (1, 20, 30)),
-      assertNumStateRows(total = 7, updated = 7),
+      assertNumStateRows(total = 7, updated = 7, lateInput = 0),
 
       // If rightTime = 60, then it matches only leftTime = [50, 65]
       AddData(rightInput, (1, 60)),
@@ -289,11 +290,11 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       // Should drop < 20 from left, i.e., none
       // Right state value watermark = 30 - 5 = slightly less than 25 (since condition has <=)
       // Should drop < 25 from the right, i.e., 14 and 15
-      assertNumStateRows(total = 10, updated = 5), // 12 - 2 removed
+      assertNumStateRows(total = 10, updated = 5, lateInput = 0), // 12 - 2 removed
 
       AddData(leftInput, (1, 30), (1, 31)), // 30 should not be processed or added to state
       CheckNewAnswer((1, 31, 26), (1, 31, 30), (1, 31, 31)),
-      assertNumStateRows(total = 11, updated = 1), // only 31 added
+      assertNumStateRows(total = 11, updated = 1, lateInput = 1), // only 31 added
 
       // Advance the watermark
       AddData(rightInput, (1, 80)),
@@ -303,11 +304,11 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       // Should drop < 36 from left, i.e., 20, 31 (30 was not added)
       // Right state value watermark = 46 - 5 = slightly less than 41 (since condition has <=)
       // Should drop < 41 from the right, i.e., 25, 26, 30, 31
-      assertNumStateRows(total = 6, updated = 1), // 12 - 6 removed
+      assertNumStateRows(total = 6, updated = 1, lateInput = 0), // 12 - 6 removed
 
       AddData(rightInput, (1, 46), (1, 50)), // 46 should not be processed or added to state
       CheckNewAnswer((1, 49, 50), (1, 50, 50)),
-      assertNumStateRows(total = 7, updated = 1) // 50 added
+      assertNumStateRows(total = 7, updated = 1, lateInput = 1) // 50 added
     )
   }
 
@@ -467,7 +468,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       AddData(inputStream, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
       // batch 2: same result as above test
       CheckNewAnswer((6, 6L, 6, 6L), (8, 8L, 8, 8L), (10, 10L, 10, 10L)),
-      assertNumStateRows(11, 6),
+      assertNumStateRows(11, 6, 0),
       Execute { query =>
         // Verify state format = 1
         val f = query.lastExecution.executedPlan.collect {
@@ -539,7 +540,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       // The left rows with leftValue <= 4 should generate their outer join row now and
       // not get added to the state.
       CheckNewAnswer(Row(3, 10, 6, "9"), Row(1, 10, 2, null), Row(2, 10, 4, null)),
-      assertNumStateRows(total = 4, updated = 4),
+      assertNumStateRows(total = 4, updated = 4, lateInput = 0),
       // We shouldn't get more outer join rows when the watermark advances.
       MultiAddData(leftInput, 20)(rightInput, 21),
       CheckNewAnswer(),
@@ -567,7 +568,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
       // The right rows with rightValue <= 7 should never be added to the state.
       CheckNewAnswer(Row(3, 10, 6, "9")), // rightValue = 9 > 7 hence joined and added to state
-      assertNumStateRows(total = 4, updated = 4),
+      assertNumStateRows(total = 4, updated = 4, lateInput = 0),
       // When the watermark advances, we get the outer join rows just as we would if they
       // were added but didn't match the full join condition.
       MultiAddData(leftInput, 20)(rightInput, 21), // watermark = 10, no-data-batch computes nulls
@@ -596,7 +597,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
       // The left rows with leftValue <= 4 should never be added to the state.
       CheckNewAnswer(Row(3, 10, 6, "9")), // leftValue = 7 > 4 hence joined and added to state
-      assertNumStateRows(total = 4, updated = 4),
+      assertNumStateRows(total = 4, updated = 4, lateInput = 0),
       // When the watermark advances, we get the outer join rows just as we would if they
       // were added but didn't match the full join condition.
       MultiAddData(leftInput, 20)(rightInput, 21), // watermark = 10, no-data-batch computes nulls
@@ -626,7 +627,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       // The right rows with rightValue <= 7 should generate their outer join row now and
       // not get added to the state.
       CheckNewAnswer(Row(3, 10, 6, "9"), Row(1, 10, null, "3"), Row(2, 10, null, "6")),
-      assertNumStateRows(total = 4, updated = 4),
+      assertNumStateRows(total = 4, updated = 4, lateInput = 0),
       // We shouldn't get more outer join rows when the watermark advances.
       MultiAddData(leftInput, 20)(rightInput, 21),
       CheckNewAnswer(),
@@ -645,11 +646,11 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
 
       MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
       CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)),
-      assertNumStateRows(total = 2, updated = 12),
+      assertNumStateRows(total = 2, updated = 12, lateInput = 0),
 
       AddData(leftInput, 22),
       CheckNewAnswer(Row(22, 30, 44, 66)),
-      assertNumStateRows(total = 3, updated = 1)
+      assertNumStateRows(total = 3, updated = 1, lateInput = 0)
     )
   }
 
@@ -663,11 +664,11 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
 
       MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
       CheckNewAnswer(Row(6, 10, null, 18), Row(7, 10, null, 21)),
-      assertNumStateRows(total = 2, updated = 12),
+      assertNumStateRows(total = 2, updated = 12, lateInput = 0),
 
       AddData(leftInput, 22),
       CheckNewAnswer(Row(22, 30, 44, 66)),
-      assertNumStateRows(total = 3, updated = 1)
+      assertNumStateRows(total = 3, updated = 1, lateInput = 0)
     )
   }
 
@@ -703,15 +704,15 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckNewAnswer((1, 1, 5, 10)),
       AddData(rightInput, (1, 11)),
       CheckNewAnswer(), // no match as left time is too low
-      assertNumStateRows(total = 5, updated = 5),
+      assertNumStateRows(total = 5, updated = 5, lateInput = 0),
 
       // Increase event time watermark to 20s by adding data with time = 30s on both inputs
       AddData(leftInput, (1, 7), (1, 30)),
       CheckNewAnswer((1, 1, 7, 10), (1, 1, 7, 11)),
-      assertNumStateRows(total = 7, updated = 2),
+      assertNumStateRows(total = 7, updated = 2, lateInput = 0),
 
       AddData(rightInput, (0, 30)), // watermark = 30 - 10 = 20, no-data-batch computes nulls
       CheckNewAnswer(outerResult),
-      assertNumStateRows(total = 2, updated = 1)
+      assertNumStateRows(total = 2, updated = 1, lateInput = 0)
     )
   }
 }
@@ -736,40 +737,41 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       // leftValue <= 10 should generate outer join rows even though it matches right keys
       MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3),
       CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null), Row(3, 10, 6, null)),
-      assertNumStateRows(total = 3, updated = 3), // only right 1, 2, 3 added
+      assertNumStateRows(total = 3, updated = 3, lateInput = 0), // only right 1, 2, 3 added
 
       MultiAddData(leftInput, 20)(rightInput, 21), // watermark = 10, no-data-batch cleared < 10
       CheckNewAnswer(),
-      assertNumStateRows(total = 2, updated = 2), // only 20 and 21 left in state
+      assertNumStateRows(total = 2, updated = 2, lateInput = 0), // only 20 and 21 left in state
 
       AddData(rightInput, 20),
       CheckNewAnswer(Row(20, 30, 40, 60)),
-      assertNumStateRows(total = 3, updated = 1),
+      assertNumStateRows(total = 3, updated = 1, lateInput = 0),
 
       // leftValue and rightValue both satisfying condition should not generate outer join rows
       MultiAddData(leftInput, 40, 41)(rightInput, 40, 41), // watermark = 31
       CheckNewAnswer((40, 50, 80, 120), (41, 50, 82, 123)),
-      assertNumStateRows(total = 4, updated = 4), // only left 40, 41 + right 40,41 left in state
+      // only left 40, 41 + right 40,41 left in state
+      assertNumStateRows(total = 4, updated = 4, lateInput = 0),
 
       MultiAddData(leftInput, 70)(rightInput, 71), // watermark = 60
       CheckNewAnswer(),
-      assertNumStateRows(total = 2, updated = 2), // only 70, 71 left in state
+      assertNumStateRows(total = 2, updated = 2, lateInput = 0), // only 70, 71 left in state
 
       AddData(rightInput, 70),
       CheckNewAnswer((70, 80, 140, 210)),
-      assertNumStateRows(total = 3, updated = 1),
+      assertNumStateRows(total = 3, updated = 1, lateInput = 0),
 
       // rightValue between 300 and 1000 should generate outer join rows even though it matches left
       MultiAddData(leftInput, 101, 102, 103)(rightInput, 101, 102, 103), // watermark = 91
       CheckNewAnswer(),
-      assertNumStateRows(total = 6, updated = 3), // only 101 - 103 left in state
+      assertNumStateRows(total = 6, updated = 3, lateInput = 0), // only 101 - 103 left in state
 
       MultiAddData(leftInput, 1000)(rightInput, 1001),
       CheckNewAnswer(
         Row(101, 110, 202, null), Row(102, 110, 204, null), Row(103, 110, 206, null)),
-      assertNumStateRows(total = 2, updated = 2)
+      assertNumStateRows(total = 2, updated = 2, lateInput = 0)
     )
   }
 
@@ -803,7 +805,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       // left: (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)
       // right: (2, 2L), (4, 4L)
       CheckNewAnswer((2, 2L, 2, 2L), (4, 4L, 4, 4L)),
-      assertNumStateRows(7, 7),
+      assertNumStateRows(7, 7, 0),
 
       AddData(inputStream, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
       // batch 2 - global watermark = 5
@@ -817,7 +819,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       // NOTE: look for evicted rows in right which are not evicted from left - they were
       // properly joined in batch 1
       CheckNewAnswer((6, 6L, 6, 6L), (8, 8L, 8, 8L), (10, 10L, 10, 10L)),
-      assertNumStateRows(13, 8),
+      assertNumStateRows(13, 8, 0),
 
       AddData(inputStream, (11, 11L), (12, 12L), (13, 13L), (14, 14L), (15, 15L)),
       // batch 3
@@ -832,7 +834,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       CheckNewAnswer(
         Row(12, 12L, 12, 12L), Row(14, 14L, 14, 14L),
         Row(1, 1L, null, null), Row(3, 3L, null, null)),
-      assertNumStateRows(15, 7)
+      assertNumStateRows(15, 7, 0)
     )
   }
 
@@ -866,17 +868,17 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     testStream(query)(
       AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
       CheckNewAnswer((2, 2L, 2, 2L), (4, 4L, 4, 4L)),
-      assertNumStateRows(7, 7),
+      assertNumStateRows(7, 7, 0),
 
       AddData(inputStream, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
       CheckNewAnswer((6, 6L, 6, 6L), (8, 8L, 8, 8L), (10, 10L, 10, 10L)),
-      assertNumStateRows(13, 8),
+      assertNumStateRows(13, 8, 0),
 
       AddData(inputStream, (11, 11L), (12, 12L), (13, 13L), (14, 14L), (15, 15L)),
       CheckNewAnswer(
         Row(12, 12L, 12, 12L), Row(14, 14L, 14, 14L),
         Row(null, null, 1, 1L), Row(null, null, 3, 3L)),
-      assertNumStateRows(15, 7)
+      assertNumStateRows(15, 7, 0)
     )
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 08b3644745f9..a3aaccc2e6c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -63,6 +63,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       |  "stateOperators" : [ {
       |    "numRowsTotal" : 0,
       |    "numRowsUpdated" : 1,
+      |    "numLateInputRows" : 0,
       |    "memoryUsedBytes" : 3,
       |    "customMetrics" : {
       |      "loadedMapCacheHitCount" : 1,
@@ -113,6 +114,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       |  "stateOperators" : [ {
       |    "numRowsTotal" : 0,
       |    "numRowsUpdated" : 1,
+      |    "numLateInputRows" : 0,
       |    "memoryUsedBytes" : 2
       |  } ],
       |  "sources" : [ {
@@ -321,7 +323,7 @@ object StreamingQueryStatusAndProgressSuite {
         "avg" -> "2016-12-05T20:54:20.827Z",
         "watermark" -> "2016-12-05T20:54:20.827Z").asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3,
+      numRowsTotal = 0, numRowsUpdated = 1, numLateInputRows = 0, memoryUsedBytes = 3,
       customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
         "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
         .mapValues(long2Long).asJava)
@@ -353,7 +355,7 @@ object StreamingQueryStatusAndProgressSuite {
     // empty maps should be handled correctly
     eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
+      numRowsTotal = 0, numRowsUpdated = 1, numLateInputRows = 0, memoryUsedBytes = 2)),
     sources = Array(
       new SourceProgress(
         description = "source",