From e768164fca1c93ec0a99f7020e301368f798156c Mon Sep 17 00:00:00 2001 From: Helena Edelson Date: Sun, 7 Dec 2014 10:50:44 -0500 Subject: [PATCH 1/2] #2808 update kafka to version 0.8.2 --- external/kafka/pom.xml | 2 +- .../streaming/kafka/KafkaStreamSuite.scala | 38 +++++++++++-------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index b3f44471cd32..f1e848070164 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -44,7 +44,7 @@ org.apache.kafka kafka_${scala.binary.version} - 0.8.0 + 0.8.2-beta com.sun.jmx diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index b19c053ebfc4..ff7f5d5dd3e0 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -26,7 +26,7 @@ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.Random -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.common.{KafkaException, TopicAndPartition} import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.serializer.{StringDecoder, StringEncoder} @@ -77,8 +77,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin var bindSuccess: Boolean = false while(!bindSuccess) { try { - val brokerProps = getBrokerConfig() - brokerConf = new KafkaConfig(brokerProps) + brokerConf = new KafkaConfig(brokerConfig) server = new KafkaServer(brokerConf) logInfo("==================== 2 ====================") server.startup() @@ -123,27 +122,26 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin private def createTestMessage(topic: String, sent: Map[String, Int]) : Seq[KeyedMessage[String, String]] = { - val messages = for ((s, freq) <- sent; i <- 0 until freq) yield { + (for ((s, freq) <- sent; i <- 0 until freq) yield { new KeyedMessage[String, String](topic, s) - } - messages.toSeq + }).toSeq } def createTopic(topic: String) { - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") + AdminUtils.createTopic(zkClient, topic, 1, 1) logInfo("==================== 5 ====================") // wait until metadata is propagated - waitUntilMetadataIsPropagated(topic, 0) + waitUntilMetadataIsPropagated(Seq(server), topic, 0) } def produceAndSendMessage(topic: String, sent: Map[String, Int]) { - producer = new Producer[String, String](new ProducerConfig(getProducerConfig())) + producer = new Producer[String, String](new ProducerConfig(producerConfig)) producer.send(createTestMessage(topic, sent): _*) producer.close() logInfo("==================== 6 ====================") } - private def getBrokerConfig(): Properties = { + private def brokerConfig: Properties = { val props = new Properties() props.put("broker.id", "0") props.put("host.name", "localhost") @@ -155,7 +153,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin props } - private def getProducerConfig(): Properties = { + private def producerConfig: Properties = { val brokerAddr = brokerConf.hostName + ":" + brokerConf.port val props = new Properties() props.put("metadata.broker.list", brokerAddr) @@ -163,13 +161,21 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin props } - private def waitUntilMetadataIsPropagated(topic: String, partition: Int) { + private def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int): Int = { + var leader: Int = -1 eventually(timeout(1000 milliseconds), interval(100 milliseconds)) { - assert( - server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)), - s"Partition [$topic, $partition] metadata not propagated after timeout" - ) + assert(servers.foldLeft(true) { + (result, server) => + val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition) + partitionStateOpt match { + case None => false + case Some(partitionState) => + leader = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader + result && leader >= 0 // is valid broker id + } + }, s"Partition [$topic, $partition] metadata not propagated after timeout") } + leader } class EmbeddedZookeeper(val zkConnect: String) { From 2e67c66b174bd1641ea8986edde3ce5598add612 Mon Sep 17 00:00:00 2001 From: Helena Edelson Date: Thu, 5 Feb 2015 08:08:19 -0500 Subject: [PATCH 2/2] #SPARK-2808 Update to Kafka 0.8.2.0 GA from beta. --- external/kafka/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index f1e848070164..8e937e25a9f1 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -44,7 +44,7 @@ org.apache.kafka kafka_${scala.binary.version} - 0.8.2-beta + 0.8.2.0 com.sun.jmx