Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private[kafka010] case class KafkaSource(
sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt

private val offsetFetchAttemptIntervalMs =
sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "100").toLong
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increase this value because if not sleeping enough time, it's easy to fail with the same error when deleting a topic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 100ms enough or should this be increase to 1 second?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increased the default value to 1 second.


private val maxOffsetsPerTrigger =
sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkContext
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}

abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {

Expand Down Expand Up @@ -811,6 +812,12 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared

private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"

override def createSparkSession(): TestSparkSession = {
// Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context",
sparkConf.set("spark.sql.testkey", "true")))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this key for?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's only used on SQLConfSuite. Removed it from this test.

}

override def beforeAll(): Unit = {
super.beforeAll()
testUtils = new KafkaTestUtils {
Expand Down Expand Up @@ -839,7 +846,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared
}
}

ignore("stress test for failOnDataLoss=false") {
test("stress test for failOnDataLoss=false") {
val reader = spark
.readStream
.format("kafka")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,36 +286,56 @@ class KafkaTestUtils extends Logging {
props
}

/** Assert topic is deleted in all places, e.g, brokers, zookeeper. */
private def assertTopicDeleted(
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]): Unit = {
val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))

import ZkUtils._
Copy link
Member Author

@zsxwing zsxwing Dec 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed this method to assertTopicDeleted and moved out of this method. I also rewrote it to make it tell us which line fails.

// wait until admin path for delete topic is deleted, signaling completion of topic deletion
assert(
!zkUtils.pathExists(getDeleteTopicPath(topic)),
s"${getDeleteTopicPath(topic)} still exists")
assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists")
// ensure that the topic-partition has been deleted from all brokers' replica managers
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
s"topic $topic still exists in the replica manager")
// ensure that logs from all replicas are deleted if delete topic is marked successful
assert(servers.forall(server => topicAndPartitions.forall(tp =>
server.getLogManager().getLog(tp).isEmpty)),
s"topic $topic still exists in log mananger")
// ensure that topic is removed from all cleaner offsets
assert(servers.forall(server => topicAndPartitions.forall { tp =>
val checkpoints = server.getLogManager().logDirs.map { logDir =>
new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
}
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
}), s"checkpoint for topic $topic still exists")
// ensure the topic is gone
assert(
!zkUtils.getAllTopics().contains(topic),
s"topic $topic still exists on zookeeper")
}

private def verifyTopicDeletion(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its hard to differentiate the semantics of verifyTopicDeletion and assertTopicDeleted. How about renamed them to verifyTopicDeletionWithRetries and verifyTopicDeletion?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

zkUtils: ZkUtils,
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]) {
import ZkUtils._
val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
def isDeleted(): Boolean = {
// wait until admin path for delete topic is deleted, signaling completion of topic deletion
val deletePath = !zkUtils.pathExists(getDeleteTopicPath(topic))
val topicPath = !zkUtils.pathExists(getTopicPath(topic))
// ensure that the topic-partition has been deleted from all brokers' replica managers
val replicaManager = servers.forall(server => topicAndPartitions.forall(tp =>
server.replicaManager.getPartition(tp.topic, tp.partition) == None))
// ensure that logs from all replicas are deleted if delete topic is marked successful
val logManager = servers.forall(server => topicAndPartitions.forall(tp =>
server.getLogManager().getLog(tp).isEmpty))
// ensure that topic is removed from all cleaner offsets
val cleaner = servers.forall(server => topicAndPartitions.forall { tp =>
val checkpoints = server.getLogManager().logDirs.map { logDir =>
new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
}
checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp))
})
// ensure the topic is gone
val deleted = !zkUtils.getAllTopics().contains(topic)
deletePath && topicPath && replicaManager && logManager && cleaner && deleted
}
eventually(timeout(60.seconds)) {
assert(isDeleted, s"$topic not deleted after timeout")
eventually(timeout(60.seconds), interval(200.millis)) {
try {
assertTopicDeleted(topic, numPartitions, servers)
} catch {
case e: Throwable =>
// As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
// chance that a topic will be recreated after deletion due to the asynchronous update.
// Hence, delete the topic and retry.
AdminUtils.deleteTopic(zkUtils, topic)
throw e
}
}
}

Expand All @@ -331,7 +351,7 @@ class KafkaTestUtils extends Logging {
case _ =>
false
}
eventually(timeout(10.seconds)) {
eventually(timeout(60.seconds)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increase this timeout as it seems 10 seconds is not enough sometimes.

assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,18 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
*/
protected implicit def sqlContext: SQLContext = _spark.sqlContext

protected def createSparkSession: TestSparkSession = {
new TestSparkSession(
sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
}

/**
* Initialize the [[TestSparkSession]].
*/
protected override def beforeAll(): Unit = {
SparkSession.sqlListener.set(null)
if (_spark == null) {
_spark = new TestSparkSession(
sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
_spark = createSparkSession
}
// Ensure we have initialized the context before calling parent code
super.beforeAll()
Expand Down