Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions external/kafka-0-10-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<properties>
<sbt.project.name>sql-kafka-0-10</sbt.project.name>
<kafka.version>0.10.0.1</kafka.version>
<kafka.version>2.0.0-SNAPSHOT</kafka.version>
</properties>
<packaging>jar</packaging>
<name>Kafka 0.10 Source for Structured Streaming</name>
<name>Kafka 2.0.0 Source for Structured Streaming</name>
<url>http://spark.apache.org/</url>

<dependencies>
Expand Down Expand Up @@ -74,6 +74,11 @@
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does this come from? Or it can be just a test dependency?

<artifactId>jetty-servlet</artifactId>
<version>9.3.9.v20160517</version>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.kafka010

import java.{util => ju}
import java.time.{Duration => JDuration}
import java.util.concurrent.{Executors, ThreadFactory}

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader(
def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
// Poll to get the latest assigned partitions
consumer.poll(0)
consumer.poll(JDuration.ofMillis(0))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you revert these changes? We don't use java.time.Duration in Spark.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depending on the Kafka release we agree upon, I can revert.
Duration is recommended API for 2.0.0 release

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tedyu just realized this is ofMillis rather than toMillis. We definitely cannot use it as this poll overload doesn't exist in previous versions and we want to support Kafka versions from 0.10 to 2.0.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zsxwing Why do you want to support Kafka clients jars from 0.10 to 2.0? Since newer clients jars support older brokers, we recommend people use the latest Kafka clients jar whenever possible.

Copy link
Member

@zsxwing zsxwing Jul 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. However, supporting all these versions is pretty cheap for Spark right now. Spark is using only APIs in 0.10. In addition, if the Kafka client version we pick up here has some critical issue, the user can just switch to an old version.

val partitions = consumer.assignment()
consumer.pause(partitions)
partitions.asScala.toSet
Expand All @@ -135,7 +136,7 @@ private[kafka010] class KafkaOffsetReader(
val fetched = runUninterruptibly {
withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
consumer.poll(JDuration.ofMillis(0))
val partitions = consumer.assignment()
consumer.pause(partitions)
assert(partitions.asScala == partitionOffsets.keySet,
Expand Down Expand Up @@ -177,7 +178,7 @@ private[kafka010] class KafkaOffsetReader(
def fetchEarliestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
consumer.poll(JDuration.ofMillis(0))
val partitions = consumer.assignment()
consumer.pause(partitions)
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the beginning")
Expand All @@ -196,7 +197,7 @@ private[kafka010] class KafkaOffsetReader(
def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
consumer.poll(JDuration.ofMillis(0))
val partitions = consumer.assignment()
consumer.pause(partitions)
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")
Expand All @@ -220,7 +221,7 @@ private[kafka010] class KafkaOffsetReader(
runUninterruptibly {
withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
consumer.poll(JDuration.ofMillis(0))
val partitions = consumer.assignment()
consumer.pause(partitions)
logDebug(s"\tPartitions assigned to consumer: $partitions")
Expand Down
4 changes: 2 additions & 2 deletions external/kafka-0-10-sql/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#

# Set everything to be logged to the file target/unit-tests.log
log4j.rootCategory=INFO, file
log4j.rootCategory=DEBUG, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
Expand All @@ -25,4 +25,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark-project.jetty=WARN

log4j.logger.org.apache.kafka=DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.kafka010
import java.io.{File, IOException}
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
import java.time.{Duration => JDuration}
import java.util.{Map => JMap, Properties}
import java.util.concurrent.TimeUnit

Expand All @@ -29,13 +30,18 @@ import scala.util.Random

import kafka.admin.AdminUtils
import kafka.api.Request
import kafka.common.TopicAndPartition
import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
Expand All @@ -61,6 +67,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
private var zookeeper: EmbeddedZookeeper = _

private var zkUtils: ZkUtils = _
private var zkClient: KafkaZkClient = null
private var adminClient: AdminClient = null

// Kafka broker related configurations
private val brokerHost = "localhost"
Expand Down Expand Up @@ -99,7 +107,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
val zkSvr = s"$zkHost:$zkPort";
zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
zkClient = KafkaZkClient(zkSvr, false, 6000, 10000, Int.MaxValue, Time.SYSTEM)
zkReady = true
}

Expand All @@ -113,11 +123,14 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()
brokerPort = server.boundPort()
brokerPort = server.boundPort(new ListenerName("PLAINTEXT"))
(server, brokerPort)
}, new SparkConf(), "KafkaBroker")

brokerReady = true
val props = new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, s"$brokerHost:$brokerPort")
adminClient = AdminClient.create(props)
}

/** setup the whole embedded servers, including Zookeeper and Kafka brokers */
Expand Down Expand Up @@ -203,7 +216,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L

/** Add new partitions to a Kafka topic */
def addPartitions(topic: String, partitions: Int): Unit = {
AdminUtils.addPartitions(zkUtils, topic, partitions)
val existingAssignment = zkClient.getReplicaAssignmentForTopics(
collection.immutable.Set(topic)).map {
case (topicPartition, replicas) => topicPartition.partition -> replicas
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get replica assignment information via AdminClient too. I think we should try to avoid the internal ZkUtils and KafkaZkClient as much as we can.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false)
adminClient.createPartitions(Map(topic ->
NewPartitions.increaseTo(partitions)).asJava, actuallyDoIt)
// wait until metadata is propagated
(0 until partitions).foreach { p =>
waitUntilMetadataIsPropagated(topic, p)
Expand Down Expand Up @@ -260,7 +279,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
val kc = new KafkaConsumer[String, String](consumerConfiguration)
logInfo("Created consumer to get earliest offsets")
kc.subscribe(topics.asJavaCollection)
kc.poll(0)
kc.poll(JDuration.ofMillis(0))
val partitions = kc.assignment()
kc.pause(partitions)
kc.seekToBeginning(partitions)
Expand All @@ -274,7 +293,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
val kc = new KafkaConsumer[String, String](consumerConfiguration)
logInfo("Created consumer to get latest offsets")
kc.subscribe(topics.asJavaCollection)
kc.poll(0)
kc.poll(JDuration.ofMillis(0))
val partitions = kc.assignment()
kc.pause(partitions)
kc.seekToEnd(partitions)
Expand Down Expand Up @@ -327,7 +346,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]): Unit = {
val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))

import ZkUtils._
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
Expand All @@ -337,16 +356,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
// ensure that the topic-partition has been deleted from all brokers' replica managers
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
server.replicaManager.getPartition(tp) == None)),
s"topic $topic still exists in the replica manager")
// ensure that logs from all replicas are deleted if delete topic is marked successful
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.getLogManager().getLog(tp).isEmpty)),
s"topic $topic still exists in log mananger")
// ensure that topic is removed from all cleaner offsets
assert(servers.forall(server => topicAndPartitions.forall { tp =>
val checkpoints = server.getLogManager().logDirs.map { logDir =>
new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
val checkpoints = server.getLogManager().liveLogDirs.map { logDir =>
new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read()
}
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
}), s"checkpoint for topic $topic still exists")
Expand Down Expand Up @@ -379,7 +398,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
val tp = new TopicPartition(topic, partition)
val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
val leaderAndInSyncReplicas = leaderIsrAndControllerEpochMap(tp).leaderAndIsr

zkUtils.getLeaderForPartition(topic, partition).isDefined &&
Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
Expand Down