File tree Expand file tree Collapse file tree 2 files changed +6
-2
lines changed
external/kafka/src/main/scala/org/apache/spark/streaming/kafka Expand file tree Collapse file tree 2 files changed +6
-2
lines changed Original file line number Diff line number Diff line change @@ -128,7 +128,9 @@ class KafkaReceiver[
128128 def run () {
129129 logInfo(" Starting MessageHandler." )
130130 try {
131- for (msgAndMetadata <- stream) {
131+ val streamIterator = stream.iterator()
132+ while (streamIterator.hasNext()) {
133+ val msgAndMetadata = streamIterator.next()
132134 store((msgAndMetadata.key, msgAndMetadata.message))
133135 }
134136 } catch {
Original file line number Diff line number Diff line change @@ -206,7 +206,9 @@ class ReliableKafkaReceiver[
206206 override def run (): Unit = {
207207 logInfo(s " Starting message process thread ${Thread .currentThread().getId}. " )
208208 try {
209- for (msgAndMetadata <- stream) {
209+ val streamIterator = stream.iterator()
210+ while (streamIterator.hasNext()) {
211+ val msgAndMetadata = streamIterator.next()
210212 val topicAndPartition = TopicAndPartition (
211213 msgAndMetadata.topic, msgAndMetadata.partition)
212214 blockGenerator.synchronized {
You can’t perform that action at this time.
0 commit comments