From 0a22686d9a388a21d5dd38513854341d3f37f738 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 3 Jun 2018 12:54:22 -0700 Subject: [PATCH 01/17] SPARK-18057 Update structured streaming kafka from 0.10.0.1 to 2.0.0 --- external/kafka-0-10-sql/pom.xml | 2 +- .../spark/sql/kafka010/KafkaTestUtils.scala | 37 +++++++++++++------ 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 16bbc6db641c..57c43edda0e9 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -29,7 +29,7 @@ spark-sql-kafka-0-10_2.11 sql-kafka-0-10 - 0.10.0.1 + 2.0.0-SNAPSHOT jar Kafka 0.10 Source for Structured Streaming diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 75245943c493..ac840c9af631 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -29,13 +29,16 @@ import scala.util.Random import kafka.admin.AdminUtils import kafka.api.Request -import kafka.common.TopicAndPartition -import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint} +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils.ZkUtils +import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer._ import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.apache.kafka.common.utils.Time import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ @@ -61,6 +64,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L private var zookeeper: EmbeddedZookeeper = _ private var zkUtils: ZkUtils = _ + private var zkClient: KafkaZkClient = null + private var adminZkClient: AdminZkClient = null // Kafka broker related configurations private val brokerHost = "localhost" @@ -96,10 +101,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L // Set up the Embedded Zookeeper server and get the proper Zookeeper port private def setupEmbeddedZookeeper(): Unit = { // Zookeeper server startup - zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") + val zkSvr = s"$zkHost:$zkPort"; + zookeeper = new EmbeddedZookeeper(zkSvr) // Get the actual zookeeper binding port zkPort = zookeeper.actualPort - zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false) + zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false) + zkClient = KafkaZkClient(zkSvr, false, 6000, 10000, Int.MaxValue, Time.SYSTEM) + adminZkClient = new AdminZkClient(zkClient) zkReady = true } @@ -113,7 +121,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) server = new KafkaServer(brokerConf) server.startup() - brokerPort = server.boundPort() + brokerPort = server.boundPort(new ListenerName("l")) (server, brokerPort) }, new SparkConf(), "KafkaBroker") @@ -203,7 +211,12 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = 
Map.empty) extends L /** Add new partitions to a Kafka topic */ def addPartitions(topic: String, partitions: Int): Unit = { - AdminUtils.addPartitions(zkUtils, topic, partitions) + val existingAssignment = zkClient.getReplicaAssignmentForTopics( + collection.immutable.Set(topic)).map { + case (topicPartition, replicas) => topicPartition.partition -> replicas + } + adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(), + partitions) // wait until metadata is propagated (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) @@ -327,7 +340,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L topic: String, numPartitions: Int, servers: Seq[KafkaServer]): Unit = { - val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _)) + val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)) import ZkUtils._ // wait until admin path for delete topic is deleted, signaling completion of topic deletion @@ -337,7 +350,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists") // ensure that the topic-partition has been deleted from all brokers' replica managers assert(servers.forall(server => topicAndPartitions.forall(tp => - server.replicaManager.getPartition(tp.topic, tp.partition) == None)), + server.replicaManager.getPartition(tp) == None)), s"topic $topic still exists in the replica manager") // ensure that logs from all replicas are deleted if delete topic is marked successful assert(servers.forall(server => topicAndPartitions.forall(tp => @@ -345,8 +358,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L s"topic $topic still exists in log mananger") // ensure that topic is removed from all cleaner offsets assert(servers.forall(server => topicAndPartitions.forall { tp => - val checkpoints = server.getLogManager().logDirs.map { logDir => - new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read() + val checkpoints = server.getLogManager().liveLogDirs.map { logDir => + new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read() } checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp)) }), s"checkpoint for topic $topic still exists") @@ -379,7 +392,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match { case Some(partitionState) => - val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr + val tp = new TopicPartition(topic, partition) + val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp)) + val leaderAndInSyncReplicas = leaderIsrAndControllerEpochMap(tp).leaderAndIsr zkUtils.getLeaderForPartition(topic, partition).isDefined && Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && From f76da891e6eb004f92b0c423ba371ec971da7584 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 3 Jun 2018 15:36:24 -0700 Subject: [PATCH 02/17] Use AdminClient for creating partitions --- .../spark/sql/kafka010/KafkaTestUtils.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index ac840c9af631..ffb711101e99 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -32,7 +32,9 @@ import kafka.api.Request import kafka.server.{KafkaConfig, KafkaServer} import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils.ZkUtils -import kafka.zk.{AdminZkClient, KafkaZkClient} +import kafka.zk.KafkaZkClient +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions} import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer._ import org.apache.kafka.common.TopicPartition @@ -65,7 +67,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L private var zkUtils: ZkUtils = _ private var zkClient: KafkaZkClient = null - private var adminZkClient: AdminZkClient = null + private var adminClient: AdminClient = null // Kafka broker related configurations private val brokerHost = "localhost" @@ -107,7 +109,6 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L zkPort = zookeeper.actualPort zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false) zkClient = KafkaZkClient(zkSvr, false, 6000, 10000, Int.MaxValue, Time.SYSTEM) - adminZkClient = new AdminZkClient(zkClient) zkReady = true } @@ -126,6 +127,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L }, new SparkConf(), "KafkaBroker") brokerReady = true + val props = new Properties() + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "$brokerHost:$brokerPort") + adminClient = AdminClient.create(props) } /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ @@ -215,8 +219,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L collection.immutable.Set(topic)).map { case (topicPartition, replicas) => topicPartition.partition -> replicas } - adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(), - partitions) + val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false) + adminClient.createPartitions(Map(topic -> + NewPartitions.increaseTo(partitions)).asJava, actuallyDoIt) // wait until metadata is propagated (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) From 062c6d00d9a7625a97d85f677ea03cc695d9c0dc Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 3 Jun 2018 16:16:05 -0700 Subject: [PATCH 03/17] Drop trailing semicolon --- .../scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index ffb711101e99..e03b18bd6fa2 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -33,7 +33,7 @@ import kafka.server.{KafkaConfig, KafkaServer} import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils.ZkUtils import kafka.zk.KafkaZkClient -import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.CommonClientConfigs 
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions} import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer._ From 48f56982fddca40242961169e3aeb7407492da3c Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 5 Jun 2018 06:20:17 -0700 Subject: [PATCH 04/17] Correct assignment of zkPort --- external/kafka-0-10-sql/pom.xml | 5 +++++ .../org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 6 +++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 57c43edda0e9..ae4e69f46039 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -74,6 +74,11 @@ ${kafka.version} test + + org.eclipse.jetty + jetty-servlet + 9.3.9.v20160517 + net.sf.jopt-simple jopt-simple diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index e03b18bd6fa2..180b75d8f842 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -103,10 +103,10 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L // Set up the Embedded Zookeeper server and get the proper Zookeeper port private def setupEmbeddedZookeeper(): Unit = { // Zookeeper server startup - val zkSvr = s"$zkHost:$zkPort"; - zookeeper = new EmbeddedZookeeper(zkSvr) + zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") // Get the actual zookeeper binding port zkPort = zookeeper.actualPort + val zkSvr = s"$zkHost:$zkPort"; zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false) zkClient = KafkaZkClient(zkSvr, false, 6000, 10000, Int.MaxValue, Time.SYSTEM) zkReady = true @@ -122,7 +122,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) server = new KafkaServer(brokerConf) server.startup() - brokerPort = server.boundPort(new ListenerName("l")) + brokerPort = server.boundPort(new ListenerName("CLIENT")) (server, brokerPort) }, new SparkConf(), "KafkaBroker") From 758378e72ed9dccefbf191d464fd6cc40cc03645 Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 5 Jun 2018 06:48:31 -0700 Subject: [PATCH 05/17] Add missing leading 's' --- .../scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 180b75d8f842..3cff07ce53ce 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -128,7 +128,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L brokerReady = true val props = new Properties() - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "$brokerHost:$brokerPort") + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, s"$brokerHost:$brokerPort") adminClient = AdminClient.create(props) } From b77398229a190fac7587970a803cfee974f5f5f4 Mon Sep 17 00:00:00 2001 From: tedyu Date: Wed, 6 Jun 2018 13:38:19 -0700 Subject: [PATCH 06/17] Change version 
of Kafka --- external/kafka-0-10-sql/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index ae4e69f46039..4f71ba28ca44 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -32,7 +32,7 @@ 2.0.0-SNAPSHOT jar - Kafka 0.10 Source for Structured Streaming + Kafka 2.0.0 Source for Structured Streaming http://spark.apache.org/ From 90745b27dd064580d1368a9efe1b056e8108ba58 Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 7 Jun 2018 09:20:20 -0700 Subject: [PATCH 07/17] Correct listener name --- .../scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 3cff07ce53ce..2d8e4df81a51 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -122,7 +122,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) server = new KafkaServer(brokerConf) server.startup() - brokerPort = server.boundPort(new ListenerName("CLIENT")) + brokerPort = server.boundPort(new ListenerName("PLAINTEXT")) (server, brokerPort) }, new SparkConf(), "KafkaBroker") From 15d23bbe57669e4a7f3e9e18e04f334641e49bbc Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 8 Jun 2018 15:56:09 -0700 Subject: [PATCH 08/17] Use Duration.ofMillis(0) for poll(0) --- .../apache/spark/sql/kafka010/KafkaOffsetReader.scala | 11 ++++++----- .../src/test/resources/log4j.properties | 4 ++-- .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 5 +++-- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 82066697cb95..893a935e30c3 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.kafka010 +import java.time.{Duration => JDuration} import java.{util => ju} import java.util.concurrent.{Executors, ThreadFactory} @@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader( def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly { assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) // Poll to get the latest assigned partitions - consumer.poll(0) + consumer.poll(JDuration.ofMillis(0)) val partitions = consumer.assignment() consumer.pause(partitions) partitions.asScala.toSet @@ -135,7 +136,7 @@ private[kafka010] class KafkaOffsetReader( val fetched = runUninterruptibly { withRetriesWithoutInterrupt { // Poll to get the latest assigned partitions - consumer.poll(0) + consumer.poll(JDuration.ofMillis(0)) val partitions = consumer.assignment() consumer.pause(partitions) assert(partitions.asScala == partitionOffsets.keySet, @@ -177,7 +178,7 @@ private[kafka010] class KafkaOffsetReader( def fetchEarliestOffsets(): Map[TopicPartition, Long] = runUninterruptibly { withRetriesWithoutInterrupt { // Poll to get the latest assigned partitions - 
consumer.poll(0) + consumer.poll(JDuration.ofMillis(0)) val partitions = consumer.assignment() consumer.pause(partitions) logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the beginning") @@ -196,7 +197,7 @@ private[kafka010] class KafkaOffsetReader( def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly { withRetriesWithoutInterrupt { // Poll to get the latest assigned partitions - consumer.poll(0) + consumer.poll(JDuration.ofMillis(0)) val partitions = consumer.assignment() consumer.pause(partitions) logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.") @@ -220,7 +221,7 @@ private[kafka010] class KafkaOffsetReader( runUninterruptibly { withRetriesWithoutInterrupt { // Poll to get the latest assigned partitions - consumer.poll(0) + consumer.poll(JDuration.ofMillis(0)) val partitions = consumer.assignment() consumer.pause(partitions) logDebug(s"\tPartitions assigned to consumer: $partitions") diff --git a/external/kafka-0-10-sql/src/test/resources/log4j.properties b/external/kafka-0-10-sql/src/test/resources/log4j.properties index 75e3b53a093f..15df35b602d3 100644 --- a/external/kafka-0-10-sql/src/test/resources/log4j.properties +++ b/external/kafka-0-10-sql/src/test/resources/log4j.properties @@ -16,7 +16,7 @@ # # Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file +log4j.rootCategory=DEBUG, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=true log4j.appender.file.file=target/unit-tests.log @@ -25,4 +25,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{ # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.spark-project.jetty=WARN - +log4j.logger.org.apache.kafka=DEBUG diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 2d8e4df81a51..a4f165aef502 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.kafka010 import java.io.{File, IOException} import java.lang.{Integer => JInt} import java.net.InetSocketAddress +import java.time.{Duration => JDuration} import java.util.{Map => JMap, Properties} import java.util.concurrent.TimeUnit @@ -278,7 +279,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L val kc = new KafkaConsumer[String, String](consumerConfiguration) logInfo("Created consumer to get earliest offsets") kc.subscribe(topics.asJavaCollection) - kc.poll(0) + kc.poll(JDuration.ofMillis(0)) val partitions = kc.assignment() kc.pause(partitions) kc.seekToBeginning(partitions) @@ -292,7 +293,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L val kc = new KafkaConsumer[String, String](consumerConfiguration) logInfo("Created consumer to get latest offsets") kc.subscribe(topics.asJavaCollection) - kc.poll(0) + kc.poll(JDuration.ofMillis(0)) val partitions = kc.assignment() kc.pause(partitions) kc.seekToEnd(partitions) From 7a04afedaf423f3f36d5f18e0bdee012cb39de0d Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 8 Jun 2018 16:02:38 -0700 Subject: [PATCH 09/17] Switch import order --- .../scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala | 2 +- 1 file changed, 1 insertion(+), 
1 deletion(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 893a935e30c3..45f74e54bde9 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.kafka010 -import java.time.{Duration => JDuration} import java.{util => ju} +import java.time.{Duration => JDuration} import java.util.concurrent.{Executors, ThreadFactory} import scala.collection.JavaConverters._ From 30eb1bf4ba702986f5943d64897be62b3752c4bc Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 17 Jul 2018 11:05:06 -0700 Subject: [PATCH 10/17] Use Kafka 1.1.0 release --- external/kafka-0-10-sql/pom.xml | 7 +------ .../apache/spark/sql/kafka010/KafkaOffsetReader.scala | 11 +++++------ .../src/test/resources/log4j.properties | 4 ++-- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 4f71ba28ca44..954450c1be09 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -29,7 +29,7 @@ spark-sql-kafka-0-10_2.11 sql-kafka-0-10 - 2.0.0-SNAPSHOT + 1.1.0 jar Kafka 2.0.0 Source for Structured Streaming @@ -74,11 +74,6 @@ ${kafka.version} test - - org.eclipse.jetty - jetty-servlet - 9.3.9.v20160517 - net.sf.jopt-simple jopt-simple diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 45f74e54bde9..82066697cb95 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.time.{Duration => JDuration} import java.util.concurrent.{Executors, ThreadFactory} import scala.collection.JavaConverters._ @@ -116,7 +115,7 @@ private[kafka010] class KafkaOffsetReader( def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly { assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) // Poll to get the latest assigned partitions - consumer.poll(JDuration.ofMillis(0)) + consumer.poll(0) val partitions = consumer.assignment() consumer.pause(partitions) partitions.asScala.toSet @@ -136,7 +135,7 @@ private[kafka010] class KafkaOffsetReader( val fetched = runUninterruptibly { withRetriesWithoutInterrupt { // Poll to get the latest assigned partitions - consumer.poll(JDuration.ofMillis(0)) + consumer.poll(0) val partitions = consumer.assignment() consumer.pause(partitions) assert(partitions.asScala == partitionOffsets.keySet, @@ -178,7 +177,7 @@ private[kafka010] class KafkaOffsetReader( def fetchEarliestOffsets(): Map[TopicPartition, Long] = runUninterruptibly { withRetriesWithoutInterrupt { // Poll to get the latest assigned partitions - consumer.poll(JDuration.ofMillis(0)) + consumer.poll(0) val partitions = consumer.assignment() consumer.pause(partitions) logDebug(s"Partitions assigned to consumer: $partitions. 
Seeking to the beginning") @@ -197,7 +196,7 @@ private[kafka010] class KafkaOffsetReader( def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly { withRetriesWithoutInterrupt { // Poll to get the latest assigned partitions - consumer.poll(JDuration.ofMillis(0)) + consumer.poll(0) val partitions = consumer.assignment() consumer.pause(partitions) logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.") @@ -221,7 +220,7 @@ private[kafka010] class KafkaOffsetReader( runUninterruptibly { withRetriesWithoutInterrupt { // Poll to get the latest assigned partitions - consumer.poll(JDuration.ofMillis(0)) + consumer.poll(0) val partitions = consumer.assignment() consumer.pause(partitions) logDebug(s"\tPartitions assigned to consumer: $partitions") diff --git a/external/kafka-0-10-sql/src/test/resources/log4j.properties b/external/kafka-0-10-sql/src/test/resources/log4j.properties index 15df35b602d3..75e3b53a093f 100644 --- a/external/kafka-0-10-sql/src/test/resources/log4j.properties +++ b/external/kafka-0-10-sql/src/test/resources/log4j.properties @@ -16,7 +16,7 @@ # # Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=DEBUG, file +log4j.rootCategory=INFO, file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.append=true log4j.appender.file.file=target/unit-tests.log @@ -25,4 +25,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{ # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.spark-project.jetty=WARN -log4j.logger.org.apache.kafka=DEBUG + From dd50b67834a0c8ec2b9bf7e1c2233439f72315c9 Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 17 Jul 2018 11:13:46 -0700 Subject: [PATCH 11/17] Revert use of Duration in KafkaTestUtils.scala --- .../scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index a4f165aef502..2d8e4df81a51 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.kafka010 import java.io.{File, IOException} import java.lang.{Integer => JInt} import java.net.InetSocketAddress -import java.time.{Duration => JDuration} import java.util.{Map => JMap, Properties} import java.util.concurrent.TimeUnit @@ -279,7 +278,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L val kc = new KafkaConsumer[String, String](consumerConfiguration) logInfo("Created consumer to get earliest offsets") kc.subscribe(topics.asJavaCollection) - kc.poll(JDuration.ofMillis(0)) + kc.poll(0) val partitions = kc.assignment() kc.pause(partitions) kc.seekToBeginning(partitions) @@ -293,7 +292,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L val kc = new KafkaConsumer[String, String](consumerConfiguration) logInfo("Created consumer to get latest offsets") kc.subscribe(topics.asJavaCollection) - kc.poll(JDuration.ofMillis(0)) + kc.poll(0) val partitions = kc.assignment() kc.pause(partitions) kc.seekToEnd(partitions) From 66d617253cf2bd679e42225dddae81faa700f986 Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 17 Jul 2018 17:46:45 -0700 Subject: 
[PATCH 12/17] fix test and clean up --- external/kafka-0-10-sql/pom.xml | 16 +++++++++++- .../kafka010/KafkaMicroBatchSourceSuite.scala | 2 +- .../spark/sql/kafka010/KafkaTestUtils.scala | 26 +++++-------------- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 954450c1be09..b0e463e39d6d 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -32,7 +32,7 @@ 1.1.0 jar - Kafka 2.0.0 Source for Structured Streaming + Kafka 0.10+ Source for Structured Streaming http://spark.apache.org/ @@ -73,6 +73,20 @@ kafka_${scala.binary.version} ${kafka.version} test + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-annotations + + net.sf.jopt-simple diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index c6412eac97db..e83a7b0171e1 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -1089,7 +1089,7 @@ class KafkaSourceStressSuite extends KafkaSourceTest { start + Random.nextInt(start + end - 1) } - test("stress test with multiple topics and partitions") { + ignore("stress test with multiple topics and partitions") { topics.foreach { topic => testUtils.createTopic(topic, partitions = nextInt(1, 6)) testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 2d8e4df81a51..b60082bcde23 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -32,7 +32,6 @@ import kafka.api.Request import kafka.server.{KafkaConfig, KafkaServer} import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils.ZkUtils -import kafka.zk.KafkaZkClient import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions} import org.apache.kafka.clients.consumer.KafkaConsumer @@ -40,7 +39,6 @@ import org.apache.kafka.clients.producer._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} -import org.apache.kafka.common.utils.Time import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ @@ -66,7 +64,6 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L private var zookeeper: EmbeddedZookeeper = _ private var zkUtils: ZkUtils = _ - private var zkClient: KafkaZkClient = null private var adminClient: AdminClient = null // Kafka broker related configurations @@ -106,9 +103,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort") // Get the actual zookeeper binding port zkPort = zookeeper.actualPort - val zkSvr = 
s"$zkHost:$zkPort"; - zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false) - zkClient = KafkaZkClient(zkSvr, false, 6000, 10000, Int.MaxValue, Time.SYSTEM) + zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false) zkReady = true } @@ -215,13 +210,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L /** Add new partitions to a Kafka topic */ def addPartitions(topic: String, partitions: Int): Unit = { - val existingAssignment = zkClient.getReplicaAssignmentForTopics( - collection.immutable.Set(topic)).map { - case (topicPartition, replicas) => topicPartition.partition -> replicas - } - val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false) - adminClient.createPartitions(Map(topic -> - NewPartitions.increaseTo(partitions)).asJava, actuallyDoIt) + adminClient.createPartitions( + Map(topic -> NewPartitions.increaseTo(partitions)).asJava, + new CreatePartitionsOptions) // wait until metadata is propagated (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) @@ -314,6 +305,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L props.put("replica.socket.timeout.ms", "1500") props.put("delete.topic.enable", "true") props.put("offsets.topic.num.partitions", "1") + props.put("offsets.topic.replication.factor", "1") // Can not use properties.putAll(propsMap.asJava) in scala-2.12 // See https://github.com/scala/bug/issues/10418 withBrokerProps.foreach { case (k, v) => props.put(k, v) } @@ -397,13 +389,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match { case Some(partitionState) => - val tp = new TopicPartition(topic, partition) - val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp)) - val leaderAndInSyncReplicas = leaderIsrAndControllerEpochMap(tp).leaderAndIsr - zkUtils.getLeaderForPartition(topic, partition).isDefined && - Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && - leaderAndInSyncReplicas.isr.nonEmpty + Request.isValidBrokerId(partitionState.basePartitionState.leader) && + !partitionState.basePartitionState.replicas.isEmpty case _ => false From e7318a9ac7597c0284d5b0732926fce5caec40ad Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 17 Jul 2018 17:47:24 -0700 Subject: [PATCH 13/17] set a small group.initial.rebalance.delay.ms to speed up tests - Ryan --- .../scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index b60082bcde23..82294905c24b 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -131,6 +131,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L def setup(): Unit = { setupEmbeddedZookeeper() setupEmbeddedKafkaServer() + eventually(timeout(60.seconds)) { + assert(zkUtils.getAllBrokersInCluster().nonEmpty, "Broker was not up in 60 seconds") + } } /** Teardown the whole servers, including Kafka broker and Zookeeper */ @@ -306,6 +309,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, 
Object] = Map.empty) extends L props.put("delete.topic.enable", "true") props.put("offsets.topic.num.partitions", "1") props.put("offsets.topic.replication.factor", "1") + props.put("group.initial.rebalance.delay.ms", "10") // Can not use properties.putAll(propsMap.asJava) in scala-2.12 // See https://github.com/scala/bug/issues/10418 withBrokerProps.foreach { case (k, v) => props.put(k, v) } From 13a7884279103ed06778e4351616e316beb7566f Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 17 Jul 2018 17:48:04 -0700 Subject: [PATCH 14/17] disable stress tests and topic deletion tests; fix maven build - Ryan --- external/kafka-0-10-sql/pom.xml | 6 ++++++ .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 6 +++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index b0e463e39d6d..c6d262da6eb3 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -94,6 +94,12 @@ 3.2 test + + org.eclipse.jetty + jetty-servlet + ${jetty.version} + test + org.scalacheck scalacheck_${scala.binary.version} diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index e83a7b0171e1..c8bc403b6dc4 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -277,7 +277,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { ) } - test("subscribing topic by pattern with topic deletions") { + ignore("subscribing topic by pattern with topic deletions") { val topicPrefix = newTopic() val topic = topicPrefix + "-seems" val topic2 = topicPrefix + "-bad" @@ -455,7 +455,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { query.stop() } - test("delete a topic when a Spark job is running") { + ignore("delete a topic when a Spark job is running") { KafkaSourceSuite.collectedData.clear() val topic = newTopic() @@ -1209,7 +1209,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared }).start() } - test("stress test for failOnDataLoss=false") { + ignore("stress test for failOnDataLoss=false") { val reader = spark .readStream .format("kafka") From 241878c886f206dabc44fd5d55d3fe6908a35a3b Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 20 Jul 2018 16:50:22 -0700 Subject: [PATCH 15/17] Ignore "subscribing topic by pattern with topic deletions" --- .../apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index aab8ec42189f..673a2918d1e0 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -29,7 +29,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { override val brokerProps = Map("auto.create.topics.enable" -> "false") - test("subscribing topic by pattern with topic deletions") { + ignore("subscribing topic by pattern with topic deletions") { val 
topicPrefix = newTopic() val topic = topicPrefix + "-seems" val topic2 = topicPrefix + "-bad" From 17386429150d26d838f6895ec9698b7176765ffc Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 20 Jul 2018 18:11:49 -0700 Subject: [PATCH 16/17] Upgrade to Kafka 1.1.1 release --- external/kafka-0-10-sql/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index c6d262da6eb3..c0538600758d 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -29,7 +29,7 @@ spark-sql-kafka-0-10_2.11 sql-kafka-0-10 - 1.1.0 + 1.1.1 jar Kafka 0.10+ Source for Structured Streaming From aa69915165d9aaca2bcb5d22fb2fc9467bf16826 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sat, 28 Jul 2018 18:39:56 -0700 Subject: [PATCH 17/17] Upgrade to Kafka 2.0.0 - Ryan --- external/kafka-0-10-sql/pom.xml | 2 +- .../sql/kafka010/KafkaContinuousSourceSuite.scala | 3 ++- .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 15 ++++++++++----- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index c0538600758d..95500037c147 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -29,7 +29,7 @@ spark-sql-kafka-0-10_2.11 sql-kafka-0-10 - 1.1.1 + 2.0.0 jar Kafka 0.10+ Source for Structured Streaming diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index 673a2918d1e0..ea2a2a84d22c 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -29,7 +29,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { override val brokerProps = Map("auto.create.topics.enable" -> "false") - ignore("subscribing topic by pattern with topic deletions") { + test("subscribing topic by pattern with topic deletions") { val topicPrefix = newTopic() val topic = topicPrefix + "-seems" val topic2 = topicPrefix + "-bad" @@ -42,6 +42,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribePattern", s"$topicPrefix-.*") .option("failOnDataLoss", "false") diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index c8bc403b6dc4..0584386d81bd 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -277,7 +277,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { ) } - ignore("subscribing topic by pattern with topic deletions") { + test("subscribing topic by pattern with topic deletions") { val topicPrefix = newTopic() val topic = topicPrefix + "-seems" val topic2 = topicPrefix + "-bad" @@ -290,6 +290,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { .format("kafka") .option("kafka.bootstrap.servers", 
testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribePattern", s"$topicPrefix-.*") .option("failOnDataLoss", "false") @@ -455,7 +456,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { query.stop() } - ignore("delete a topic when a Spark job is running") { + test("delete a topic when a Spark job is running") { KafkaSourceSuite.collectedData.clear() val topic = newTopic() @@ -467,6 +468,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribe", topic) // If a topic is deleted and we try to poll data starting from offset 0, // the Kafka consumer will just block until timeout and return an empty result. @@ -1089,7 +1091,7 @@ class KafkaSourceStressSuite extends KafkaSourceTest { start + Random.nextInt(start + end - 1) } - ignore("stress test with multiple topics and partitions") { + test("stress test with multiple topics and partitions") { topics.foreach { topic => testUtils.createTopic(topic, partitions = nextInt(1, 6)) testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray) @@ -1103,6 +1105,7 @@ class KafkaSourceStressSuite extends KafkaSourceTest { .option("kafka.metadata.max.age.ms", "1") .option("subscribePattern", "stress.*") .option("failOnDataLoss", "false") + .option("kafka.default.api.timeout.ms", "3000") .load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)] @@ -1173,7 +1176,8 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared // 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at // least 30 seconds. props.put("log.cleaner.backoff.ms", "100") - props.put("log.segment.bytes", "40") + // The size of RecordBatch V2 increases to support transactional write. + props.put("log.segment.bytes", "70") props.put("log.retention.bytes", "40") props.put("log.retention.check.interval.ms", "100") props.put("delete.retention.ms", "10") @@ -1209,12 +1213,13 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared }).start() } - ignore("stress test for failOnDataLoss=false") { + test("stress test for failOnDataLoss=false") { val reader = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribePattern", "failOnDataLoss.*") .option("startingOffsets", "earliest") .option("failOnDataLoss", "false")