@@ -341,6 +341,7 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
      val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
      if (record != null) {
        nextRow = converter.toUnsafeRow(record)
+       nextOffset = record.offset + 1
        true
      } else {
        false
@@ -352,7 +353,6 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(

  override def get(): UnsafeRow = {
    assert(nextRow != null)
-   nextOffset += 1
    nextRow
  }

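Taken together, the two one-line changes above move the offset bookkeeping from get() into next() and advance the cursor to the offset of the record actually returned (record.offset + 1) instead of stepping by one. The standalone sketch below is my own illustration, not the PR's code (the topic contents, the fetch helper, and all names are invented); it shows why the old "+= 1" advance can hand back the same record repeatedly once failOnDataLoss=false starts skipping aged-out offsets:

// A standalone sketch (my illustration, not the PR's code) of why advancing the cursor with
// "nextOffset += 1" can return the same record twice when failOnDataLoss=false: the consumer
// may skip aged-out offsets and hand back a record whose offset is ahead of the one requested.
object OffsetAdvanceSketch {
  // Hypothetical topic contents: offsets 1 and 2 were already deleted by retention.
  private val available = Map(0L -> "a", 3L -> "d", 4L -> "e")

  // Mimics a fetch with failOnDataLoss=false: return the first surviving record at or after
  // the requested offset, or None once the range is exhausted.
  private def fetch(requested: Long, untilOffset: Long): Option[(Long, String)] =
    (requested until untilOffset).collectFirst { case o if available.contains(o) => o -> available(o) }

  private def read(advanceToRecordOffset: Boolean): Seq[String] = {
    val out = scala.collection.mutable.ArrayBuffer[String]()
    var nextOffset = 0L
    while (nextOffset < 5L) {
      fetch(nextOffset, 5L) match {
        case Some((offset, value)) =>
          out += value
          // Old behaviour: blindly step by one. New behaviour: jump past the record we just got.
          nextOffset = if (advanceToRecordOffset) offset + 1 else nextOffset + 1
        case None =>
          nextOffset = 5L
      }
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(read(advanceToRecordOffset = false)) // a, d, d, d, e -> record at offset 3 repeated
    println(read(advanceToRecordOffset = true))  // a, d, e       -> no duplicates
  }
}

With the cursor jumping to record.offset + 1, an offset the consumer already skipped past is never requested a second time.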
@@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD(
    offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
  }

  override def count(): Long = offsetRanges.map(_.size).sum

Member Author (zsxwing): These methods are never used, as Dataset always uses this RDD (through a MapPartitionsRDD), and MapPartitionsRDD just calls the default RDD implementations. In addition, they may return wrong answers when failOnDataLoss=false. Hence, I just removed them.

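To make the "wrong answers" point concrete, here is a small sketch of my own (the numbers are invented and OffsetRange is stripped down to just what the example needs): the removed count() sums the sizes of the planned offset ranges, but with failOnDataLoss=false some of those offsets may already have been aged out, so the number of records actually read can be smaller than the reported count.

// Sketch only: compare the offset-range arithmetic the removed count() relied on with the
// number of records that are actually still readable after Kafka retention removed some.
object CountMismatchSketch {
  // Minimal stand-in for the real OffsetRange: size is just untilOffset - fromOffset.
  final case class OffsetRange(fromOffset: Long, untilOffset: Long) {
    def size: Long = untilOffset - fromOffset
  }

  def main(args: Array[String]): Unit = {
    val planned = Seq(OffsetRange(0, 50))           // the range the batch was planned over
    val offsetBasedCount = planned.map(_.size).sum  // 50, what the removed count() would report
    val stillReadable = 20L                         // pretend offsets 0-29 were aged out
    println(s"offset-based count = $offsetBasedCount, records actually read = $stillReadable")
  }
}
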
  override def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] = {
    val c = count
    new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
  }

  override def isEmpty(): Boolean = count == 0L

  override def take(num: Int): Array[ConsumerRecord[Array[Byte], Array[Byte]]] = {
    val nonEmptyPartitions =
      this.partitions.map(_.asInstanceOf[KafkaSourceRDDPartition]).filter(_.offsetRange.size > 0)

    if (num < 1 || nonEmptyPartitions.isEmpty) {
      return new Array[ConsumerRecord[Array[Byte], Array[Byte]]](0)
    }

    // Determine in advance how many messages need to be taken from each partition
    val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
      val remain = num - result.values.sum
      if (remain > 0) {
        val taken = Math.min(remain, part.offsetRange.size)
        result + (part.index -> taken.toInt)
      } else {
        result
      }
    }

    val buf = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
    val res = context.runJob(
      this,
      (tc: TaskContext, it: Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]) =>
        it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
    )
    res.foreach(buf ++= _)
    buf.toArray
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val part = split.asInstanceOf[KafkaSourceRDDPartition]
    part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)
@@ -0,0 +1,272 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.kafka010

import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable
import scala.util.Random

import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}

/**
 * This is a basic test trait which sets up a Kafka cluster that keeps only a few records in
 * a topic and ages out records very quickly. It is a helper trait for testing the
 * "failOnDataLoss=false" case with missing offsets.
 *
 * Note: there is a hard-coded 30-second delay (kafka.log.LogManager.InitialTaskDelayMs) before
 * records are cleaned up. Hence each class extending this trait needs to wait at least 30 seconds
 * (or even longer when running on a slow Jenkins machine) before records start to be removed. To
 * make sure a test does see missing offsets, check the earliest offset inside `eventually` and
 * assert that it is not 0, rather than sleeping for a hard-coded duration.
 */
trait KafkaMissingOffsetsTest extends SharedSQLContext {

  protected var testUtils: KafkaTestUtils = _

  override def createSparkSession(): TestSparkSession = {
    // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
    new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
  }

  override def beforeAll(): Unit = {
    super.beforeAll()
    testUtils = new KafkaTestUtils {
      override def brokerConfiguration: Properties = {
        val props = super.brokerConfiguration
        // Try to make Kafka clean up messages as fast as possible. However, there is a
        // hard-coded 30-second delay (kafka.log.LogManager.InitialTaskDelayMs), so this test
        // should run for at least 30 seconds.
        props.put("log.cleaner.backoff.ms", "100")
        // The size of RecordBatch V2 increases to support transactional write.
        props.put("log.segment.bytes", "70")
        props.put("log.retention.bytes", "40")
        props.put("log.retention.check.interval.ms", "100")
        props.put("delete.retention.ms", "10")
        props.put("log.flush.scheduler.interval.ms", "10")
        props
      }
    }
    testUtils.setup()
  }

  override def afterAll(): Unit = {
    if (testUtils != null) {
      testUtils.teardown()
      testUtils = null
    }
    super.afterAll()
  }
}

class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {

  import testImplicits._

  private val topicId = new AtomicInteger(0)

  private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"

  /**
   * @param testStreamingQuery whether to test a streaming query or a batch query.
   * @param writeToTable the function to write the specified [[DataFrame]] to the given table.
   */
  private def verifyMissingOffsetsDontCauseDuplicatedRecords(
      testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = {
    val topic = newTopic()
    testUtils.createTopic(topic, partitions = 1)
    testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)

    eventually(timeout(60.seconds)) {
      assert(
        testUtils.getEarliestOffsets(Set(topic)).head._2 > 0,
        "Kafka didn't delete records after 1 minute")
    }

    val table = "DontFailOnDataLoss"
    withTable(table) {
      val kafkaOptions = Map(
        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
        "kafka.metadata.max.age.ms" -> "1",
        "subscribe" -> topic,
        "startingOffsets" -> s"""{"$topic":{"0":0}}""",
        "failOnDataLoss" -> "false",
        "kafkaConsumer.pollTimeoutMs" -> "1000")
      val df =
        if (testStreamingQuery) {
          val reader = spark.readStream.format("kafka")
          kafkaOptions.foreach(kv => reader.option(kv._1, kv._2))
          reader.load()
        } else {
          val reader = spark.read.format("kafka")
          kafkaOptions.foreach(kv => reader.option(kv._1, kv._2))
          reader.load()
        }
      writeToTable(df.selectExpr("CAST(value AS STRING)"), table)
      val result = spark.table(table).as[String].collect().toList
      assert(result.distinct.size === result.size, s"$result contains duplicated records")
      // Make sure Kafka did remove some records so that this test is valid.
      assert(result.size > 0 && result.size < 50)

Contributor: How do you ensure that the retention policy configured above will not completely delete all records?

Member Author (zsxwing), Aug 24, 2018: I checked the Kafka code and it will keep at least one segment for a topic. I also did a simple test to make sure it will not delete all records: I added Thread.sleep(120000) after the eventually(timeout(60.seconds)) { ... } block above, and the assertion still passed.

    }
  }

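The review thread attached to the assertion above hinges on whether the aggressive retention settings could ever delete every record in the topic. As the author notes, Kafka keeps at least one segment per partition; the toy model below is purely my own illustration of that behaviour (segment sizes and offsets are invented, and this is not Kafka's code):

// Toy model only: size-based retention drops the oldest segments while the log exceeds the
// retention limit, but never deletes the active (newest) segment, so some records always remain.
object RetentionSketch {
  final case class Segment(baseOffset: Long, bytes: Long)

  def applyRetention(segments: List[Segment], retentionBytes: Long): List[Segment] = {
    var kept = segments
    while (kept.size > 1 && kept.map(_.bytes).sum > retentionBytes) {
      kept = kept.tail // delete the oldest segment
    }
    kept
  }

  def main(args: Array[String]): Unit = {
    // In the spirit of the suite's broker config: tiny segments, an even tinier retention limit.
    val log = List(Segment(0, 70), Segment(3, 70), Segment(6, 70))
    println(applyRetention(log, retentionBytes = 40)) // List(Segment(6,70)) -> newest segment kept
  }
}

Because the newest segment always survives, the test above can rely on result.size > 0 even with log.retention.bytes set to 40.
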
test("failOnDataLoss=false should not return duplicated records: v1") {
withSQLConf(
"spark.sql.streaming.disabledV2MicroBatchReaders" ->
classOf[KafkaSourceProvider].getCanonicalName) {
verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
val query = df.writeStream.format("memory").queryName(table).start()
try {
query.processAllAvailable()
} finally {
query.stop()
}
}
}
}

test("failOnDataLoss=false should not return duplicated records: v2") {
verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
val query = df.writeStream.format("memory").queryName(table).start()
try {
query.processAllAvailable()
} finally {
query.stop()
}
}
}

test("failOnDataLoss=false should not return duplicated records: continuous processing") {
verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
val query = df.writeStream
.format("memory")
.queryName(table)
.trigger(Trigger.Continuous(100))
.start()
try {
query.processAllAvailable()
} finally {
query.stop()
}
}
}

test("failOnDataLoss=false should not return duplicated records: batch") {
verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = false) { (df, table) =>
df.write.saveAsTable(table)
}
}
}

class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTest {

Member Author (zsxwing): Copied from KafkaMicroBatchSourceSuite.scala. I also moved the setup code into KafkaMissingOffsetsTest to share it with KafkaDontFailOnDataLossSuite.

  import testImplicits._

  private val topicId = new AtomicInteger(0)

  private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"

  protected def startStream(ds: Dataset[Int]) = {
    ds.writeStream.foreach(new ForeachWriter[Int] {

      override def open(partitionId: Long, version: Long): Boolean = true

      override def process(value: Int): Unit = {
        // Slow down the processing speed so that messages may be aged out.
        Thread.sleep(Random.nextInt(500))
      }

      override def close(errorOrNull: Throwable): Unit = {}
    }).start()
  }

test("stress test for failOnDataLoss=false") {
val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("kafka.default.api.timeout.ms", "3000")
.option("subscribePattern", "failOnDataLoss.*")
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.option("fetchOffset.retryIntervalMs", "3000")
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
val query = startStream(kafka.map(kv => kv._2.toInt))

val testTime = 1.minutes
val startTime = System.currentTimeMillis()
// Track the current existing topics
val topics = mutable.ArrayBuffer[String]()
// Track topics that have been deleted
val deletedTopics = mutable.Set[String]()
while (System.currentTimeMillis() - testTime.toMillis < startTime) {
Random.nextInt(10) match {
case 0 => // Create a new topic
val topic = newTopic()
topics += topic
// As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
// chance that a topic will be recreated after deletion due to the asynchronous update.
// Hence, always overwrite to handle this race condition.
testUtils.createTopic(topic, partitions = 1, overwrite = true)
logInfo(s"Create topic $topic")
case 1 if topics.nonEmpty => // Delete an existing topic
val topic = topics.remove(Random.nextInt(topics.size))
testUtils.deleteTopic(topic)
logInfo(s"Delete topic $topic")
deletedTopics += topic
case 2 if deletedTopics.nonEmpty => // Recreate a topic that was deleted.
val topic = deletedTopics.toSeq(Random.nextInt(deletedTopics.size))
deletedTopics -= topic
topics += topic
// As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
// chance that a topic will be recreated after deletion due to the asynchronous update.
// Hence, always overwrite to handle this race condition.
testUtils.createTopic(topic, partitions = 1, overwrite = true)
logInfo(s"Create topic $topic")
case 3 =>
Thread.sleep(1000)
case _ => // Push random messages
for (topic <- topics) {
val size = Random.nextInt(10)
for (_ <- 0 until size) {
testUtils.sendMessages(topic, Array(Random.nextInt(10).toString))
}
}
}
// `failOnDataLoss` is `false`, we should not fail the query
if (query.exception.nonEmpty) {
throw query.exception.get
}
}

query.stop()
// `failOnDataLoss` is `false`, we should not fail the query
if (query.exception.nonEmpty) {
throw query.exception.get
}
}
}