Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,46 +28,58 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {

private val streamingListener = ssc.progressListener

private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
private def registerGauge[T](name: String, f: StreamingJobProgressListener => Option[T],
defaultValue: T) {
metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
override def getValue: T = f(streamingListener).getOrElse(defaultValue)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to keep the Option here (and document that defaultValue is used when f returns null). Other places should not have to use Option. This is safer for anyone to use and also minimizes the changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will revert it back and try a better way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi TD, what do you mean by "And other places should not have to use Option"? Taking this one as an example, if it is changed to

registerGauge("lastCompletedBatch_submissionTime",
    _.lastCompletedBatch.map(_.submissionTime).get, -1L)

then get will throw an exception when there's no completed batch. I'm not sure what you actually mean; sorry if I have misunderstood anything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi TD, sorry to bother you again. I'm not sure if there's a better way to address this problem. Would you mind giving me some hints? Thanks a lot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the problem. Good catch, I did not realize that. How about this: let's make two versions of registerGauge, one that takes f: StreamingJobProgressListener => T without any default value, and another that takes f: StreamingJobProgressListener => Option[T] and the default value. Each version will be used accordingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, got it, I will change the code as you suggested.

})
}

// Gauge for number of network receivers
registerGauge("receivers", _.numReceivers, 0)
registerGauge("receivers", l => Option(l.numReceivers), 0)

// Gauge for number of total completed batches
registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)
registerGauge("totalCompletedBatches", l => Option(l.numTotalCompletedBatches), 0L)

// Gauge for number of unprocessed batches
registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)
registerGauge("unprocessedBatches", l => Option(l.numUnprocessedBatches), 0L)

// Gauge for number of waiting batches
registerGauge("waitingBatches", _.waitingBatches.size, 0L)
registerGauge("waitingBatches", l => Option(l.waitingBatches.size), 0L)

// Gauge for number of running batches
registerGauge("runningBatches", _.runningBatches.size, 0L)
registerGauge("runningBatches", l => Option(l.runningBatches.size), 0L)

// Gauge for number of retained completed batches
registerGauge("retainedCompletedBatches", _.retainedCompletedBatches.size, 0L)
registerGauge("retainedCompletedBatches", l => Option(l.retainedCompletedBatches.size), 0L)

// Gauges for the last completed batch, useful for monitoring the streaming job's running status.
// Each gauge reports -1 when the value is unavailable.
registerGauge("lastCompletedBatch_submissionTime",
_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
registerGauge("lastCompletedBatch_processStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
registerGauge("lastCompletedBatch_processEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
_.lastCompletedBatch.map(_.submissionTime), -1L)
registerGauge("lastCompletedBatch_processingStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
registerGauge("lastCompletedBatch_processingEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)

// Gauge for last completed batch's delay information.
registerGauge("lastCompletedBatch_processingTime",
_.lastCompletedBatch.flatMap(_.processingDelay), -1L)
registerGauge("lastCompletedBatch_schedulingDelay",
_.lastCompletedBatch.flatMap(_.schedulingDelay), -1L)
registerGauge("lastCompletedBatch_totalDelay",
_.lastCompletedBatch.flatMap(_.totalDelay), -1L)

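// Gauges for the last received batch, useful for monitoring the streaming job's running status.
// Each gauge reports -1 when the value is unavailable.
// NOTE(review): the lambdas below read lastCompletedBatch although the gauges are named
// lastReceivedBatch_* — this looks like a copy-paste slip; confirm which batch is intended.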
registerGauge("lastReceivedBatch_submissionTime",
_.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
registerGauge("lastReceivedBatch_processStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
registerGauge("lastReceivedBatch_processEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
_.lastCompletedBatch.map(_.submissionTime), -1L)
registerGauge("lastReceivedBatch_processingStartTime",
_.lastCompletedBatch.flatMap(_.processingStartTime), -1L)
registerGauge("lastReceivedBatch_processingEndTime",
_.lastCompletedBatch.flatMap(_.processingEndTime), -1L)

// Gauge for last received batch records and total received batch records.
registerGauge("lastReceivedBatchRecords", l => Option(l.lastReceivedBatchRecords.values.sum), 0L)
registerGauge("totalReceivedBatchRecords", l => Option(l.numTotalReceivedBatchRecords), 0L)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
import org.apache.spark.Logging


private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
Expand All @@ -37,6 +36,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
private var totalCompletedBatches = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
private var totalReceivedBatchRecords = 0L

val batchDuration = ssc.graph.batchDuration.milliseconds

Expand Down Expand Up @@ -65,6 +65,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
/**
 * Handles a batch-started event while holding this listener's lock.
 *
 * Records the batch as running, removes it from the waiting batches, and adds the
 * number of records in all of its received blocks to the running total.
 */
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
  val startedInfo = batchStarted.batchInfo
  val startedTime = startedInfo.batchTime

  runningBatchInfos(startedTime) = startedInfo
  waitingBatchInfos.remove(startedTime)

  // Sum the records of every block group first, then fold them into the total in one step.
  val receivedRecords = startedInfo.receivedBlockInfo.map { case (_, blockInfos) =>
    blockInfos.map(_.numRecords).sum
  }.sum
  totalReceivedBatchRecords += receivedRecords
}

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
Expand All @@ -83,6 +87,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
totalCompletedBatches
}

/** Returns the accumulated count of received batch records, read under this listener's lock. */
def numTotalReceivedBatchRecords: Long = this.synchronized { totalReceivedBatchRecords }

/**
 * Returns the number of batches that are currently waiting plus those currently running,
 * read under this listener's lock.
 */
def numUnprocessedBatches: Long = synchronized {
  val waitingCount = waitingBatchInfos.size
  val runningCount = runningBatchInfos.size
  waitingCount + runningCount
}
Expand Down