File tree Expand file tree Collapse file tree 1 file changed +5
-5
lines changed
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010 Expand file tree Collapse file tree 1 file changed +5
-5
lines changed Original file line number Diff line number Diff line change 1818package org .apache .spark .sql .kafka010
1919
2020import org .apache .spark .sql .Dataset
21- import org .apache .spark .sql .execution .datasources .v2 .StreamingDataSourceV2Relation
21+ import org .apache .spark .sql .execution .datasources .v2 .DataSourceV2ScanExec
2222import org .apache .spark .sql .streaming .Trigger
2323
2424// Run tests in KafkaSourceSuiteBase in continuous execution mode.
@@ -60,10 +60,10 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
6060 testUtils.createTopic(topic2, partitions = 5 )
6161 eventually(timeout(streamingTimeout)) {
6262 assert(
63- query.lastExecution.logical .collectFirst {
64- case r : StreamingDataSourceV2Relation
65- if r .readSupport.isInstanceOf [KafkaContinuousReadSupport ] =>
66- r.scanConfigBuilder.build() .asInstanceOf [KafkaContinuousScanConfig ]
63+ query.lastExecution.executedPlan .collectFirst {
64+ case scan : DataSourceV2ScanExec
65+ if scan .readSupport.isInstanceOf [KafkaContinuousReadSupport ] =>
66+ scan.scanConfig .asInstanceOf [KafkaContinuousScanConfig ]
6767 }.exists { config =>
6868 // Ensure the new topic is present and the old topic is gone.
6969 config.knownPartitions.exists(_.topic == topic2)
You can’t perform that action at this time.
0 commit comments