From 1fc5863db88cac9dfd0be09318c4ca8779a51682 Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 6 Oct 2016 20:08:01 -0500 Subject: [PATCH 1/2] [SPARK-17782][STREAMING][KAFKA] eliminate race condition of poll being called twice and moving position --- .../apache/spark/streaming/kafka010/ConsumerStrategy.scala | 5 ++++- .../spark/streaming/kafka010/DirectKafkaInputDStream.scala | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 60255fc655e5f..18c6dc8b7e252 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -104,6 +104,8 @@ private case class Subscribe[K, V]( toSeek.asScala.foreach { case (topicPartition, offset) => consumer.seek(topicPartition, offset) } + // we've called poll, we must pause or next poll may consume messages and set position + consumer.pause(consumer.assignment()) } consumer @@ -154,8 +156,9 @@ private case class SubscribePattern[K, V]( toSeek.asScala.foreach { case (topicPartition, offset) => consumer.seek(topicPartition, offset) } + // we've called poll, we must pause or next poll may consume messages and set position + consumer.pause(consumer.assignment()) } - consumer } } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index 13827f68f2cb5..a658e0a10c4a4 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -223,7 +223,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( override def start(): Unit = { val c = consumer - c.poll(0) + assert(c.poll(0).isEmpty, "Driver shouldn't consume messages; pause if you poll during setup") if (currentOffsets.isEmpty) { currentOffsets = c.assignment().asScala.map { tp => tp -> c.position(tp) From aca55de0624f5634acb04f91636dce79af875fab Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Thu, 6 Oct 2016 20:20:43 -0500 Subject: [PATCH 2/2] [SPARK-17782][STREAMING][KAFKA] whitespace fix --- .../org/apache/spark/streaming/kafka010/ConsumerStrategy.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 18c6dc8b7e252..778c06ea16a2b 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -159,6 +159,7 @@ private case class SubscribePattern[K, V]( // we've called poll, we must pause or next poll may consume messages and set position consumer.pause(consumer.assignment()) } + consumer } }