Skip to content

Commit 810d59c

Browse files
jose-torreszsxwing
authored andcommitted
[SPARK-24882][FOLLOWUP] Fix flaky synchronization in Kafka tests.
## What changes were proposed in this pull request? Fix flaky synchronization in Kafka tests - we need to use the scan config that was persisted rather than reconstructing it to identify the stream's current configuration. We caught most instances of this in the original PR, but this one slipped through. ## How was this patch tested? n/a Closes #22245 from jose-torres/fixflake. Authored-by: Jose Torres <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 381a967 commit 810d59c

File tree

1 file changed

+5
-5
lines changed

1 file changed

+5
-5
lines changed

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.kafka010
1919

2020
import org.apache.spark.sql.Dataset
21-
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
21+
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
2222
import org.apache.spark.sql.streaming.Trigger
2323

2424
// Run tests in KafkaSourceSuiteBase in continuous execution mode.
@@ -60,10 +60,10 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
6060
testUtils.createTopic(topic2, partitions = 5)
6161
eventually(timeout(streamingTimeout)) {
6262
assert(
63-
query.lastExecution.logical.collectFirst {
64-
case r: StreamingDataSourceV2Relation
65-
if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
66-
r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig]
63+
query.lastExecution.executedPlan.collectFirst {
64+
case scan: DataSourceV2ScanExec
65+
if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
66+
scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
6767
}.exists { config =>
6868
// Ensure the new topic is present and the old topic is gone.
6969
config.knownPartitions.exists(_.topic == topic2)

0 commit comments

Comments
 (0)