Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion external/kafka-0-10-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<properties>
<sbt.project.name>sql-kafka-0-10</sbt.project.name>
<kafka.version>0.10.0.1</kafka.version>
<kafka.version>2.0.0-SNAPSHOT</kafka.version>
</properties>
<packaging>jar</packaging>
<name>Kafka 0.10 Source for Structured Streaming</name>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should change this line to reflect the change too

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ import scala.util.Random

import kafka.admin.AdminUtils
import kafka.api.Request
import kafka.common.TopicAndPartition
import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
Expand All @@ -61,6 +66,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
private var zookeeper: EmbeddedZookeeper = _

private var zkUtils: ZkUtils = _
private var zkClient: KafkaZkClient = null
private var adminClient: AdminClient = null

// Kafka broker related configurations
private val brokerHost = "localhost"
Expand Down Expand Up @@ -96,10 +103,12 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
// Set up the Embedded Zookeeper server and get the proper Zookeeper port
private def setupEmbeddedZookeeper(): Unit = {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
val zkSvr = s"$zkHost:$zkPort";
zookeeper = new EmbeddedZookeeper(zkSvr)
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
zkClient = KafkaZkClient(zkSvr, false, 6000, 10000, Int.MaxValue, Time.SYSTEM)
zkReady = true
}

Expand All @@ -113,11 +122,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()
brokerPort = server.boundPort()
brokerPort = server.boundPort(new ListenerName("l"))
(server, brokerPort)
}, new SparkConf(), "KafkaBroker")

brokerReady = true
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "$brokerHost:$brokerPort")
adminClient = AdminClient.create(props)
}

/** setup the whole embedded servers, including Zookeeper and Kafka brokers */
Expand Down Expand Up @@ -203,7 +215,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L

/** Add new partitions to a Kafka topic */
def addPartitions(topic: String, partitions: Int): Unit = {
AdminUtils.addPartitions(zkUtils, topic, partitions)
val existingAssignment = zkClient.getReplicaAssignmentForTopics(
collection.immutable.Set(topic)).map {
case (topicPartition, replicas) => topicPartition.partition -> replicas
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get replica assignment information via AdminClient too. I think we should try to avoid the internal ZkUtils and KafkaZkClient as much as we can.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false)
adminClient.createPartitions(Map(topic ->
NewPartitions.increaseTo(partitions)).asJava, actuallyDoIt)
// wait until metadata is propagated
(0 until partitions).foreach { p =>
waitUntilMetadataIsPropagated(topic, p)
Expand Down Expand Up @@ -327,7 +345,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]): Unit = {
val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))

import ZkUtils._
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
Expand All @@ -337,16 +355,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
// ensure that the topic-partition has been deleted from all brokers' replica managers
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
server.replicaManager.getPartition(tp) == None)),
s"topic $topic still exists in the replica manager")
// ensure that logs from all replicas are deleted if delete topic is marked successful
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.getLogManager().getLog(tp).isEmpty)),
s"topic $topic still exists in log mananger")
// ensure that topic is removed from all cleaner offsets
assert(servers.forall(server => topicAndPartitions.forall { tp =>
val checkpoints = server.getLogManager().logDirs.map { logDir =>
new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
val checkpoints = server.getLogManager().liveLogDirs.map { logDir =>
new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
}
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
}), s"checkpoint for topic $topic still exists")
Expand Down Expand Up @@ -379,7 +397,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
val tp = new TopicPartition(topic, partition)
val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
val leaderAndInSyncReplicas = leaderIsrAndControllerEpochMap(tp).leaderAndIsr

zkUtils.getLeaderForPartition(topic, partition).isDefined &&
Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
Expand Down