
Commit 3290d2d

jerryshao authored and tdas committed
[SPARK-6211][Streaming] Add Python Kafka API unit test
Refactor the Kafka unit test and add Python API support. CC tdas davies please help to review, thanks a lot.

Author: jerryshao <[email protected]>
Author: Saisai Shao <[email protected]>

Closes #4961 from jerryshao/SPARK-6211 and squashes the following commits:

ee4b919 [jerryshao] Fixed newly merged issue
82c756e [jerryshao] Address the comments
92912d1 [jerryshao] Address the commits
0708bb1 [jerryshao] Fix rebase issue
40b47a3 [Saisai Shao] Style fix
f889657 [Saisai Shao] Update the code according
8a2f3e2 [jerryshao] Address the issues
0f1b7ce [jerryshao] Still fix the bug
61a04f0 [jerryshao] Fix bugs and address the issues
64d9877 [jerryshao] Fix rebase bugs
8ad442f [jerryshao] Add kafka-assembly in run-tests
6020b00 [jerryshao] Add more debug info in Shell
8102d6e [jerryshao] Fix bug in Jenkins test
fde1213 [jerryshao] Code style changes
5536f95 [jerryshao] Refactor the Kafka unit test and add Python Kafka unittest support
1 parent e236081 commit 3290d2d

File tree: 12 files changed, +502 additions, −311 deletions


dev/run-tests

Lines changed: 1 addition & 1 deletion

@@ -173,7 +173,7 @@ CURRENT_BLOCK=$BLOCK_BUILD
     build/mvn $HIVE_BUILD_ARGS clean package -DskipTests
   else
     echo -e "q\n" \
-      | build/sbt $HIVE_BUILD_ARGS package assembly/assembly \
+      | build/sbt $HIVE_BUILD_ARGS package assembly/assembly streaming-kafka-assembly/assembly \
       | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
   fi
 }

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala

Lines changed: 261 additions & 0 deletions

@@ -0,0 +1,261 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka

import java.io.File
import java.lang.{Integer => JInt}
import java.net.InetSocketAddress
import java.util.{Map => JMap}
import java.util.Properties
import java.util.concurrent.TimeoutException

import scala.annotation.tailrec
import scala.language.postfixOps
import scala.util.control.NonFatal

import kafka.admin.AdminUtils
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.ZKStringSerializer
import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
import org.I0Itec.zkclient.ZkClient

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.streaming.Time
import org.apache.spark.util.Utils

/**
 * This is a helper class for Kafka test suites. This has the functionality to set up
 * and tear down local Kafka servers, and to push data using Kafka producers.
 *
 * The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
 */
private class KafkaTestUtils extends Logging {

  // Zookeeper related configurations
  private val zkHost = "localhost"
  private var zkPort: Int = 0
  private val zkConnectionTimeout = 6000
  private val zkSessionTimeout = 6000

  private var zookeeper: EmbeddedZookeeper = _

  private var zkClient: ZkClient = _

  // Kafka broker related configurations
  private val brokerHost = "localhost"
  private var brokerPort = 9092
  private var brokerConf: KafkaConfig = _

  // Kafka broker server
  private var server: KafkaServer = _

  // Kafka producer
  private var producer: Producer[String, String] = _

  // Flag to test whether the system is correctly started
  private var zkReady = false
  private var brokerReady = false

  def zkAddress: String = {
    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
    s"$zkHost:$zkPort"
  }

  def brokerAddress: String = {
    assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
    s"$brokerHost:$brokerPort"
  }

  def zookeeperClient: ZkClient = {
    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
    Option(zkClient).getOrElse(
      throw new IllegalStateException("Zookeeper client is not yet initialized"))
  }

  // Set up the Embedded Zookeeper server and get the proper Zookeeper port
  private def setupEmbeddedZookeeper(): Unit = {
    // Zookeeper server startup
    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
    // Get the actual zookeeper binding port
    zkPort = zookeeper.actualPort
    zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
      ZKStringSerializer)
    zkReady = true
  }

  // Set up the Embedded Kafka server
  private def setupEmbeddedKafkaServer(): Unit = {
    assert(zkReady, "Zookeeper should be set up beforehand")

    // Kafka broker startup
    Utils.startServiceOnPort(brokerPort, port => {
      brokerPort = port
      brokerConf = new KafkaConfig(brokerConfiguration)
      server = new KafkaServer(brokerConf)
      server.startup()
      (server, port)
    }, new SparkConf(), "KafkaBroker")

    brokerReady = true
  }

  /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
  def setup(): Unit = {
    setupEmbeddedZookeeper()
    setupEmbeddedKafkaServer()
  }

  /** Teardown the whole servers, including Kafka broker and Zookeeper */
  def teardown(): Unit = {
    brokerReady = false
    zkReady = false

    if (producer != null) {
      producer.close()
      producer = null
    }

    if (server != null) {
      server.shutdown()
      server = null
    }

    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }

    if (zkClient != null) {
      zkClient.close()
      zkClient = null
    }

    if (zookeeper != null) {
      zookeeper.shutdown()
      zookeeper = null
    }
  }

  /** Create a Kafka topic and wait until it is propagated to the whole cluster */
  def createTopic(topic: String): Unit = {
    AdminUtils.createTopic(zkClient, topic, 1, 1)
    // wait until metadata is propagated
    waitUntilMetadataIsPropagated(topic, 0)
  }

  /** Java-friendly function for sending messages to the Kafka broker */
  def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
    import scala.collection.JavaConversions._
    sendMessages(topic, Map(messageToFreq.mapValues(_.intValue()).toSeq: _*))
  }

  /** Send the messages to the Kafka broker */
  def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
    val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
    sendMessages(topic, messages)
  }

  /** Send the array of messages to the Kafka broker */
  def sendMessages(topic: String, messages: Array[String]): Unit = {
    producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
    producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
    producer.close()
    producer = null
  }

  private def brokerConfiguration: Properties = {
    val props = new Properties()
    props.put("broker.id", "0")
    props.put("host.name", "localhost")
    props.put("port", brokerPort.toString)
    props.put("log.dir", Utils.createTempDir().getAbsolutePath)
    props.put("zookeeper.connect", zkAddress)
    props.put("log.flush.interval.messages", "1")
    props.put("replica.socket.timeout.ms", "1500")
    props
  }

  private def producerConfiguration: Properties = {
    val props = new Properties()
    props.put("metadata.broker.list", brokerAddress)
    props.put("serializer.class", classOf[StringEncoder].getName)
    props
  }

  // A simplified version of scalatest eventually, rewritten here to avoid adding extra test
  // dependency
  def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
    def makeAttempt(): Either[Throwable, T] = {
      try {
        Right(func)
      } catch {
        case e if NonFatal(e) => Left(e)
      }
    }

    val startTime = System.currentTimeMillis()
    @tailrec
    def tryAgain(attempt: Int): T = {
      makeAttempt() match {
        case Right(result) => result
        case Left(e) =>
          val duration = System.currentTimeMillis() - startTime
          if (duration < timeout.milliseconds) {
            Thread.sleep(interval.milliseconds)
          } else {
            throw new TimeoutException(e.getMessage)
          }

          tryAgain(attempt + 1)
      }
    }

    tryAgain(1)
  }

  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
    eventually(Time(10000), Time(100)) {
      assert(
        server.apis.metadataCache.containsTopicAndPartition(topic, partition),
        s"Partition [$topic, $partition] metadata not propagated after timeout"
      )
    }
  }

  private class EmbeddedZookeeper(val zkConnect: String) {
    val snapshotDir = Utils.createTempDir()
    val logDir = Utils.createTempDir()

    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
    val (ip, port) = {
      val splits = zkConnect.split(":")
      (splits(0), splits(1).toInt)
    }
    val factory = new NIOServerCnxnFactory()
    factory.configure(new InetSocketAddress(ip, port), 16)
    factory.startup(zookeeper)

    val actualPort = factory.getLocalPort

    def shutdown() {
      factory.shutdown()
      Utils.deleteRecursively(snapshotDir)
      Utils.deleteRecursively(logDir)
    }
  }
}

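For context on how this helper is meant to be driven, here is a minimal sketch of the lifecycle the suites in this commit follow: setup, topic creation, message publishing, an `eventually`-guarded assertion, and teardown. The object name, topic name, and message frequencies are made up for illustration, and the snippet has to live in the `org.apache.spark.streaming.kafka` package because `KafkaTestUtils` is declared private to it.

```scala
package org.apache.spark.streaming.kafka

import org.apache.spark.streaming.Time

// Hypothetical usage sketch of KafkaTestUtils; names and values below are illustrative only.
object KafkaTestUtilsSketch {
  def main(args: Array[String]): Unit = {
    val kafkaTestUtils = new KafkaTestUtils()
    kafkaTestUtils.setup()                       // start embedded Zookeeper, then the Kafka broker

    try {
      val topic = "example-topic"                // illustrative topic name
      kafkaTestUtils.createTopic(topic)          // blocks until the topic metadata is propagated
      // Publish "a" three times and "b" twice through the embedded producer
      kafkaTestUtils.sendMessages(topic, Map("a" -> 3, "b" -> 2))

      // Retry an assertion with the bundled `eventually` helper: 10s timeout, 100ms poll interval
      kafkaTestUtils.eventually(Time(10000), Time(100)) {
        assert(kafkaTestUtils.brokerAddress.startsWith("localhost:"))
      }
    } finally {
      kafkaTestUtils.teardown()                  // close producer, shut down broker and Zookeeper
    }
  }
}
```

The Java suites below exercise exactly this pattern from their `setUp()`/`tearDown()` methods, and the teardown is null-guarded so a failed setup does not cascade into a second exception.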
external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

Lines changed: 16 additions & 12 deletions

@@ -41,24 +41,28 @@

 public class JavaDirectKafkaStreamSuite implements Serializable {
   private transient JavaStreamingContext ssc = null;
-  private transient KafkaStreamSuiteBase suiteBase = null;
+  private transient KafkaTestUtils kafkaTestUtils = null;

   @Before
   public void setUp() {
-    suiteBase = new KafkaStreamSuiteBase() { };
-    suiteBase.setupKafka();
-    System.clearProperty("spark.driver.port");
-    SparkConf sparkConf = new SparkConf()
-      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
-    ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
+    kafkaTestUtils = new KafkaTestUtils();
+    kafkaTestUtils.setup();
+    SparkConf sparkConf = new SparkConf()
+      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+    ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
   }

   @After
   public void tearDown() {
+    if (ssc != null) {
     ssc.stop();
     ssc = null;
-    System.clearProperty("spark.driver.port");
-    suiteBase.tearDownKafka();
+    }
+
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown();
+      kafkaTestUtils = null;
+    }
   }

   @Test
@@ -74,7 +78,7 @@ public void testKafkaStream() throws InterruptedException {
     sent.addAll(Arrays.asList(topic2data));

     HashMap<String, String> kafkaParams = new HashMap<String, String>();
-    kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());
+    kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
     kafkaParams.put("auto.offset.reset", "smallest");

     JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
@@ -147,8 +151,8 @@ private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long off

   private String[] createTopicAndSendData(String topic) {
     String[] data = { topic + "-1", topic + "-2", topic + "-3"};
-    suiteBase.createTopic(topic);
-    suiteBase.sendMessages(topic, data);
+    kafkaTestUtils.createTopic(topic);
+    kafkaTestUtils.sendMessages(topic, data);
     return data;
   }
 }

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java

Lines changed: 16 additions & 12 deletions

@@ -37,24 +37,28 @@

 public class JavaKafkaRDDSuite implements Serializable {
   private transient JavaSparkContext sc = null;
-  private transient KafkaStreamSuiteBase suiteBase = null;
+  private transient KafkaTestUtils kafkaTestUtils = null;

   @Before
   public void setUp() {
-    suiteBase = new KafkaStreamSuiteBase() { };
-    suiteBase.setupKafka();
-    System.clearProperty("spark.driver.port");
+    kafkaTestUtils = new KafkaTestUtils();
+    kafkaTestUtils.setup();
     SparkConf sparkConf = new SparkConf()
       .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
     sc = new JavaSparkContext(sparkConf);
   }

   @After
   public void tearDown() {
-    sc.stop();
-    sc = null;
-    System.clearProperty("spark.driver.port");
-    suiteBase.tearDownKafka();
+    if (sc != null) {
+      sc.stop();
+      sc = null;
+    }
+
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown();
+      kafkaTestUtils = null;
+    }
   }

   @Test
@@ -66,7 +70,7 @@ public void testKafkaRDD() throws InterruptedException {
     String[] topic2data = createTopicAndSendData(topic2);

     HashMap<String, String> kafkaParams = new HashMap<String, String>();
-    kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());
+    kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());

     OffsetRange[] offsetRanges = {
       OffsetRange.create(topic1, 0, 0, 1),
@@ -75,7 +79,7 @@ public void testKafkaRDD() throws InterruptedException {

     HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap<TopicAndPartition, Broker>();
     HashMap<TopicAndPartition, Broker> leaders = new HashMap<TopicAndPartition, Broker>();
-    String[] hostAndPort = suiteBase.brokerAddress().split(":");
+    String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
     Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
     leaders.put(new TopicAndPartition(topic1, 0), broker);
     leaders.put(new TopicAndPartition(topic2, 0), broker);
@@ -144,8 +148,8 @@ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception

   private String[] createTopicAndSendData(String topic) {
     String[] data = { topic + "-1", topic + "-2", topic + "-3"};
-    suiteBase.createTopic(topic);
-    suiteBase.sendMessages(topic, data);
+    kafkaTestUtils.createTopic(topic);
+    kafkaTestUtils.sendMessages(topic, data);
     return data;
   }
 }
