Commit c17a8ff

[SPARK-25214][SS][FOLLOWUP] Fix the issue that Kafka v2 source may return duplicated records when failOnDataLoss=false
## What changes were proposed in this pull request?

This is a follow-up PR for #22207 to fix a potentially flaky test. `processAllAvailable` doesn't work for continuous processing, so we should not use it for a continuous query.

## How was this patch tested?

Jenkins.

Closes #22230 from zsxwing/SPARK-25214-2.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 6c66ab8 commit c17a8ff

File tree

1 file changed (+6 lines, -2 lines)


external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala

Lines changed: 6 additions & 2 deletions
@@ -80,7 +80,7 @@ trait KafkaMissingOffsetsTest extends SharedSQLContext {
   }
 }
 
-class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
+class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTest {
 
   import testImplicits._
 
@@ -165,7 +165,11 @@ class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
       .trigger(Trigger.Continuous(100))
       .start()
     try {
-      query.processAllAvailable()
+      // `processAllAvailable` doesn't work for continuous processing, so just wait until the last
+      // record appears in the table.
+      eventually(timeout(streamingTimeout)) {
+        assert(spark.table(table).as[String].collect().contains("49"))
+      }
     } finally {
       query.stop()
     }
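For context, here is a minimal, self-contained sketch of the same workaround outside the Kafka suite: poll the sink with ScalaTest's `eventually` instead of calling `processAllAvailable`, which only makes sense for micro-batch queries. The `rate` source, `memory` sink, query name, and 60-second timeout are illustrative assumptions, not part of this commit.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object ContinuousQueryWaitExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("continuous-wait-example")
      .getOrCreate()
    import spark.implicits._

    // A continuous-capable source writing to an in-memory sink that can be queried via spark.table.
    val query = spark.readStream
      .format("rate")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("memory")
      .queryName("example_sink") // hypothetical sink name, for illustration only
      .trigger(Trigger.Continuous(100))
      .start()

    try {
      // query.processAllAvailable() would not help for a continuous trigger;
      // instead, retry the assertion until data shows up or the timeout expires.
      eventually(timeout(60.seconds)) {
        assert(spark.table("example_sink").as[String].collect().nonEmpty)
      }
    } finally {
      query.stop()
      spark.stop()
    }
  }
}
```

`eventually` simply retries the enclosed assertion until it passes or the timeout elapses, which is why this wait works regardless of the trigger mode.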
