[SPARK-7113][Streaming] Support input information reporting for Direct Kafka stream #5879
Changes from 1 commit
The first file in the diff is `DirectKafkaStreamSuite`:

```
@@ -34,6 +34,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.Utils

class DirectKafkaStreamSuite

@@ -290,7 +291,6 @@ class DirectKafkaStreamSuite
      },
      "Recovered ranges are not the same as the ones generated"
    )

    // Restart context, give more data and verify the total at the end
    // If the total is right, that means each record has been received only once
    ssc.start()

@@ -301,6 +301,49 @@ class DirectKafkaStreamSuite
    ssc.stop()
  }

  test("Direct Kafka stream report input information") {
    val topic = "report-test"
    val data = Map("a" -> 7, "b" -> 9)
    kafkaTestUtils.createTopic(topic)
    kafkaTestUtils.sendMessages(topic, data)

    val totalSent = data.values.sum
    val kafkaParams = Map(
      "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
      "auto.offset.reset" -> "smallest"
    )

    import DirectKafkaStreamSuite._
    ssc = new StreamingContext(sparkConf, Milliseconds(200))
    val collector = new InputInfoCollector
    ssc.addStreamingListener(collector)

    val stream = withClue("Error creating direct stream") {
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set(topic))
    }

    val allReceived = new ArrayBuffer[(String, String)]

    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
    ssc.start()
    eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
      assert(allReceived.size === totalSent,
        "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
    }
    ssc.stop()

    // Calculate the total number of records collected by the StreamingListener.
    val numRecordsSubmitted = collector.streamIdToNumRecordsSubmitted.map(_.values.sum).sum
```
Contributor: There is probably a race condition here. The collector may not have received the batch-completed signal when the …

Contributor: Also, use AtomicLong or AtomicInt to handle multithreaded access. Otherwise there can be race conditions leading to flakiness.

Contributor (author): I don't think there is a multi-threading issue; I test the number of records until the StreamingContext is stopped, so at that point no other thread will access the collector object. Anyway, since I only test the total number of records, an AtomicLong is enough; I will change to that.

Contributor (author): Not sure why all the asserts should be in the …

Contributor (author): OK, I get it, thanks a lot for your explanation.
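To make the reviewers' point concrete, here is a sketch (not the code in this PR) of how the listener-count check could be made race-free: keep the assertion inside an `eventually` block, before stopping the context, so it retries until the listener has actually received the batch events. The names (`collector`, `totalSent`) are assumed to be those of the test above.

```scala
// Sketch only: assert on the listener's counts inside `eventually`, before
// ssc.stop(), so the check retries until the batch-completed events have
// been delivered to the StreamingListener.
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
  val numRecordsCompleted =
    collector.streamIdToNumRecordsCompleted.map(_.values.sum).sum
  assert(numRecordsCompleted === totalSent,
    "listener has not yet seen all records: " + numRecordsCompleted)
}
ssc.stop()
```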
```
    assert(numRecordsSubmitted === totalSent)

    val numRecordsStarted = collector.streamIdToNumRecordsStarted.map(_.values.sum).sum
    assert(numRecordsStarted === totalSent)

    val numRecordsCompleted = collector.streamIdToNumRecordsCompleted.map(_.values.sum).sum
    assert(numRecordsCompleted === totalSent)
  }

  /** Get the generated offset ranges from the DirectKafkaStream */
  private def getOffsetRanges[K, V](
      kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {

@@ -313,4 +356,24 @@ class DirectKafkaStreamSuite
object DirectKafkaStreamSuite {
  val collectedData = new mutable.ArrayBuffer[String]()
  var total = -1L

  class InputInfoCollector extends StreamingListener {
    val streamIdToNumRecordsSubmitted = new ArrayBuffer[Map[Int, Long]]()
```
Contributor: If you are just going to test the total number of records received, why do you need ArrayBuffers?

Contributor (author): Yeah, that's not necessary, I will change it.

A sketch of that simplification appears after the listener class below.
```
    val streamIdToNumRecordsStarted = new ArrayBuffer[Map[Int, Long]]()
    val streamIdToNumRecordsCompleted = new ArrayBuffer[Map[Int, Long]]()

    override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit =
      synchronized {
        streamIdToNumRecordsSubmitted += batchSubmitted.batchInfo.streamIdToNumRecords
      }

    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
      streamIdToNumRecordsStarted += batchStarted.batchInfo.streamIdToNumRecords
    }

    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
      synchronized {
        streamIdToNumRecordsCompleted += batchCompleted.batchInfo.streamIdToNumRecords
      }
  }
}
```
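Following the reviewers' suggestion above, if only totals are asserted, the per-batch `ArrayBuffer[Map[Int, Long]]` fields and the `synchronized` blocks can be dropped in favor of atomic counters. A minimal sketch of that simplification (an illustration of the suggestion, not necessarily the code that was finally merged):

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.streaming.scheduler._

/**
 * Sketch of a simplified collector: it keeps only running totals in AtomicLongs,
 * so no explicit synchronization or per-batch buffers are needed.
 */
class InputInfoCollector extends StreamingListener {
  val numRecordsSubmitted = new AtomicLong(0L)
  val numRecordsStarted = new AtomicLong(0L)
  val numRecordsCompleted = new AtomicLong(0L)

  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
    // Add the records of all input streams in this batch to the running total.
    numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.streamIdToNumRecords.values.sum)
  }

  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    numRecordsStarted.addAndGet(batchStarted.batchInfo.streamIdToNumRecords.values.sum)
  }

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    numRecordsCompleted.addAndGet(batchCompleted.batchInfo.streamIdToNumRecords.values.sum)
  }
}
```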
The second file in the diff is the streaming UI's `StreamingJobProgressListener`; the only change in this hunk is the indentation of the `recordsOfParticularReceiver` continuation line (see the reviewer's note below):

```
@@ -192,8 +192,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
    val latestReceiverNumRecords = latestBatchInfos.map(_.receiverNumRecords)
    val streamIds = ssc.graph.getInputStreams().map(_.id)
    streamIds.map { id =>
      val recordsOfParticularReceiver =
        latestReceiverNumRecords.map(v => v.getOrElse(id, 0L).toDouble * 1000 / batchDuration)
      val recordsOfParticularReceiver =
```
Member: You don't need to change this indent. This method will be removed in #5533.
```
        latestReceiverNumRecords.map(v => v.getOrElse(id, 0L).toDouble * 1000 / batchDuration)
      val distribution = Distribution(recordsOfParticularReceiver)
      (id, distribution)
    }.toMap
```
Contributor: `currentNumRecords` sounds like "the current number of records to be processed", which is not what this is. Also, the way you are calculating it is not very intuitive. A better way would be to map each offset range to its number of records and add them up.
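A sketch of that suggestion (the helper name `numRecords` is made up for illustration; `OffsetRange` with its `fromOffset` and `untilOffset` fields comes from the spark-streaming-kafka API used elsewhere in this PR):

```scala
import org.apache.spark.streaming.kafka.OffsetRange

// Illustrative helper: the number of records in a batch is the sum of the sizes
// of its offset ranges, i.e. map each range to (untilOffset - fromOffset) and sum.
def numRecords(offsetRanges: Seq[OffsetRange]): Long =
  offsetRanges.map(r => r.untilOffset - r.fromOffset).sum
```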