From ff1b89553acc7ea3a19b586457dd295255047377 Mon Sep 17 00:00:00 2001
From: Jungtaek Lim
Date: Sat, 23 Jun 2018 11:34:16 +0900
Subject: [PATCH] SPARK-24634 Add a new metric regarding number of rows later
 than watermark

* This adds a new metric to count the number of rows that arrived later
  than the watermark
---
 .../FlatMapGroupsWithStateExec.scala          |  2 +-
 .../StreamingSymmetricHashJoinExec.scala      |  2 +-
 .../streaming/statefulOperators.scala         | 23 ++++++++++---
 .../apache/spark/sql/streaming/progress.scala |  8 +++--
 .../streaming/EventTimeWatermarkSuite.scala   | 16 ++++++++--
 .../sql/streaming/StateStoreMetricsTest.scala | 13 +++++---
 .../StreamingDeduplicationSuite.scala         | 32 +++++++++----------
 .../sql/streaming/StreamingJoinSuite.scala    | 12 +++----
 ...StreamingQueryStatusAndProgressSuite.scala | 10 +++---
 9 files changed, 76 insertions(+), 42 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 8e82cccbc8fa..3dd5f6656ca9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -134,7 +134,7 @@ case class FlatMapGroupsWithStateExec(
       // If timeout is based on event time, then filter late data based on watermark
       val filteredIter = watermarkPredicateForData match {
         case Some(predicate) if timeoutConf == EventTimeTimeout =>
-          iter.filter(row => !predicate.eval(row))
+          applyRemovingRowsOlderThanWatermark(iter, predicate)
         case _ =>
           iter
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 50cf971e4ec3..437b3568fb07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -432,7 +432,7 @@ case class StreamingSymmetricHashJoinExec(
       WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match {
         case Some(watermarkExpr) =>
           val predicate = newPredicate(watermarkExpr, inputAttributes)
-          inputIter.filter { row => !predicate.eval(row) }
+          applyRemovingRowsOlderThanWatermark(inputIter, predicate)
         case None =>
           inputIter
       }
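Both call sites above, and StateStoreSaveExec/StreamingDeduplicateExec below, now route
late-row filtering through a single helper added to statefulOperators.scala, so the drop
count is maintained in one place. Stripped of Spark's types, the pattern is just a counting
filter; the following standalone sketch is illustrative only (`Counter` and `dropAndCount`
are hypothetical names, not part of this patch):

// Dependency-free sketch of the counting-filter pattern used by
// applyRemovingRowsOlderThanWatermark; Counter stands in for a SQLMetric.
final class Counter { var value: Long = 0L }

def dropAndCount[T](iter: Iterator[T], isLate: T => Boolean, late: Counter): Iterator[T] =
  iter.filter { row =>
    val keep = !isLate(row)
    if (!keep) late.value += 1 // count each row dropped as late
    keep
  }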
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 6759fb42b405..4f6e8c324ac8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -77,6 +77,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+    "numLateRows" -> SQLMetrics.createMetric(sparkContext,
+      "number of rows which are later than watermark"),
     "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
     "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
     "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "total time to update rows"),
@@ -93,7 +95,9 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
     new StateOperatorProgress(
       numRowsTotal = longMetric("numTotalStateRows").value,
       numRowsUpdated = longMetric("numUpdatedStateRows").value,
-      memoryUsedBytes = longMetric("stateMemory").value)
+      memoryUsedBytes = longMetric("stateMemory").value,
+      numLateInputRows = longMetric("numLateRows").value
+    )
   }
 
   /** Records the duration of running `body` for the next query progress update. */
@@ -122,6 +126,15 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
     }.toMap
   }
 
+  protected def applyRemovingRowsOlderThanWatermark(iter: Iterator[InternalRow],
+      predicate: Predicate): Iterator[InternalRow] = {
+    iter.filter { row =>
+      val filteredIn = !predicate.eval(row)
+      if (!filteredIn) longMetric("numLateRows") += 1
+      filteredIn
+    }
+  }
+
   /**
    * Should the MicroBatchExecution run another batch based on this stateful operator and the
    * current updated metadata.
@@ -301,7 +314,8 @@ case class StateStoreSaveExec(
         // Assumption: watermark predicates must be non-empty if append mode is allowed
         case Some(Append) =>
           allUpdatesTimeMs += timeTakenMs {
-            val filteredIter = iter.filter(row => !watermarkPredicateForData.get.eval(row))
+            val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
+              watermarkPredicateForData.get)
             while (filteredIter.hasNext) {
               val row = filteredIter.next().asInstanceOf[UnsafeRow]
               val key = getKey(row)
@@ -344,7 +358,7 @@ case class StateStoreSaveExec(
         new NextIterator[InternalRow] {
           // Filter late date using watermark if specified
           private[this] val baseIterator = watermarkPredicateForData match {
-            case Some(predicate) => iter.filter((row: InternalRow) => !predicate.eval(row))
+            case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
             case None => iter
           }
           private val updatesStartTimeNs = System.nanoTime
@@ -421,14 +435,13 @@ case class StreamingDeduplicateExec(
       Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
       val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
       val numOutputRows = longMetric("numOutputRows")
-      val numTotalStateRows = longMetric("numTotalStateRows")
       val numUpdatedStateRows = longMetric("numUpdatedStateRows")
       val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
       val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
       val commitTimeMs = longMetric("commitTimeMs")
 
       val baseIterator = watermarkPredicateForData match {
-        case Some(predicate) => iter.filter(row => !predicate.eval(row))
+        case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter, predicate)
         case None => iter
       }
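One behavior of the helper above is worth calling out: `Iterator.filter` is lazy, so
`numLateRows` only reaches its final value once the operator has drained its input. That is
why the tests below read the counter from a completed batch's progress rather than mid-batch.
A standalone illustration of the caveat (plain Scala, illustrative only):

// The counter advances only as the filtered iterator is consumed,
// so reading it before the input is drained would undercount.
var dropped = 0L
val filtered = Iterator(1, 25, 3).filter { n =>
  val keep = n < 10 // pretend 10 is the watermark
  if (!keep) dropped += 1
  keep
}
assert(dropped == 0L)     // nothing consumed yet
filtered.foreach(_ => ()) // drain the iterator
assert(dropped == 1L)     // 25 was counted as late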
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 0dcb666e2c3e..0175ced10c32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,7 +38,8 @@ import org.apache.spark.annotation.InterfaceStability
 class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
     val numRowsUpdated: Long,
-    val memoryUsedBytes: Long
+    val memoryUsedBytes: Long,
+    val numLateInputRows: Long
   ) extends Serializable {
 
   /** The compact JSON representation of this progress. */
@@ -48,12 +49,13 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
+    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numLateInputRows)
 
   private[sql] def jsonValue: JValue = {
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-    ("memoryUsedBytes" -> JInt(memoryUsedBytes))
+    ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+    ("numLateInputRows" -> JInt(numLateInputRows))
   }
 
   override def toString: String = prettyJson
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 7e8fde1ff8e5..9395ee6a048d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -164,9 +164,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer((10, 5)),
       assertNumStateRows(2),
+      assertNumLateInputRows(0),
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
-      assertNumStateRows(2)
+      assertNumStateRows(2),
+      assertNumLateInputRows(1)
     )
   }
 
@@ -187,12 +189,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckNewAnswer((25, 1)),
       assertNumStateRows(2),
+      assertNumLateInputRows(0),
       AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
       CheckNewAnswer((25, 2)),
       assertNumStateRows(2),
+      assertNumLateInputRows(1),
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
-      assertNumStateRows(2)
+      assertNumStateRows(2),
+      assertNumLateInputRows(1)
     )
   }
 
@@ -491,6 +496,13 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     true
   }
 
+  private def assertNumLateInputRows(numLateInputRows: Long): AssertOnQuery = AssertOnQuery { q =>
+    q.processAllAvailable()
+    val progressWithData = q.recentProgress.lastOption.get
+    assert(progressWithData.stateOperators(0).numLateInputRows === numLateInputRows)
+    true
+  }
+
   private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
     AssertOnQuery { q =>
       body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
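The test helper above reads the new field through the public progress API, which is the same
path end users would take. A minimal usage sketch (assuming this patch is applied and a started
query is bound to `query: StreamingQuery`):

// Read the late-row count for the most recent batch via the public API.
query.processAllAvailable()
val lateRows = query.lastProgress.stateOperators.map(_.numLateInputRows).sum
println(s"rows dropped for arriving later than the watermark: $lateRows")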
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
index e45f9d3e2e97..dd98034c9e63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StateStoreMetricsTest.scala
@@ -29,8 +29,10 @@ trait StateStoreMetricsTest extends StreamTest {
     lastCheckedRecentProgressIndex = -1
   }
 
-  def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery =
-    AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated") { q =>
+  def assertNumStateRows(total: Seq[Long], updated: Seq[Long],
+      lateInputRows: Seq[Long]): AssertOnQuery =
+    AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated" +
+      s", late input rows = $lateInputRows") { q =>
       val recentProgress = q.recentProgress
       require(recentProgress.nonEmpty, "No progress made, cannot check num state rows")
       require(recentProgress.length < spark.sessionState.conf.streamingProgressRetention,
@@ -57,12 +59,15 @@ trait StateStoreMetricsTest extends StreamTest {
       val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
       assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")
 
+      val numLateInputRows = recentProgress.last.stateOperators.map(_.numLateInputRows)
+      assert(numLateInputRows === lateInputRows, s"incorrect late input rows, $debugString")
+
       lastCheckedRecentProgressIndex = recentProgress.length - 1
       true
     }
 
-  def assertNumStateRows(total: Long, updated: Long): AssertOnQuery =
-    assertNumStateRows(Seq(total), Seq(updated))
+  def assertNumStateRows(total: Long, updated: Long, lateInputRows: Long = 0): AssertOnQuery =
+    assertNumStateRows(Seq(total), Seq(updated), Seq(lateInputRows))
 
   def arraySum(arraySeq: Seq[Array[Long]], arrayLength: Int): Seq[Long] = {
     if (arraySeq.isEmpty) return Seq.fill(arrayLength)(0L)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index 42ffd472eb84..e990ba88b11b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -76,15 +76,15 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf
     testStream(result, Append)(
       AddData(inputData, "a" -> 1),
       CheckLastBatch("a" -> 1),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
 
       AddData(inputData, "a" -> 2), // Dropped from the second `dropDuplicates`
       CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L)),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(0L, 1L), lateInputRows = Seq(0L, 0L)),
 
       AddData(inputData, "b" -> 1),
       CheckLastBatch("b" -> 1),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
     )
   }
 
@@ -107,7 +107,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf
 
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckNewAnswer(),
-      assertNumStateRows(total = 1, updated = 0),
+      assertNumStateRows(total = 1, updated = 0, lateInputRows = 1),
 
       AddData(inputData, 45), // Advance watermark to 35 seconds, no-data-batch drops row 25
       CheckNewAnswer(45),
@@ -131,23 +131,23 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf
       CheckLastBatch(),
       // states in aggregate in [10, 14), [15, 20) (2 windows)
       // states in deduplicate is 10 to 15
-      assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L)),
+      assertNumStateRows(total = Seq(2L, 6L), updated = Seq(2L, 6L), lateInputRows = Seq(0L, 0L)),
 
       AddData(inputData, 25), // Advance watermark to 15 seconds
       CheckLastBatch((10 -> 5)), // 5 items (10 to 14) after deduplicate, emitted with no-data-batch
       // states in aggregate in [15, 20) and [25, 30); no-data-batch removed [10, 14)
       // states in deduplicate is 25, no-data-batch removed 10 to 14
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
 
       AddData(inputData, 10), // Should not emit anything as data less than watermark
       CheckLastBatch(),
-      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L)),
+      assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 1L)),
 
       AddData(inputData, 40), // Advance watermark to 30 seconds
       CheckLastBatch((15 -> 1), (25 -> 1)),
       // states in aggregate is [40, 45); no-data-batch removed [15, 20) and [25, 30)
       // states in deduplicate is 40; no-data-batch removed 25
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L))
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
     )
   }
 
@@ -163,16 +163,16 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf
     testStream(result, Update)(
       AddData(inputData, "a" -> 1),
       CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
       AddData(inputData, "a" -> 1), // Dropped
       CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 0L)),
       AddData(inputData, "a" -> 2),
       CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
       AddData(inputData, "b" -> 1),
       CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
     )
   }
 
@@ -188,16 +188,16 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest with BeforeAndAf
     testStream(result, Complete)(
       AddData(inputData, "a" -> 1),
       CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
       AddData(inputData, "a" -> 1), // Dropped
       CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
+      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L), lateInputRows = Seq(0L, 0L)),
       AddData(inputData, "a" -> 2),
       CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
+      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L)),
       AddData(inputData, "b" -> 1),
       CheckLastBatch("a" -> 3L, "b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
+      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L), lateInputRows = Seq(0L, 0L))
     )
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index c5cc8df4356a..f14ca5c662ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -159,11 +159,11 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
 
       AddData(input2, 1),
       CheckNewAnswer(), // Should not join as < 15 removed
-      assertNumStateRows(total = 2, updated = 0), // row not add as 1 < state key watermark = 15
+      assertNumStateRows(total = 2, updated = 0), // row not add as 1 < state key watermark = 15
 
       AddData(input1, 5),
       CheckNewAnswer(), // Same reason as above
-      assertNumStateRows(total = 2, updated = 0)
+      assertNumStateRows(total = 2, updated = 0, lateInputRows = 1) // row later than watermark
     )
   }
 
@@ -216,12 +216,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       // (1, 28) ==> passed filter, matched with left (1, 3) and (1, 5), added to state
       AddData(rightInput, (1, 20), (1, 21), (1, 28)),
       CheckNewAnswer((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)),
-      assertNumStateRows(total = 5, updated = 1),
+      assertNumStateRows(total = 5, updated = 1, lateInputRows = 1),
 
       // New data to left input with leftTime <= 20 should be filtered due to event time watermark
       AddData(leftInput, (1, 20), (1, 21)),
       CheckNewAnswer((1, 21, 28)),
-      assertNumStateRows(total = 6, updated = 1)
+      assertNumStateRows(total = 6, updated = 1, lateInputRows = 1)
     )
   }
 
@@ -290,7 +290,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
 
       AddData(leftInput, (1, 30), (1, 31)), // 30 should not be processed or added to state
       CheckNewAnswer((1, 31, 26), (1, 31, 30), (1, 31, 31)),
-      assertNumStateRows(total = 11, updated = 1), // only 31 added
+      assertNumStateRows(total = 11, updated = 1, lateInputRows = 1), // only 31 added
 
       // Advance the watermark
       AddData(rightInput, (1, 80)),
@@ -304,7 +304,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
 
       AddData(rightInput, (1, 46), (1, 50)), // 46 should not be processed or added to state
       CheckNewAnswer((1, 49, 50), (1, 50, 50)),
-      assertNumStateRows(total = 7, updated = 1) // 50 added
+      assertNumStateRows(total = 7, updated = 1, lateInputRows = 1) // 50 added
     )
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 79bb827e0de9..96bbd056f25e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -58,7 +58,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       |  "stateOperators" : [ {
       |    "numRowsTotal" : 0,
       |    "numRowsUpdated" : 1,
-      |    "memoryUsedBytes" : 2
+      |    "memoryUsedBytes" : 2,
+      |    "numLateInputRows" : 0
       |  } ],
       |  "sources" : [ {
       |    "description" : "source",
@@ -91,7 +92,8 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
       |  "stateOperators" : [ {
       |    "numRowsTotal" : 0,
       |    "numRowsUpdated" : 1,
-      |    "memoryUsedBytes" : 2
+      |    "memoryUsedBytes" : 2,
+      |    "numLateInputRows" : 0
       |  } ],
       |  "sources" : [ {
       |    "description" : "source",
@@ -230,7 +232,7 @@ object StreamingQueryStatusAndProgressSuite {
         "avg" -> "2016-12-05T20:54:20.827Z",
         "watermark" -> "2016-12-05T20:54:20.827Z").asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numLateInputRows = 0)),
     sources = Array(
       new SourceProgress(
         description = "source",
@@ -254,7 +256,7 @@ object StreamingQueryStatusAndProgressSuite {
     // empty maps should be handled correctly
     eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numLateInputRows = 0)),
     sources = Array(
       new SourceProgress(
         description = "source",
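End to end, the patch surfaces the counter everywhere StreamingQueryProgress is exposed,
including the listener bus. As a closing usage sketch (illustrative, not part of the patch;
assumes this patch is applied and an active SparkSession named `spark`):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Log the new counter for every finished batch.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val late = event.progress.stateOperators.map(_.numLateInputRows).sum
    if (late > 0) {
      println(s"batch ${event.progress.batchId}: dropped $late rows later than watermark")
    }
  }
})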