Skip to content

Commit 00f5f7f

Browse files
committed
Change the code style and add totalProcessedRecords
1 parent 44721a6 commit 00f5f7f

File tree

2 files changed

+24
-11
lines changed

2 files changed

+24
-11
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
2929
private val streamingListener = ssc.progressListener
3030

3131
private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
32-
defaultValue: T) {
33-
metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
34-
override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
35-
})
32+
defaultValue: T): Unit = {
33+
registerGaugeWithOption[T](name,
34+
(l: StreamingJobProgressListener) => Option(f(streamingListener)), defaultValue)
3635
}
3736

3837
private def registerGaugeWithOption[T](
@@ -50,6 +49,12 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
5049
// Gauge for number of total completed batches
5150
registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)
5251

52+
// Gauge for number of total received records
53+
registerGauge("totalReceivedRecords", _.numTotalReceivedRecords, 0L)
54+
55+
// Gauge for number of total processed records
56+
registerGauge("totalProcessedRecords", _.numTotalProcessedRecords, 0L)
57+
5358
// Gauge for number of unprocessed batches
5459
registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)
5560

@@ -88,7 +93,6 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
8893
registerGaugeWithOption("lastReceivedBatch_processingEndTime",
8994
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
9095

91-
// Gauge for last received batch records and total received batch records.
92-
registerGauge("lastReceivedBatchRecords", _.lastReceivedBatchRecords.values.sum, 0L)
93-
registerGauge("totalReceivedBatchRecords", _.numTotalReceivedBatchRecords, 0L)
96+
// Gauge for last received batch records.
97+
registerGauge("lastReceivedBatch_records", _.lastReceivedBatchRecords.values.sum, 0L)
9498
}

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,9 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
3535
private val completedaBatchInfos = new Queue[BatchInfo]
3636
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
3737
private var totalCompletedBatches = 0L
38+
private var totalReceivedRecords = 0L
39+
private var totalProcessedRecords = 0L
3840
private val receiverInfos = new HashMap[Int, ReceiverInfo]
39-
private var totalReceivedBatchRecords = 0L
4041

4142
val batchDuration = ssc.graph.batchDuration.milliseconds
4243

@@ -67,7 +68,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
6768
waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
6869

6970
batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
70-
totalReceivedBatchRecords += infos.map(_.numRecords).sum
71+
totalReceivedRecords += infos.map(_.numRecords).sum
7172
}
7273
}
7374

@@ -77,6 +78,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
7778
completedaBatchInfos.enqueue(batchCompleted.batchInfo)
7879
if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
7980
totalCompletedBatches += 1L
81+
82+
batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
83+
totalProcessedRecords += infos.map(_.numRecords).sum
84+
}
8085
}
8186

8287
def numReceivers = synchronized {
@@ -87,8 +92,12 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
8792
totalCompletedBatches
8893
}
8994

90-
def numTotalReceivedBatchRecords: Long = synchronized {
91-
totalReceivedBatchRecords
95+
def numTotalReceivedRecords: Long = synchronized {
96+
totalReceivedRecords
97+
}
98+
99+
def numTotalProcessedRecords: Long = synchronized {
100+
totalProcessedRecords
92101
}
93102

94103
def numUnprocessedBatches: Long = synchronized {

0 commit comments

Comments
 (0)