Skip to content

Commit 74fb2ec

Browse files
jerryshaopwendell
authored andcommitted
[SPARK-3615][Streaming]Fix Kafka unit test hard coded Zookeeper port issue
Details can be seen in [SPARK-3615](https://issues.apache.org/jira/browse/SPARK-3615). Author: jerryshao <[email protected]> Closes apache#2483 from jerryshao/SPARK_3615 and squashes the following commits: 8555563 [jerryshao] Fix Kafka unit test hard coded Zookeeper port issue
1 parent bb96012 commit 74fb2ec

File tree

2 files changed

+34
-14
lines changed

2 files changed

+34
-14
lines changed

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void testKafkaStream() throws InterruptedException {
8181
Predef.<Tuple2<String, Object>>conforms()));
8282

8383
HashMap<String, String> kafkaParams = new HashMap<String, String>();
84-
kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
84+
kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
8585
kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
8686
kafkaParams.put("auto.offset.reset", "smallest");
8787

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import java.util.{Properties, Random}
2424
import scala.collection.mutable
2525

2626
import kafka.admin.CreateTopicCommand
27-
import kafka.common.TopicAndPartition
27+
import kafka.common.{KafkaException, TopicAndPartition}
2828
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
2929
import kafka.utils.ZKStringSerializer
3030
import kafka.serializer.{StringDecoder, StringEncoder}
@@ -42,14 +42,13 @@ import org.apache.spark.util.Utils
4242
class KafkaStreamSuite extends TestSuiteBase {
4343
import KafkaTestUtils._
4444

45-
val zkConnect = "localhost:2181"
45+
val zkHost = "localhost"
46+
var zkPort: Int = 0
4647
val zkConnectionTimeout = 6000
4748
val zkSessionTimeout = 6000
4849

49-
val brokerPort = 9092
50-
val brokerProps = getBrokerConfig(brokerPort, zkConnect)
51-
val brokerConf = new KafkaConfig(brokerProps)
52-
50+
protected var brokerPort = 9092
51+
protected var brokerConf: KafkaConfig = _
5352
protected var zookeeper: EmbeddedZookeeper = _
5453
protected var zkClient: ZkClient = _
5554
protected var server: KafkaServer = _
@@ -59,16 +58,35 @@ class KafkaStreamSuite extends TestSuiteBase {
5958

6059
override def beforeFunction() {
6160
// Zookeeper server startup
62-
zookeeper = new EmbeddedZookeeper(zkConnect)
61+
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
62+
// Get the actual zookeeper binding port
63+
zkPort = zookeeper.actualPort
6364
logInfo("==================== 0 ====================")
64-
zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
65+
66+
zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
67+
ZKStringSerializer)
6568
logInfo("==================== 1 ====================")
6669

6770
// Kafka broker startup
68-
server = new KafkaServer(brokerConf)
69-
logInfo("==================== 2 ====================")
70-
server.startup()
71-
logInfo("==================== 3 ====================")
71+
var bindSuccess: Boolean = false
72+
while(!bindSuccess) {
73+
try {
74+
val brokerProps = getBrokerConfig(brokerPort, s"$zkHost:$zkPort")
75+
brokerConf = new KafkaConfig(brokerProps)
76+
server = new KafkaServer(brokerConf)
77+
logInfo("==================== 2 ====================")
78+
server.startup()
79+
logInfo("==================== 3 ====================")
80+
bindSuccess = true
81+
} catch {
82+
case e: KafkaException =>
83+
if (e.getMessage != null && e.getMessage.contains("Socket server failed to bind to")) {
84+
brokerPort += 1
85+
}
86+
case e: Exception => throw new Exception("Kafka server create failed", e)
87+
}
88+
}
89+
7290
Thread.sleep(2000)
7391
logInfo("==================== 4 ====================")
7492
super.beforeFunction()
@@ -92,7 +110,7 @@ class KafkaStreamSuite extends TestSuiteBase {
92110
createTopic(topic)
93111
produceAndSendMessage(topic, sent)
94112

95-
val kafkaParams = Map("zookeeper.connect" -> zkConnect,
113+
val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
96114
"group.id" -> s"test-consumer-${random.nextInt(10000)}",
97115
"auto.offset.reset" -> "smallest")
98116

@@ -200,6 +218,8 @@ object KafkaTestUtils {
200218
factory.configure(new InetSocketAddress(ip, port), 16)
201219
factory.startup(zookeeper)
202220

221+
val actualPort = factory.getLocalPort
222+
203223
def shutdown() {
204224
factory.shutdown()
205225
Utils.deleteRecursively(snapshotDir)

0 commit comments

Comments
 (0)