
Commit 786af2f

Merge pull request #4 from koeninger/kafka-source-deletion
test case that shows why the current implementation is wrong from an …
2 parents 881b206 + 8e86f98

2 files changed: +72 -1 lines changed

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
Lines changed: 32 additions & 0 deletions

@@ -155,6 +155,38 @@ class KafkaSourceSuite extends StreamTest with SharedSQLContext {
     testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
   }
 
+  test("users will delete topics") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-seems"
+    val topic2 = topicPrefix + "-bad"
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("-1"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.group.id", s"group-$topic")
+      .option("kafka.auto.offset.reset", s"latest")
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribePattern", s"$topicPrefix-.*")
+
+    val kafka = reader.load().select("key", "value").as[(Array[Byte], Array[Byte])]
+    val mapped = kafka.map(kv => new String(kv._2).toInt + 1)
+
+    testStream(mapped)(
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(2, 3, 4),
+      Assert {
+        testUtils.deleteTopic(topic, 5)
+        testUtils.createTopic(topic2, partitions = 5)
+        true
+      },
+      AddKafkaData(Set(topic2), 4, 5, 6),
+      CheckAnswer(2, 3, 4, 5, 6, 7)
+    )
+  }
 
   private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
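
Context for the test above: the query subscribes by pattern rather than by a fixed topic list, and "kafka.metadata.max.age.ms" is set to 1 ms so the consumer re-fetches topic metadata almost continuously. When `topic` is deleted and `topic2` (which matches the same pattern) is created mid-stream, the source has to re-resolve the subscription and keep the query running; an implementation that pins the partition set discovered at start-up fails at this point, which appears to be what the truncated commit message refers to. Below is a minimal sketch of the same pattern subscription outside the test harness, assuming a broker at localhost:9092 and the kafka-0-10-sql connector on the classpath; the topic pattern and console sink are illustrative.

import org.apache.spark.sql.SparkSession

object PatternSubscribeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("pattern-subscribe-sketch").getOrCreate()

    // Subscribing by regex: the matched topic set is re-resolved whenever the
    // consumer refreshes metadata, so topics created later that match the
    // pattern can be picked up without restarting the query.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("kafka.metadata.max.age.ms", "1") // refresh metadata aggressively
      .option("subscribePattern", "topic-.*")
      .load()
      .selectExpr("CAST(value AS STRING)")

    df.writeStream.format("console").start().awaitTermination()
  }
}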

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
Lines changed: 40 additions & 1 deletion

@@ -29,7 +29,8 @@ import scala.util.Random
 
 import kafka.admin.AdminUtils
 import kafka.api.Request
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.common.TopicAndPartition
+import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint}
 import kafka.utils.ZkUtils
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._

@@ -171,6 +172,12 @@ class KafkaTestUtils extends Logging {
     createTopic(topic, 1)
   }
 
+  /** Delete a Kafka topic and wait until it is propagated to the whole cluster */
+  def deleteTopic(topic: String, partitions: Int): Unit = {
+    AdminUtils.deleteTopic(zkUtils, topic)
+    verifyTopicDeletion(zkUtils, topic, partitions, List(this.server))
+  }
+
   /** Add new paritions to a Kafka topic */
   def addPartitions(topic: String, partitions: Int): Unit = {
     AdminUtils.addPartitions(zkUtils, topic, partitions)
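
A note on the new `deleteTopic` helper: the `partitions` argument does not affect the deletion itself; it only tells `verifyTopicDeletion` how many `TopicAndPartition` entries to check, so callers pass the same count they used when creating the topic. A hypothetical call sequence mirroring the test above (topic name invented for illustration):

// `testUtils` is a KafkaTestUtils instance, as used throughout the suite.
testUtils.createTopic("topic-doomed", partitions = 5)
// ... produce, consume ...
testUtils.deleteTopic("topic-doomed", 5) // blocks until all 5 partitions are verifiably gone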

@@ -234,6 +241,7 @@
     props.put("zookeeper.connect", zkAddress)
     props.put("log.flush.interval.messages", "1")
     props.put("replica.socket.timeout.ms", "1500")
+    props.put("delete.topic.enable", "true")
     props
   }
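
The one-line broker change above is what makes the rest of this patch work: in the Kafka 0.10 broker line, `delete.topic.enable` defaults to false, in which case `AdminUtils.deleteTopic` merely writes a deletion marker znode that the brokers never act on, and `verifyTopicDeletion` below would spin until its timeout. A sketch of an embedded-broker configuration with the flag applied; the property values echo the surrounding method, while `broker.id` and the ZooKeeper address are illustrative stand-ins:

import java.util.Properties
import kafka.server.KafkaConfig

val props = new Properties()
props.put("broker.id", "0")                      // illustrative; not part of the excerpt above
props.put("zookeeper.connect", "localhost:2181") // stand-in for the suite's zkAddress
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
props.put("delete.topic.enable", "true")         // default false: delete requests are otherwise ignored
val brokerConf = KafkaConfig.fromProps(props)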

@@ -257,6 +265,37 @@
     props
   }
 
+  private def verifyTopicDeletion(
+      zkUtils: ZkUtils,
+      topic: String,
+      numPartitions: Int,
+      servers: Seq[KafkaServer]) {
+    import ZkUtils._
+    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+    def isDeleted(): Boolean = {
+      // wait until admin path for delete topic is deleted, signaling completion of topic deletion
+      val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic))
+      val topicPath = !zkUtils.pathExists(getTopicPath(topic))
+      // ensure that the topic-partition has been deleted from all brokers' replica managers
+      val replicaManager = servers.forall(server => topicAndPartitions.forall(tp =>
+        server.replicaManager.getPartition(tp.topic, tp.partition) == None))
+      // ensure that logs from all replicas are deleted if delete topic is marked successful
+      val logManager = servers.forall(server => topicAndPartitions.forall(tp =>
+        server.getLogManager().getLog(tp).isEmpty))
+      // ensure that topic is removed from all cleaner offsets
+      val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
+        val checkpoints = server.getLogManager().logDirs.map { logDir =>
+          new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
+        }
+        checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
+      })
+      deletePath && topicPath && replicaManager && logManager && cleaner
+    }
+    eventually(timeout(10.seconds)) {
+      assert(isDeleted, s"$topic not deleted after timeout")
+    }
+  }
+
   private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
     def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
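
`verifyTopicDeletion` bundles five independent signals (ZooKeeper's delete-topic admin path, the topic path itself, each broker's replica manager, each broker's log manager, and the log cleaner's offset checkpoints) and polls them with ScalaTest's `Eventually` until all hold or ten seconds elapse. A minimal, self-contained sketch of that retry pattern, with a trivial deadline condition standing in for `isDeleted()`:

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Stand-in condition: becomes true once a short deadline has passed.
val deadline = System.currentTimeMillis() + 500

// eventually() re-runs the block, sleeping briefly between attempts, until it
// stops throwing or the timeout elapses; the last failure is then rethrown.
// verifyTopicDeletion drives isDeleted() with exactly this mechanism.
eventually(timeout(10.seconds)) {
  assert(System.currentTimeMillis() >= deadline, "deadline not reached yet")
}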
