Changes from 1 commit
external/kafka-0-10-sql/pom.xml (2 changes: 1 addition & 1 deletion)

@@ -29,7 +29,7 @@
   <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
   <properties>
     <sbt.project.name>sql-kafka-0-10</sbt.project.name>
-    <kafka.version>0.10.0.1</kafka.version>
+    <kafka.version>2.0.0-SNAPSHOT</kafka.version>
   </properties>
   <packaging>jar</packaging>
   <name>Kafka 0.10 Source for Structured Streaming</name>

Review comment (on the module <name> line): We should change this line to reflect the change too.

KafkaTestUtils.scala

@@ -29,13 +29,16 @@ import scala.util.Random

 import kafka.admin.AdminUtils
 import kafka.api.Request
-import kafka.common.TopicAndPartition
-import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils
+import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
@@ -61,6 +64,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging
   private var zookeeper: EmbeddedZookeeper = _

   private var zkUtils: ZkUtils = _
+  private var zkClient: KafkaZkClient = null
+  private var adminZkClient: AdminZkClient = null

   // Kafka broker related configurations
   private val brokerHost = "localhost"
@@ -96,10 +101,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging
   // Set up the Embedded Zookeeper server and get the proper Zookeeper port
   private def setupEmbeddedZookeeper(): Unit = {
     // Zookeeper server startup
-    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+    val zkSvr = s"$zkHost:$zkPort"
+    zookeeper = new EmbeddedZookeeper(zkSvr)
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
-    zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+    zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
+    zkClient = KafkaZkClient(zkSvr, false, 6000, 10000, Int.MaxValue, Time.SYSTEM)
+    adminZkClient = new AdminZkClient(zkClient)
     zkReady = true
   }

Member: Can we use the Java AdminClient instead of these internal classes?

Contributor (author): AdminClient is abstract, and KafkaAdminClient doesn't provide addPartitions. Mind giving some pointers?

Member: AdminClient.create gives you a concrete instance; createPartitions is the method you're looking for.
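For context, a minimal sketch of what the reviewer is suggesting, using only the public admin API. The helper name, the `brokerAddress` parameter, and the blocking style are illustrative assumptions, not part of this PR:

```scala
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}

// Hypothetical helper: grow `topic` to `totalPartitions` partitions with the
// public Java AdminClient instead of the internal AdminZkClient.
def addPartitionsViaAdminClient(
    brokerAddress: String, topic: String, totalPartitions: Int): Unit = {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress)
  val admin = AdminClient.create(props) // create() returns a concrete client
  try {
    // NewPartitions.increaseTo takes the desired *total* count, not a delta.
    admin.createPartitions(Map(topic -> NewPartitions.increaseTo(totalPartitions)).asJava)
      .all().get() // block until the broker applies the change
  } finally {
    admin.close()
  }
}
```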

@@ -113,7 +121,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging
       brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
       server = new KafkaServer(brokerConf)
       server.startup()
-      brokerPort = server.boundPort()
+      brokerPort = server.boundPort(new ListenerName("l"))
       (server, brokerPort)
     }, new SparkConf(), "KafkaBroker")
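Since boundPort is now keyed by ListenerName, the broker configuration presumably defines a listener literally named "l". A sketch of the properties that would imply (an assumption; the listener setup is not visible in this hunk):

```scala
// Assumed broker properties for a listener named "l" (not shown in this diff):
val brokerProps = new java.util.Properties()
brokerProps.put("listeners", "l://127.0.0.1:0")                  // bind "l" to an ephemeral port
brokerProps.put("listener.security.protocol.map", "l:PLAINTEXT") // listener "l" speaks PLAINTEXT
brokerProps.put("inter.broker.listener.name", "l")               // brokers also talk over "l"
```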

@@ -203,7 +211,12 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging

   /** Add new partitions to a Kafka topic */
   def addPartitions(topic: String, partitions: Int): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic, partitions)
+    val existingAssignment = zkClient.getReplicaAssignmentForTopics(
+      collection.immutable.Set(topic)).map {
+      case (topicPartition, replicas) => topicPartition.partition -> replicas
+    }
+    adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(),
+      partitions)
     // wait until metadata is propagated
     (0 until partitions).foreach { p =>
       waitUntilMetadataIsPropagated(topic, p)

Member: We can get replica assignment information via AdminClient too. I think we should try to avoid the internal ZkUtils and KafkaZkClient as much as we can.

Another reviewer: +1
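A hedged sketch of the reviewers' suggestion for this hunk: reading the existing replica assignment through the public AdminClient instead of KafkaZkClient. `admin` is assumed to be an AdminClient created as in the earlier sketch:

```scala
import scala.collection.JavaConverters._

// Hypothetical replacement for zkClient.getReplicaAssignmentForTopics:
// derive partition -> replica-ids from a describeTopics call.
val description = admin.describeTopics(java.util.Collections.singleton(topic))
  .all().get()   // java.util.Map[String, TopicDescription]
  .get(topic)
val existingAssignment = description.partitions().asScala.map { info =>
  info.partition() -> info.replicas().asScala.map(_.id())
}.toMap
```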
@@ -327,7 +340,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging
       topic: String,
       numPartitions: Int,
       servers: Seq[KafkaServer]): Unit = {
-    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+    val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))

     import ZkUtils._
     // wait until admin path for delete topic is deleted, signaling completion of topic deletion
@@ -337,16 +350,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging
     assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
     // ensure that the topic-partition has been deleted from all brokers' replica managers
     assert(servers.forall(server => topicAndPartitions.forall(tp =>
-      server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
+      server.replicaManager.getPartition(tp) == None)),
       s"topic $topic still exists in the replica manager")
     // ensure that logs from all replicas are deleted if delete topic is marked successful
     assert(servers.forall(server => topicAndPartitions.forall(tp =>
       server.getLogManager().getLog(tp).isEmpty)),
       s"topic $topic still exists in log manager")
     // ensure that topic is removed from all cleaner offsets
     assert(servers.forall(server => topicAndPartitions.forall { tp =>
-      val checkpoints = server.getLogManager().logDirs.map { logDir =>
-        new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+      val checkpoints = server.getLogManager().liveLogDirs.map { logDir =>
+        new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
       }
       checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
     }), s"checkpoint for topic $topic still exists")
@@ -379,7 +392,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
     def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
-        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+        val tp = new TopicPartition(topic, partition)
+        val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
+        val leaderAndInSyncReplicas = leaderIsrAndControllerEpochMap(tp).leaderAndIsr

         zkUtils.getLeaderForPartition(topic, partition).isDefined &&
           Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
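In the same spirit as the reviewers' earlier comments, the propagation check itself could plausibly drop ZooKeeper. A sketch under the same `admin` assumption, treating a partition as propagated once it has a live leader and a non-empty ISR:

```scala
import scala.collection.JavaConverters._

// Hypothetical AdminClient-based variant of isPropagated.
def isPropagatedViaAdminClient(
    admin: org.apache.kafka.clients.admin.AdminClient,
    topic: String, partition: Int): Boolean = {
  admin.describeTopics(java.util.Collections.singleton(topic))
    .all().get().get(topic)           // TopicDescription for `topic`
    .partitions().asScala
    .find(_.partition() == partition) // the TopicPartitionInfo we care about
    .exists(p => p.leader() != null && !p.isr().isEmpty)
}
```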