Skip to content

Commit 881b206

Browse files
committed
Stress test for adding new topics
1 parent 6bc2994 commit 881b206

File tree

2 files changed

+43
-12
lines changed

2 files changed

+43
-12
lines changed

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext {
3636
private val topicId = new AtomicInteger(0)
3737
private var testUtils: KafkaTestUtils = _
3838

39-
override val streamingTimeout = 10.seconds
39+
override val streamingTimeout = 30.seconds
4040

4141
override def beforeAll(): Unit = {
4242
super.beforeAll()
@@ -75,11 +75,20 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext {
7575
}
7676

7777
test("stress test with multiple topics and partitions") {
78-
val topics = (1 to 5).map(i => s"stress$i").toSet
79-
var partitionRange = (1, 5)
78+
val topicId = new AtomicInteger(1)
79+
80+
def newStressTopic: String = s"stress${topicId.getAndIncrement()}"
81+
82+
@volatile var topics = (1 to 5).map(_ => newStressTopic).toSet
83+
84+
@volatile var partitionRange = (1, 5)
85+
86+
def newPartitionRange: (Int, Int) = (partitionRange._1 + 5, partitionRange._2 + 5)
87+
8088
def randomPartitions: Int = {
8189
Random.nextInt(partitionRange._2 + 1 - partitionRange._1) + partitionRange._1
8290
}
91+
8392
topics.foreach { topic =>
8493
testUtils.createTopic(topic, partitions = randomPartitions)
8594
testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
@@ -91,8 +100,8 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext {
91100
.format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
92101
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
93102
.option("kafka.group.id", s"group-stress-test")
94-
.option("subscribe", topics.mkString(","))
95103
.option("kafka.metadata.max.age.ms", "1")
104+
.option("subscribePattern", "stress.*")
96105
.load()
97106
.select("key", "value")
98107
.as[(Array[Byte], Array[Byte])]
@@ -102,13 +111,20 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext {
102111
runStressTest(
103112
mapped,
104113
d => {
105-
if (Random.nextInt(5) == 0) {
106-
partitionRange = (partitionRange._1 + 5, partitionRange._2 + 5)
107-
val addPartitions = topics.toSeq.map(_ => randomPartitions)
108-
AddKafkaData(topics, d: _*)(
109-
ensureDataInMultiplePartition = false, addPartitions = Some(addPartitions))
110-
} else {
111-
AddKafkaData(topics, d: _*)(ensureDataInMultiplePartition = false)
114+
Random.nextInt(5) match {
115+
case 0 =>
116+
partitionRange = newPartitionRange
117+
val addPartitions = topics.toSeq.map(_ => randomPartitions)
118+
AddKafkaData(topics, d: _*)(
119+
ensureDataInMultiplePartition = false, addPartitions = Some(addPartitions))
120+
case 1 =>
121+
topics = topics + newStressTopic
122+
partitionRange = newPartitionRange
123+
val addPartitions = topics.toSeq.map(_ => randomPartitions)
124+
AddKafkaData(topics, d: _*)(
125+
ensureDataInMultiplePartition = false, addPartitions = Some(addPartitions))
126+
case _ =>
127+
AddKafkaData(topics, d: _*)(ensureDataInMultiplePartition = false)
112128
}
113129
},
114130
iterations = 50)
@@ -214,16 +230,27 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext {
214230
)
215231
}
216232

233+
/**
234+
* Add data to Kafka. If any topic in `topics` does not exist, it will be created automatically.
235+
*
236+
* `addPartitions` gives the new partition count for each topic. The caller must make sure each
237+
* count is bigger than the topic's current partition count; otherwise an exception is thrown.
238+
*/
217239
case class AddKafkaData(topics: Set[String], data: Int*)
218240
(implicit ensureDataInMultiplePartition: Boolean = false,
219241
addPartitions: Option[Seq[Int]] = None) extends AddData {
220242

221243
override def addData(query: Option[StreamExecution]): (Source, Offset) = {
244+
val allTopics = testUtils.getAllTopics().toSet
222245
if (addPartitions.nonEmpty) {
223246
require(topics.size == addPartitions.get.size,
224247
s"$addPartitions should have the same size of $topics")
225248
topics.zip(addPartitions.get).foreach { case (topic, partitions) =>
226-
testUtils.addPartitions(topic, partitions)
249+
if (allTopics.contains(topic)) {
250+
testUtils.addPartitions(topic, partitions)
251+
} else {
252+
testUtils.createTopic(topic, partitions)
253+
}
227254
}
228255
}
229256
require(

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ class KafkaTestUtils extends Logging {
162162
}
163163
}
164164

165+
def getAllTopics(): Seq[String] = {
166+
zkUtils.getAllTopics()
167+
}
168+
165169
/** Create a Kafka topic and wait until it is propagated to the whole cluster */
166170
def createTopic(topic: String): Unit = {
167171
createTopic(topic, 1)

0 commit comments

Comments
 (0)