
Commit e82784d

tedyu authored and zsxwing committed
[SPARK-18057][SS] Update Kafka client version from 0.10.0.1 to 2.0.0
## What changes were proposed in this pull request?

This PR upgrades the Kafka client to the 2.0.0 release, in which KIP-266 (bounded timeouts for blocking consumer calls via default.api.timeout.ms) is integrated.

## How was this patch tested?

Existing Kafka-related unit tests.

Author: tedyu <[email protected]>

Closes #21488 from tedyu/master.
1 parent 1223a20 commit e82784d
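For context on how the new KIP-266 timeout surfaces in the tests below: Spark's Kafka source strips the "kafka." prefix from reader options and passes the remainder straight to the underlying consumer, so the timeout can be set per query. A minimal sketch, not part of this commit (the broker address and topic name are placeholders):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("kafka-default-api-timeout-demo")
      .master("local[*]")
      .getOrCreate()

    // "kafka."-prefixed options are forwarded to the Kafka consumer, so this
    // sets the consumer's default.api.timeout.ms (KIP-266), bounding calls
    // that previously could block indefinitely.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "demo-topic")                   // placeholder topic
      .option("kafka.default.api.timeout.ms", "3000")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")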

4 files changed (+53, -15 lines)

external/kafka-0-10-sql/pom.xml

Lines changed: 22 additions & 2 deletions
@@ -29,10 +29,10 @@
   <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
   <properties>
     <sbt.project.name>sql-kafka-0-10</sbt.project.name>
-    <kafka.version>0.10.0.1</kafka.version>
+    <kafka.version>2.0.0</kafka.version>
   </properties>
   <packaging>jar</packaging>
-  <name>Kafka 0.10 Source for Structured Streaming</name>
+  <name>Kafka 0.10+ Source for Structured Streaming</name>
   <url>http://spark.apache.org/</url>

   <dependencies>
@@ -73,13 +73,33 @@
       <artifactId>kafka_${scala.binary.version}</artifactId>
       <version>${kafka.version}</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>net.sf.jopt-simple</groupId>
       <artifactId>jopt-simple</artifactId>
       <version>3.2</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+      <version>${jetty.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
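A likely reading of these dependency changes, not stated in the commit message: Kafka 2.0.0 pulls in its own Jackson 2.9.x artifacts, so excluding jackson-core, jackson-databind, and jackson-annotations keeps the test classpath on Spark's managed Jackson version, while jetty-servlet is added explicitly because the upgraded Kafka test artifacts presumably no longer provide it transitively.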

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -42,6 +42,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
       .format("kafka")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
       .option("subscribePattern", s"$topicPrefix-.*")
       .option("failOnDataLoss", "false")

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala

Lines changed: 6 additions & 1 deletion
@@ -290,6 +290,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
       .option("subscribePattern", s"$topicPrefix-.*")
       .option("failOnDataLoss", "false")

@@ -467,6 +468,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       .format("kafka")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
       .option("subscribe", topic)
       // If a topic is deleted and we try to poll data starting from offset 0,
       // the Kafka consumer will just block until timeout and return an empty result.
@@ -1103,6 +1105,7 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
       .option("kafka.metadata.max.age.ms", "1")
       .option("subscribePattern", "stress.*")
       .option("failOnDataLoss", "false")
+      .option("kafka.default.api.timeout.ms", "3000")
       .load()
       .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .as[(String, String)]
@@ -1173,7 +1176,8 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
     // 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at
     // least 30 seconds.
     props.put("log.cleaner.backoff.ms", "100")
-    props.put("log.segment.bytes", "40")
+    // The size of RecordBatch V2 increases to support transactional write.
+    props.put("log.segment.bytes", "70")
     props.put("log.retention.bytes", "40")
     props.put("log.retention.check.interval.ms", "100")
     props.put("delete.retention.ms", "10")
@@ -1215,6 +1219,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
       .format("kafka")
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
       .option("subscribePattern", "failOnDataLoss.*")
       .option("startingOffsets", "earliest")
       .option("failOnDataLoss", "false")

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala

Lines changed: 24 additions & 12 deletions
@@ -29,12 +29,15 @@ import scala.util.Random

 import kafka.admin.AdminUtils
 import kafka.api.Request
-import kafka.common.TopicAndPartition
-import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 import org.scalatest.concurrent.Eventually._
@@ -61,6 +64,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   private var zookeeper: EmbeddedZookeeper = _

   private var zkUtils: ZkUtils = _
+  private var adminClient: AdminClient = null

   // Kafka broker related configurations
   private val brokerHost = "localhost"
@@ -113,17 +117,23 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
       brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
       server = new KafkaServer(brokerConf)
       server.startup()
-      brokerPort = server.boundPort()
+      brokerPort = server.boundPort(new ListenerName("PLAINTEXT"))
       (server, brokerPort)
     }, new SparkConf(), "KafkaBroker")

     brokerReady = true
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, s"$brokerHost:$brokerPort")
+    adminClient = AdminClient.create(props)
   }

   /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
   def setup(): Unit = {
     setupEmbeddedZookeeper()
     setupEmbeddedKafkaServer()
+    eventually(timeout(60.seconds)) {
+      assert(zkUtils.getAllBrokersInCluster().nonEmpty, "Broker was not up in 60 seconds")
+    }
   }

   /** Teardown the whole servers, including Kafka broker and Zookeeper */
@@ -203,7 +213,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L

   /** Add new partitions to a Kafka topic */
   def addPartitions(topic: String, partitions: Int): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic, partitions)
+    adminClient.createPartitions(
+      Map(topic -> NewPartitions.increaseTo(partitions)).asJava,
+      new CreatePartitionsOptions)
     // wait until metadata is propagated
     (0 until partitions).foreach { p =>
       waitUntilMetadataIsPropagated(topic, p)
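This hunk replaces the removed ZooKeeper-based AdminUtils call with the broker-side AdminClient API. For readers unfamiliar with that API, here is a minimal standalone sketch of the same operation (the method name and bootstrap address are hypothetical; unlike the test helper above, it blocks on the result future to surface broker-side errors):

    import java.util.Properties
    import scala.collection.JavaConverters._

    import org.apache.kafka.clients.CommonClientConfigs
    import org.apache.kafka.clients.admin.{AdminClient, NewPartitions}

    // Grow `topic` to `target` partitions without talking to ZooKeeper.
    def growTopic(bootstrapServers: String, topic: String, target: Int): Unit = {
      val props = new Properties()
      props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
      val admin = AdminClient.create(props)
      try {
        admin.createPartitions(Map(topic -> NewPartitions.increaseTo(target)).asJava)
          .all() // a KafkaFuture covering every topic in the request
          .get() // throws if the broker rejects the change (e.g. shrinking a topic)
      } finally {
        admin.close()
      }
    }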
@@ -296,6 +308,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
     props.put("replica.socket.timeout.ms", "1500")
     props.put("delete.topic.enable", "true")
     props.put("offsets.topic.num.partitions", "1")
+    props.put("offsets.topic.replication.factor", "1")
+    props.put("group.initial.rebalance.delay.ms", "10")
     // Can not use properties.putAll(propsMap.asJava) in scala-2.12
     // See https://github.com/scala/bug/issues/10418
     withBrokerProps.foreach { case (k, v) => props.put(k, v) }
@@ -327,7 +341,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
       topic: String,
       numPartitions: Int,
       servers: Seq[KafkaServer]): Unit = {
-    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+    val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))

     import ZkUtils._
     // wait until admin path for delete topic is deleted, signaling completion of topic deletion
@@ -337,16 +351,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
     assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
     // ensure that the topic-partition has been deleted from all brokers' replica managers
     assert(servers.forall(server => topicAndPartitions.forall(tp =>
-      server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+      server.replicaManager.getPartition(tp) == None)),
       s"topic $topic still exists in the replica manager")
     // ensure that logs from all replicas are deleted if delete topic is marked successful
     assert(servers.forall(server => topicAndPartitions.forall(tp =>
       server.getLogManager().getLog(tp).isEmpty)),
       s"topic $topic still exists in log mananger")
     // ensure that topic is removed from all cleaner offsets
     assert(servers.forall(server => topicAndPartitions.forall { tp =>
-      val checkpoints = server.getLogManager().logDirs.map { logDir =>
-        new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+      val checkpoints = server.getLogManager().liveLogDirs.map { logDir =>
+        new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
       }
       checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
     }), s"checkpoint for topic $topic still exists")
@@ -379,11 +393,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
     def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
-        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-
         zkUtils.getLeaderForPartition(topic, partition).isDefined &&
-          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
-          leaderAndInSyncReplicas.isr.nonEmpty
+          Request.isValidBrokerId(partitionState.basePartitionState.leader) &&
+          !partitionState.basePartitionState.replicas.isEmpty

       case _ =>
         false
