[SPARK-25214][SS] Fix the issue that Kafka v2 source may return duplicated records when failOnDataLoss=false
#22207
Conversation
    offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
  }

  override def count(): Long = offsetRanges.map(_.size).sum
These methods are never used because Dataset always wraps this RDD via `rdd.mapPartitionsInternal { iter => ... }`, and the resulting MapPartitionsRDD just calls the default RDD implementation. In addition, they may return wrong answers when failOnDataLoss=false. Hence, I just removed them.
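To make that concern concrete, here is a small hypothetical illustration of why the offset-arithmetic shortcut can disagree with the records that are actually readable (the `OffsetRange` below is a simplified stand-in, not the real KafkaSourceRDD class):

```scala
object CountShortcutIllustration extends App {
  // Simplified stand-in for the offset-range bookkeeping; illustration only.
  final case class OffsetRange(fromOffset: Long, untilOffset: Long) {
    def size: Long = untilOffset - fromOffset
  }

  val ranges = Seq(OffsetRange(0, 50), OffsetRange(50, 100))

  // The shortcut counts purely from offset arithmetic and claims 100 records...
  val claimedCount = ranges.map(_.size).sum

  // ...but with failOnDataLoss=false, retention may already have deleted some
  // of those offsets (say 0-29), so iterating the partitions would return fewer.
  println(claimedCount) // prints 100 even though fewer records may be readable
}
```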
  }
}

class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext {
Moved to KafkaDontFailOnDataLossSuite.scala
  }
}

class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTest {
Copied from KafkaMicroBatchSourceSuite.scala. I also moved the setup code to KafkaMissingOffsetsTest to share it with KafkaDontFailOnDataLossSuite.
Test build #95177 has finished for PR 22207 at commit

Test build #95179 has finished for PR 22207 at commit
    val result = spark.table(table).as[String].collect().toList
    assert(result.distinct.size === result.size, s"$result contains duplicated records")
    // Make sure Kafka did remove some records so that this test is valid.
    assert(result.size > 0 && result.size < 50)
How do you ensure that the retention policy configured above will not completely delete all records?
I checked the Kafka code and it keeps at least one segment per topic. I also did a simple test to make sure it will not delete all records: I added `Thread.sleep(120000)` after `eventually(timeout(60.seconds)) { assert(testUtils.getEarliestOffsets(Set(topic)).head._2 > 0, "Kafka didn't delete records after 1 minute") }` and the assertion still passed.
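For reference, a minimal sketch of that verification, assuming ScalaTest's `eventually`/`timeout`/`SpanSugar` helpers and the `testUtils.getEarliestOffsets` method used in Spark's Kafka test suites; the extra sleep and final assertion mirror the manual check described above (the `< 50` bound assumes the test wrote offsets 0 through 49):

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Wait until the broker's retention policy has deleted some records,
// i.e. the earliest available offset has moved past 0.
eventually(timeout(60.seconds)) {
  assert(
    testUtils.getEarliestOffsets(Set(topic)).head._2 > 0,
    "Kafka didn't delete records after 1 minute")
}

// Manual check described above: even after two more minutes, Kafka keeps
// at least one segment, so not every record has been deleted.
Thread.sleep(120000)
assert(testUtils.getEarliestOffsets(Set(topic)).head._2 < 50) // assumes 50 records written
```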
tdas left a comment
Looks good. Thanks for finding this bug. Just a few nits in my comments.
    } else {
      spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
dedup these options into a map... just to make sure they never become inconsistent.
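One possible shape for that deduplication (the helper name and exact option set below are illustrative, not copied from the PR): keep the shared reader options in a single map so the batch and streaming reads cannot drift apart.

```scala
// Hypothetical helper; option keys are the standard Kafka source options.
def kafkaOptions(topic: String): Map[String, String] = Map(
  "kafka.bootstrap.servers" -> testUtils.brokerAddress,
  "subscribe" -> topic,
  "startingOffsets" -> "earliest",
  "failOnDataLoss" -> "false")

// Batch and streaming reads share the same options, so they can never
// become inconsistent with each other.
val batchDf  = spark.read.format("kafka").options(kafkaOptions(topic)).load()
val streamDf = spark.readStream.format("kafka").options(kafkaOptions(topic)).load()
```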
      .start()
    try {
      eventually(timeout(60.seconds)) {
        assert(spark.table(table).as[String].collect().contains("49"))
doesn't processAllAvailable work in continuous processing?
I didn't know it worked!
  protected def startStream(ds: Dataset[Int]) = {
    ds.writeStream.foreach(new ForeachWriter[Int] {

      override def open(partitionId: Long, version: Long): Boolean = {
nit: make single line.
        Thread.sleep(Random.nextInt(500))
      }

      override def close(errorOrNull: Throwable): Unit = {
nit: make single line.
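For reference, the single-line form suggested by these two nits might look roughly like this (the method bodies here are placeholders, not the exact code from the PR):

```scala
import scala.util.Random
import org.apache.spark.sql.{Dataset, ForeachWriter}

// Sketch only: use single-line overrides when the body fits on one line.
def startStream(ds: Dataset[Int]) =
  ds.writeStream.foreach(new ForeachWriter[Int] {
    override def open(partitionId: Long, version: Long): Boolean = true
    override def process(value: Int): Unit = Thread.sleep(Random.nextInt(500))
    override def close(errorOrNull: Throwable): Unit = {}
  }).start()
```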
Test build #95224 has finished for PR 22207 at commit
Thanks! Merging to master and 2.3.

I just realized the Kafka source v2 is not in 2.3 :)
…turn duplicated records when `failOnDataLoss=false`

## What changes were proposed in this pull request?

This is a follow up PR for #22207 to fix a potential flaky test. `processAllAvailable` doesn't work for continuous processing so we should not use it for a continuous query.

## How was this patch tested?

Jenkins.

Closes #22230 from zsxwing/SPARK-25214-2.
Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
…cated records when `failOnDataLoss=false`

## What changes were proposed in this pull request?

When there are missing offsets, Kafka v2 source may return duplicated records when `failOnDataLoss=false` because it doesn't skip missing offsets. This PR fixes the issue and also adds regression tests for all Kafka readers.

## How was this patch tested?

New tests.

Closes apache#22207 from zsxwing/SPARK-25214.
Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
What changes were proposed in this pull request?
When there are missing offsets, the Kafka v2 source may return duplicated records when `failOnDataLoss=false` because it doesn't skip missing offsets. This PR fixes the issue and also adds regression tests for all Kafka readers.
How was this patch tested?
New tests.
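For readers unfamiliar with the bug, here is a simplified, hypothetical sketch of the idea behind the fix (names and structure are illustrative, not the actual Kafka v2 reader code): after a fetch with failOnDataLoss=false, the reader must advance its expected offset to just past the record it actually received, not merely by one; otherwise a gap left by retention makes it re-request offsets it has already returned and emit duplicates.

```scala
// Illustrative only: a minimal partition reader showing why skipping
// missing offsets matters when failOnDataLoss=false.
final case class Record(offset: Long, value: String)

class PartitionReader(
    // Returns the record at `offset`, or the next available record if that
    // offset was deleted by retention (failOnDataLoss=false behaviour),
    // or None if nothing is available.
    fetchFrom: Long => Option[Record],
    startOffset: Long,
    untilOffset: Long) {

  private var nextOffset = startOffset
  private var current: Record = _

  def next(): Boolean = {
    if (nextOffset >= untilOffset) {
      false
    } else {
      fetchFrom(nextOffset) match {
        case Some(r) if r.offset < untilOffset =>
          current = r
          // Skip over any missing offsets: advance to the offset after the
          // record we actually got. Advancing by only +1 would make the
          // reader re-request records it already emitted, i.e. duplicates.
          nextOffset = r.offset + 1
          true
        case _ => false
      }
    }
  }

  def get(): Record = current
}
```

In this sketch the `fetchFrom` function stands in for the underlying consumer, which under failOnDataLoss=false returns the earliest available record at or after the requested offset instead of failing.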