diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index ca82c908f441b..1aa7181eaad09 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
 import java.io.Closeable
+import java.time.Duration
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
@@ -71,7 +72,7 @@ private[kafka010] class InternalKafkaConsumer(
 
     // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
     seek(offset)
-    val p = consumer.poll(pollTimeoutMs)
+    val p = consumer.poll(Duration.ofMillis(pollTimeoutMs))
     val r = p.records(topicPartition)
     logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
     val offsetAfterPoll = consumer.position(topicPartition)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 0179f4dd822f1..fd066941bafae 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.time.{Duration => jDuration}
 import java.util.concurrent.Executors
 
 import scala.collection.JavaConverters._
@@ -116,7 +117,7 @@ private[kafka010] class KafkaOffsetReader(
   def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
     // Poll to get the latest assigned partitions
-    consumer.poll(0)
+    consumer.poll(jDuration.ZERO)
     val partitions = consumer.assignment()
     consumer.pause(partitions)
     partitions.asScala.toSet
@@ -379,7 +380,7 @@ private[kafka010] class KafkaOffsetReader(
 
     withRetriesWithoutInterrupt {
       // Poll to get the latest assigned partitions
-      consumer.poll(0)
+      consumer.poll(jDuration.ZERO)
       val partitions = consumer.assignment()
 
       if (!fetchingEarliestOffset) {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 47ae7be85ce02..4b43cb6bf4544 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.kafka010
 import java.io.{File, IOException}
 import java.net.{InetAddress, InetSocketAddress}
 import java.nio.charset.StandardCharsets
+import java.time.Duration
 import java.util.{Collections, Properties, UUID}
 import java.util.concurrent.TimeUnit
 import javax.security.auth.login.Configuration
@@ -455,7 +456,7 @@ class KafkaTestUtils(
     val kc = new KafkaConsumer[String, String](consumerConfiguration)
     logInfo("Created consumer to get earliest offsets")
     kc.subscribe(topics.asJavaCollection)
-    kc.poll(0)
+    kc.poll(Duration.ZERO)
     val partitions = kc.assignment()
     kc.pause(partitions)
     kc.seekToBeginning(partitions)
@@ -469,7 +470,7 @@
     val kc = new KafkaConsumer[String, String](consumerConfiguration)
     logInfo("Created consumer to get latest offsets")
     kc.subscribe(topics.asJavaCollection)
-    kc.poll(0)
+    kc.poll(Duration.ZERO)
     val partitions = kc.assignment()
     kc.pause(partitions)
     kc.seekToEnd(partitions)
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 3e32b592b3a3a..3a96c22285292 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.kafka010
 
 import java.{lang => jl, util => ju}
+import java.time.Duration
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -105,7 +106,7 @@ private case class Subscribe[K, V](
       val shouldSuppress =
         aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
       try {
-        consumer.poll(0)
+        consumer.poll(Duration.ZERO)
       } catch {
         case x: NoOffsetForPartitionException if shouldSuppress =>
           logWarning("Catching NoOffsetForPartitionException since " +
@@ -159,7 +160,7 @@ private case class SubscribePattern[K, V](
       val shouldSuppress =
         aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
       try {
-        consumer.poll(0)
+        consumer.poll(Duration.ZERO)
       } catch {
         case x: NoOffsetForPartitionException if shouldSuppress =>
           logWarning("Catching NoOffsetForPartitionException since " +
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index a449a8bb7213e..c03fe0cb0a3e9 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.kafka010
 
 import java.{ util => ju }
+import java.time.Duration
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicReference
 
@@ -168,7 +169,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
   private def paranoidPoll(c: Consumer[K, V]): Unit = {
     // don't actually want to consume any messages, so pause all partitions
     c.pause(c.assignment())
-    val msgs = c.poll(0)
+    val msgs = c.poll(Duration.ZERO)
     if (!msgs.isEmpty) {
       // position should be minimum offset per topicpartition
       msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
index 142e946188ace..09af5a0815147 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming.kafka010
 
 import java.{util => ju}
+import java.time.Duration
 
 import scala.collection.JavaConverters._
 
@@ -203,7 +204,7 @@ private[kafka010] class InternalKafkaConsumer[K, V](
   }
 
   private def poll(timeout: Long): Unit = {
-    val p = consumer.poll(timeout)
+    val p = consumer.poll(Duration.ofMillis(timeout))
     val r = p.records(topicPartition)
     logDebug(s"Polled ${p.partitions()} ${r.size}")
     buffer = r.listIterator
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 925327d9d58e6..225a15d99f7b1 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka010
 
 import java.io.File
 import java.lang.{Long => JLong}
+import java.time.{Duration => jDuration}
 import java.util.{Arrays, HashMap => JHashMap, Map => JMap, UUID}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.ConcurrentLinkedQueue
@@ -255,7 +256,7 @@ class DirectKafkaStreamSuite
       preferredHosts,
       ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams.asScala),
       new DefaultPerPartitionConfig(sparkConf))
-    s.consumer.poll(0)
+    s.consumer.poll(jDuration.ZERO)
     assert(
       s.consumer.position(topicPartition) >= offsetBeforeStart,
       "Start offset not from latest"
@@ -311,7 +312,7 @@ class DirectKafkaStreamSuite
        kafkaParams.asScala,
        Map(topicPartition -> 11L)),
      new DefaultPerPartitionConfig(sparkConf))
-    s.consumer.poll(0)
+    s.consumer.poll(jDuration.ZERO)
     assert(
       s.consumer.position(topicPartition) >= offsetBeforeStart,
       "Start offset not from latest"
@@ -473,7 +474,7 @@ class DirectKafkaStreamSuite
     ssc.stop()
     val consumer = new KafkaConsumer[String, String](kafkaParams)
     consumer.subscribe(Arrays.asList(topic))
-    consumer.poll(0)
+    consumer.poll(jDuration.ZERO)
     committed.asScala.foreach {
       case (k, v) =>
         // commits are async, not exactly once
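
Note on the API change this patch applies throughout: `KafkaConsumer.poll(long)` was deprecated in Kafka 2.0.0 by KIP-266 in favor of `poll(java.time.Duration)`, so the mechanical mapping is `poll(0)` -> `poll(Duration.ZERO)` and `poll(timeoutMs)` -> `poll(Duration.ofMillis(timeoutMs))`. The following minimal sketch shows both forms in one place; it is illustrative only, and the object name, broker address, group id, and topic are made-up placeholders, not values from this patch.

```scala
import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer

// Illustrative sketch only: broker, group id, and topic are placeholders.
object PollMigrationSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // placeholder broker
    props.put("group.id", "poll-migration-demo")       // placeholder group id
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    try {
      consumer.subscribe(Collections.singletonList("demo-topic"))  // placeholder topic

      // Old idiom: consumer.poll(0) triggered partition assignment without
      // fetching records. Duration.ZERO preserves the "don't block on
      // records" intent (but see the metadata caveat below).
      consumer.poll(Duration.ZERO)
      val partitions = consumer.assignment()
      consumer.pause(partitions)

      // Millisecond timeouts map mechanically to Duration.ofMillis,
      // matching the InternalKafkaConsumer changes above.
      consumer.resume(partitions)
      val records = consumer.poll(Duration.ofMillis(512))
      println(s"Polled ${records.count()} records from ${partitions.size()} partitions")
    } finally {
      consumer.close()
    }
  }
}
```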
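One behavioral difference worth keeping in mind when reviewing: the deprecated `poll(0)` would block indefinitely on cluster metadata, so it doubled as an assignment barrier, whereas `poll(Duration.ZERO)` bounds metadata fetching by the same zero timeout and can return before partitions are assigned. Call sites that used `poll(0)` purely to trigger assignment, such as those wrapped in `withRetriesWithoutInterrupt` in `KafkaOffsetReader`, may need their surrounding retry logic to absorb that difference.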