From 0babe66e426c57f4f738387bee12098c15b4ebf3 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Mon, 4 May 2015 11:14:42 +0800
Subject: [PATCH 1/2] Support input information reporting for Direct Kafka stream

---
 .../kafka/DirectKafkaInputDStream.scala    | 11 +++-
 .../kafka/DirectKafkaStreamSuite.scala     | 65 ++++++++++++++++++-
 .../ui/StreamingJobProgressListener.scala  |  4 +-
 3 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 1b1fc8051d05..12e8e530fe3e 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.streaming.kafka
 
-
 import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.reflect.{classTag, ClassTag}
@@ -27,10 +26,10 @@ import kafka.message.MessageAndMetadata
 import kafka.serializer.Decoder
 
 import org.apache.spark.{Logging, SparkException}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.scheduler.InputInfo
 
 /**
  * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
@@ -117,6 +116,12 @@ class DirectKafkaInputDStream[
     val rdd = KafkaRDD[K, V, U, T, R](
       context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
 
+    // Report the number of records of the batch interval to InputInfoTracker.
+    val currentNumRecords = currentOffsets.map(_._2).sum
+    val toBeProcessedNumRecords = untilOffsets.map(_._2.offset).sum
+    val inputInfo = InputInfo(id, toBeProcessedNumRecords - currentNumRecords)
+    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
     Some(rdd)
   }
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 415730f5559c..a579a95301c6 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
 import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.scheduler._
 import org.apache.spark.util.Utils
 
 class DirectKafkaStreamSuite
@@ -290,7 +291,6 @@ class DirectKafkaStreamSuite
       },
       "Recovered ranges are not the same as the ones generated"
     )
-
     // Restart context, give more data and verify the total at the end
     // If the total is write that means each records has been received only once
     ssc.start()
@@ -301,6 +301,49 @@ class DirectKafkaStreamSuite
     ssc.stop()
   }
 
+  test("Direct Kafka stream report input information") {
+    val topic = "report-test"
+    val data = Map("a" -> 7, "b" -> 9)
+    kafkaTestUtils.createTopic(topic)
+    kafkaTestUtils.sendMessages(topic, data)
+
+    val totalSent = data.values.sum
+    val kafkaParams = Map(
+      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+      "auto.offset.reset" -> "smallest"
+    )
+
+    import DirectKafkaStreamSuite._
+    ssc = new StreamingContext(sparkConf, Milliseconds(200))
+    val collector = new InputInfoCollector
+    ssc.addStreamingListener(collector)
+
+    val stream = withClue("Error creating direct stream") {
+      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+        ssc, kafkaParams, Set(topic))
+    }
+
+    val allReceived = new ArrayBuffer[(String, String)]
+
+    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
+    ssc.start()
+    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
+      assert(allReceived.size === totalSent,
+        "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
+    }
+    ssc.stop()
+
+    // Calculate all the record number collected in the StreamingListener.
+    val numRecordsSubmitted = collector.streamIdToNumRecordsSubmitted.map(_.values.sum).sum
+    assert(numRecordsSubmitted === totalSent)
+
+    val numRecordsStarted = collector.streamIdToNumRecordsStarted.map(_.values.sum).sum
+    assert(numRecordsStarted === totalSent)
+
+    val numRecordsCompleted = collector.streamIdToNumRecordsCompleted.map(_.values.sum).sum
+    assert(numRecordsCompleted === totalSent)
+  }
+
   /** Get the generated offset ranges from the DirectKafkaStream */
   private def getOffsetRanges[K, V](
       kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
@@ -313,4 +356,24 @@ class DirectKafkaStreamSuite
 object DirectKafkaStreamSuite {
   val collectedData = new mutable.ArrayBuffer[String]()
   var total = -1L
+
+  class InputInfoCollector extends StreamingListener {
+    val streamIdToNumRecordsSubmitted = new ArrayBuffer[Map[Int, Long]]()
+    val streamIdToNumRecordsStarted = new ArrayBuffer[Map[Int, Long]]()
+    val streamIdToNumRecordsCompleted = new ArrayBuffer[Map[Int, Long]]()
+
+    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit =
+      synchronized {
+        streamIdToNumRecordsSubmitted += batchSubmitted.batchInfo.streamIdToNumRecords
+      }
+
+    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
+      streamIdToNumRecordsStarted += batchStarted.batchInfo.streamIdToNumRecords
+    }
+
+    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
+      synchronized {
+        streamIdToNumRecordsCompleted += batchCompleted.batchInfo.streamIdToNumRecords
+      }
+  }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index d2729fa70d6d..24cbb2bf9d8f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -192,8 +192,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     val latestReceiverNumRecords = latestBatchInfos.map(_.receiverNumRecords)
     val streamIds = ssc.graph.getInputStreams().map(_.id)
     streamIds.map { id =>
-      val recordsOfParticularReceiver =
-        latestReceiverNumRecords.map(v => v.getOrElse(id, 0L).toDouble * 1000 / batchDuration)
+      val recordsOfParticularReceiver =
+        latestReceiverNumRecords.map(v => v.getOrElse(id, 0L).toDouble * 1000 / batchDuration)
       val distribution = Distribution(recordsOfParticularReceiver)
       (id, distribution)
     }.toMap

From b0b506c363968b1f0ef27d71986618bb514f0f3c Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Tue, 5 May 2015 14:58:47 +0800
Subject: [PATCH 2/2] Address the comments

---
 .../kafka/DirectKafkaInputDStream.scala |  7 ++--
 .../kafka/DirectKafkaStreamSuite.scala  | 36 ++++++++-----------
 2 files changed, 18 insertions(+), 25 deletions(-)

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 12e8e530fe3e..6715aede7928 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -116,10 +116,9 @@ class DirectKafkaInputDStream[
     val rdd = KafkaRDD[K, V, U, T, R](
       context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
 
-    // Report the number of records of the batch interval to InputInfoTracker.
-    val currentNumRecords = currentOffsets.map(_._2).sum
-    val toBeProcessedNumRecords = untilOffsets.map(_._2.offset).sum
-    val inputInfo = InputInfo(id, toBeProcessedNumRecords - currentNumRecords)
+    // Report the record number of this batch interval to InputInfoTracker.
+    val numRecords = rdd.offsetRanges.map(r => r.untilOffset - r.fromOffset).sum
+    val inputInfo = InputInfo(id, numRecords)
     ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
 
     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index a579a95301c6..b6d314dfc778 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.kafka
 
 import java.io.File
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -330,18 +331,13 @@ class DirectKafkaStreamSuite
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
       assert(allReceived.size === totalSent,
         "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
+
+      // Calculate all the record number collected in the StreamingListener.
+      assert(collector.numRecordsSubmitted.get() === totalSent)
+      assert(collector.numRecordsStarted.get() === totalSent)
+      assert(collector.numRecordsCompleted.get() === totalSent)
     }
     ssc.stop()
-
-    // Calculate all the record number collected in the StreamingListener.
-    val numRecordsSubmitted = collector.streamIdToNumRecordsSubmitted.map(_.values.sum).sum
-    assert(numRecordsSubmitted === totalSent)
-
-    val numRecordsStarted = collector.streamIdToNumRecordsStarted.map(_.values.sum).sum
-    assert(numRecordsStarted === totalSent)
-
-    val numRecordsCompleted = collector.streamIdToNumRecordsCompleted.map(_.values.sum).sum
-    assert(numRecordsCompleted === totalSent)
   }
 
   /** Get the generated offset ranges from the DirectKafkaStream */
@@ -358,22 +354,20 @@ object DirectKafkaStreamSuite {
   var total = -1L
 
   class InputInfoCollector extends StreamingListener {
-    val streamIdToNumRecordsSubmitted = new ArrayBuffer[Map[Int, Long]]()
-    val streamIdToNumRecordsStarted = new ArrayBuffer[Map[Int, Long]]()
-    val streamIdToNumRecordsCompleted = new ArrayBuffer[Map[Int, Long]]()
+    val numRecordsSubmitted = new AtomicLong(0L)
+    val numRecordsStarted = new AtomicLong(0L)
+    val numRecordsCompleted = new AtomicLong(0L)
 
-    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit =
-      synchronized {
-        streamIdToNumRecordsSubmitted += batchSubmitted.batchInfo.streamIdToNumRecords
+    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
+      numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords)
       }
 
-    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
-      streamIdToNumRecordsStarted += batchStarted.batchInfo.streamIdToNumRecords
+    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
+      numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords)
     }
 
-    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
-      synchronized {
-        streamIdToNumRecordsCompleted += batchCompleted.batchInfo.streamIdToNumRecords
+    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+      numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords)
       }
   }
 }
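
Illustrative sketch (not part of the patch series above): a minimal driver-side StreamingListener that aggregates the per-batch record counts these patches make the direct Kafka stream report through InputInfoTracker. It assumes the Spark 1.4-era listener API exercised by the test above (StreamingListenerBatchCompleted and BatchInfo.numRecords); the class and value names below are hypothetical.

  import java.util.concurrent.atomic.AtomicLong

  import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

  // Sums the record counts of all completed batches. For the direct Kafka
  // stream, BatchInfo.numRecords reflects what was reported to
  // InputInfoTracker, i.e. the sum of (untilOffset - fromOffset) over the
  // offset ranges of each batch's KafkaRDD.
  class BatchRecordCountListener extends StreamingListener {
    val totalRecords = new AtomicLong(0L)

    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
      totalRecords.addAndGet(batchCompleted.batchInfo.numRecords)
    }
  }

  // Usage, assuming an existing StreamingContext named ssc:
  //   ssc.addStreamingListener(new BatchRecordCountListener)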