Skip to content

Commit 44721a6

Browse files
committed
Further address the comments
1 parent c097ddc commit 44721a6

File tree

1 file changed

+27
-18
lines changed

1 file changed

+27
-18
lines changed

streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,58 +28,67 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
2828

2929
private val streamingListener = ssc.progressListener
3030

31-
private def registerGauge[T](name: String, f: StreamingJobProgressListener => Option[T],
31+
private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
3232
defaultValue: T) {
33+
metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
34+
override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
35+
})
36+
}
37+
38+
private def registerGaugeWithOption[T](
39+
name: String,
40+
f: StreamingJobProgressListener => Option[T],
41+
defaultValue: T): Unit = {
3342
metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
3443
override def getValue: T = f(streamingListener).getOrElse(defaultValue)
3544
})
3645
}
3746

3847
// Gauge for number of network receivers
39-
registerGauge("receivers", l => Option(l.numReceivers), 0)
48+
registerGauge("receivers", _.numReceivers, 0)
4049

4150
// Gauge for number of total completed batches
42-
registerGauge("totalCompletedBatches", l => Option(l.numTotalCompletedBatches), 0L)
51+
registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)
4352

4453
// Gauge for number of unprocessed batches
45-
registerGauge("unprocessedBatches", l => Option(l.numUnprocessedBatches), 0L)
54+
registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)
4655

4756
// Gauge for number of waiting batches
48-
registerGauge("waitingBatches", l => Option(l.waitingBatches.size), 0L)
57+
registerGauge("waitingBatches", _.waitingBatches.size, 0L)
4958

5059
// Gauge for number of running batches
51-
registerGauge("runningBatches", l => Option(l.runningBatches.size), 0L)
60+
registerGauge("runningBatches", _.runningBatches.size, 0L)
5261

5362
// Gauge for number of retained completed batches
54-
registerGauge("retainedCompletedBatches", l => Option(l.retainedCompletedBatches.size), 0L)
63+
registerGauge("retainedCompletedBatches", _.retainedCompletedBatches.size, 0L)
5564

5665
// Gauge for last completed batch, useful for monitoring the streaming job's running status,
5766
// displayed data -1 for any abnormal condition.
58-
registerGauge("lastCompletedBatch_submissionTime",
67+
registerGaugeWithOption("lastCompletedBatch_submissionTime",
5968
_.lastCompletedBatch.map(_.submissionTime), -1L)
60-
registerGauge("lastCompletedBatch_processingStartTime",
69+
registerGaugeWithOption("lastCompletedBatch_processingStartTime",
6170
_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
62-
registerGauge("lastCompletedBatch_processingEndTime",
71+
registerGaugeWithOption("lastCompletedBatch_processingEndTime",
6372
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
6473

6574
// Gauge for last completed batch's delay information.
66-
registerGauge("lastCompletedBatch_processingTime",
75+
registerGaugeWithOption("lastCompletedBatch_processingDelay",
6776
_.lastCompletedBatch.flatMap(_.processingDelay), -1L)
68-
registerGauge("lastCompletedBatch_schedulingDelay",
77+
registerGaugeWithOption("lastCompletedBatch_schedulingDelay",
6978
_.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
70-
registerGauge("lastCompletedBatch_totalDelay",
79+
registerGaugeWithOption("lastCompletedBatch_totalDelay",
7180
_.lastCompletedBatch.flatMap(_.totalDelay), -1L)
7281

7382
// Gauge for last received batch, useful for monitoring the streaming job's running status,
7483
// displayed data -1 for any abnormal condition.
75-
registerGauge("lastReceivedBatch_submissionTime",
84+
registerGaugeWithOption("lastReceivedBatch_submissionTime",
7685
_.lastCompletedBatch.map(_.submissionTime), -1L)
77-
registerGauge("lastReceivedBatch_processingStartTime",
86+
registerGaugeWithOption("lastReceivedBatch_processingStartTime",
7887
_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
79-
registerGauge("lastReceivedBatch_processingEndTime",
88+
registerGaugeWithOption("lastReceivedBatch_processingEndTime",
8089
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)
8190

8291
// Gauge for last received batch records and total received batch records.
83-
registerGauge("lastReceivedBatchRecords", l => Option(l.lastReceivedBatchRecords.values.sum), 0L)
84-
registerGauge("totalReceivedBatchRecords", l => Option(l.numTotalReceivedBatchRecords), 0L)
92+
registerGauge("lastReceivedBatchRecords", _.lastReceivedBatchRecords.values.sum, 0L)
93+
registerGauge("totalReceivedBatchRecords", _.numTotalReceivedBatchRecords, 0L)
8594
}

0 commit comments

Comments
 (0)