Skip to content

Commit 14502d5

Browse files
zsxwing authored and tdas committed
[SPARK-7405] [STREAMING] Fix the bug that ReceiverInputDStream doesn't report InputInfo
The bug occurs because SPARK-7139 unintentionally removed some code from SPARK-7112 here: 1854ac3#diff-5c8651dd78abd20439b8eb938175075dL72 This PR adds that code back and adds some assertions to the tests to verify it. Author: zsxwing <[email protected]> Closes #5950 from zsxwing/SPARK-7405 and squashes the following commits: 675f5d9 [zsxwing] Fix the bug that ReceiverInputDStream doesn't report InputInfo
1 parent 71a452b commit 14502d5

File tree

2 files changed

+12
-0
lines changed

2 files changed

+12
-0
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.apache.spark.storage.BlockId
2424
import org.apache.spark.streaming._
2525
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
2626
import org.apache.spark.streaming.receiver.Receiver
27+
import org.apache.spark.streaming.scheduler.InputInfo
2728
import org.apache.spark.streaming.util.WriteAheadLogUtils
2829

2930
/**
@@ -68,6 +69,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
6869
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
6970
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
7071

72+
// Register the input blocks information into InputInfoTracker
73+
val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
74+
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
75+
7176
// Are WAL record handles present with all the blocks
7277
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
7378

streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
5050

5151
// Set up the streaming context and input streams
5252
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
53+
ssc.addStreamingListener(ssc.progressListener)
54+
5355
val input = Seq(1, 2, 3, 4, 5)
5456
// Use "batchCount" to make sure we check the result after all batches finish
5557
val batchCounter = new BatchCounter(ssc)
@@ -72,6 +74,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
7274
if (!batchCounter.waitUntilBatchesCompleted(input.size, 30000)) {
7375
fail("Timeout: cannot finish all batches in 30 seconds")
7476
}
77+
78+
// Verify all "InputInfo"s have been reported
79+
assert(ssc.progressListener.numTotalReceivedRecords === input.size)
80+
assert(ssc.progressListener.numTotalProcessedRecords === input.size)
81+
7582
logInfo("Stopping server")
7683
testServer.stop()
7784
logInfo("Stopping context")

0 commit comments

Comments
 (0)