Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions external/kafka-0-10-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<properties>
<sbt.project.name>sql-kafka-0-10</sbt.project.name>
<kafka.version>0.10.0.1</kafka.version>
<kafka.version>2.0.0-SNAPSHOT</kafka.version>
</properties>
<packaging>jar</packaging>
<name>Kafka 0.10 Source for Structured Streaming</name>
<name>Kafka 2.0.0 Source for Structured Streaming</name>
<url>http://spark.apache.org/</url>

<dependencies>
Expand Down Expand Up @@ -74,6 +74,11 @@
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does this come from? Or can it be just a test dependency?

<artifactId>jetty-servlet</artifactId>
<version>9.3.9.v20160517</version>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ import scala.util.Random

import kafka.admin.AdminUtils
import kafka.api.Request
import kafka.common.TopicAndPartition
import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
Expand All @@ -61,6 +66,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
private var zookeeper: EmbeddedZookeeper = _

private var zkUtils: ZkUtils = _
private var zkClient: KafkaZkClient = null
private var adminClient: AdminClient = null

// Kafka broker related configurations
private val brokerHost = "localhost"
Expand Down Expand Up @@ -99,7 +106,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
val zkSvr = s"$zkHost:$zkPort";
zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
zkClient = KafkaZkClient(zkSvr, false, 6000, 10000, Int.MaxValue, Time.SYSTEM)
zkReady = true
}

Expand All @@ -113,11 +122,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()
brokerPort = server.boundPort()
brokerPort = server.boundPort(new ListenerName("CLIENT"))
(server, brokerPort)
}, new SparkConf(), "KafkaBroker")

brokerReady = true
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, s"$brokerHost:$brokerPort")
adminClient = AdminClient.create(props)
}

/** setup the whole embedded servers, including Zookeeper and Kafka brokers */
Expand Down Expand Up @@ -203,7 +215,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L

/** Add new partitions to a Kafka topic */
def addPartitions(topic: String, partitions: Int): Unit = {
AdminUtils.addPartitions(zkUtils, topic, partitions)
val existingAssignment = zkClient.getReplicaAssignmentForTopics(
collection.immutable.Set(topic)).map {
case (topicPartition, replicas) => topicPartition.partition -> replicas
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get replica assignment information via AdminClient too. I think we should try to avoid the internal ZkUtils and KafkaZkClient as much as we can.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false)
adminClient.createPartitions(Map(topic ->
NewPartitions.increaseTo(partitions)).asJava, actuallyDoIt)
// wait until metadata is propagated
(0 until partitions).foreach { p =>
waitUntilMetadataIsPropagated(topic, p)
Expand Down Expand Up @@ -327,7 +345,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]): Unit = {
val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))

import ZkUtils._
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
Expand All @@ -337,16 +355,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
// ensure that the topic-partition has been deleted from all brokers' replica managers
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
server.replicaManager.getPartition(tp) == None)),
s"topic $topic still exists in the replica manager")
// ensure that logs from all replicas are deleted if delete topic is marked successful
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.getLogManager().getLog(tp).isEmpty)),
s"topic $topic still exists in log mananger")
// ensure that topic is removed from all cleaner offsets
assert(servers.forall(server => topicAndPartitions.forall { tp =>
val checkpoints = server.getLogManager().logDirs.map { logDir =>
new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
val checkpoints = server.getLogManager().liveLogDirs.map { logDir =>
new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
}
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
}), s"checkpoint for topic $topic still exists")
Expand Down Expand Up @@ -379,7 +397,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
val tp = new TopicPartition(topic, partition)
val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
val leaderAndInSyncReplicas = leaderIsrAndControllerEpochMap(tp).leaderAndIsr

zkUtils.getLeaderForPartition(topic, partition).isDefined &&
Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
Expand Down