Skip to content

Commit a524256

Browse files
committed
don't use query.processAllAvailable for continuous processing
1 parent f8346d2 commit a524256

File tree

1 file changed

+6
-2
lines changed

1 file changed

+6
-2
lines changed

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ trait KafkaMissingOffsetsTest extends SharedSQLContext {
80 80
}
81 81
}
82 82

83-
class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
83+
class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTest {
84 84

85 85
import testImplicits._
86 86

@@ -165,7 +165,11 @@ class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
165 165
.trigger(Trigger.Continuous(100))
166 166
.start()
167 167
try {
168-
query.processAllAvailable()
168+
// `processAllAvailable` doesn't work for continuous processing, so just wait until the last
169+
// record appears in the table.
170+
eventually(timeout(streamingTimeout)) {
171+
assert(spark.table(table).as[String].collect().contains("49"))
172+
}
169 173
} finally {
170 174
query.stop()
171 175
}

0 commit comments

Comments (0)