Skip to content

Commit 516fd4a

Browse files
committed
fix test
1 parent c3abd70 commit 516fd4a

File tree

2 files changed

+14
-13
lines changed

2 files changed

+14
-13
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class ContinuousExecution(
5454
sparkSession, name, checkpointRoot, analyzedPlan, sink,
5555
trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
5656

57-
@volatile protected var continuousSources: Seq[ContinuousReader] = _
57+
@volatile protected var continuousSources: Seq[ContinuousReader] = Seq()
5858
override protected def sources: Seq[BaseStreamingSource] = continuousSources
5959

6060
override lazy val logicalPlan: LogicalPlan = {
@@ -69,7 +69,7 @@ class ContinuousExecution(
6969
ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
7070
})
7171
case StreamingRelationV2(_, sourceName, _, _, _) =>
72-
throw new AnalysisException(
72+
throw new UnsupportedOperationException(
7373
s"Data source $sourceName does not support continuous processing.")
7474
}
7575
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -169,18 +169,19 @@ class StreamingDataSourceV2Suite extends StreamTest {
169169
writeFormat: String,
170170
trigger: Trigger,
171171
errorMsg: String) = {
172-
val ex = intercept[StreamingQueryException] {
173-
spark.readStream
174-
.format(readFormat)
175-
.load()
176-
.writeStream
177-
.format(writeFormat)
178-
.trigger(trigger)
179-
.start()
180-
.processAllAvailable()
172+
val query = spark.readStream
173+
.format(readFormat)
174+
.load()
175+
.writeStream
176+
.format(writeFormat)
177+
.trigger(trigger)
178+
.start()
179+
180+
eventually(timeout(streamingTimeout)) {
181+
assert(query.exception.isDefined)
182+
assert(query.exception.get.cause != null)
183+
assert(query.exception.get.cause.getMessage.contains(errorMsg))
181184
}
182-
assert(ex.cause != null)
183-
assert(ex.cause.getMessage.contains(errorMsg))
184185
}
185186

186187
// Get a list of (read, write, trigger) tuples for test cases.

0 commit comments

Comments
 (0)