Skip to content

Commit 02dd44f

Browse files
committed
Fix the addressed comments
1 parent c7a9376 commit 02dd44f

File tree

2 files changed

+10
-8
lines changed

2 files changed

+10
-8
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,6 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
8080
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
8181

8282
// Gauge for last received batch records and total received batch records.
83-
private var totalReceivedBatchRecords: Long = 0L
84-
def getTotalReceivedBatchRecords(listener: StreamingJobProgressListener): Long = {
85-
totalReceivedBatchRecords += listener.lastReceivedBatchRecords.values.sum
86-
totalReceivedBatchRecords
87-
}
88-
8983
registerGauge("lastReceivedBatchRecords", _.lastReceivedBatchRecords.values.sum, 0L)
90-
registerGauge("totalReceivedBatchRecords", getTotalReceivedBatchRecords, 0L)
84+
registerGauge("totalReceivedBatchRecords", _.numTotalReceivedBatchRecords, 0L)
9185
}

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
2525
import org.apache.spark.streaming.scheduler.BatchInfo
2626
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
2727
import org.apache.spark.util.Distribution
28-
import org.apache.spark.Logging
2928

3029

3130
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -37,6 +36,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
3736
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
3837
private var totalCompletedBatches = 0L
3938
private val receiverInfos = new HashMap[Int, ReceiverInfo]
39+
private var totalReceivedBatchRecords = 0L
4040

4141
val batchDuration = ssc.graph.batchDuration.milliseconds
4242

@@ -65,6 +65,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
6565
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
6666
runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
6767
waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
68+
69+
batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
70+
totalReceivedBatchRecords += infos.map(_.numRecords).sum
71+
}
6872
}
6973

7074
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
@@ -83,6 +87,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
8387
totalCompletedBatches
8488
}
8589

90+
def numTotalReceivedBatchRecords: Long = synchronized {
91+
totalReceivedBatchRecords
92+
}
93+
8694
def numUnprocessedBatches: Long = synchronized {
8795
waitingBatchInfos.size + runningBatchInfos.size
8896
}

0 commit comments

Comments
 (0)