diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index a59eca25fe28e..8d39704c61d4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -43,11 +43,13 @@ object LastOptions {
   var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
   var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
   var parameters: Map[String, String] = null
+  var sinkParameters: Map[String, String] = null
   var schema: Option[StructType] = null
   var partitionColumns: Seq[String] = Nil
 
   def clear(): Unit = {
     parameters = null
+    sinkParameters = null
     schema = null
     partitionColumns = null
     reset(mockStreamSourceProvider)
@@ -101,7 +103,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
       parameters: Map[String, String],
       partitionColumns: Seq[String],
       outputMode: OutputMode): Sink = {
-    LastOptions.parameters = parameters
+    LastOptions.sinkParameters = parameters
     LastOptions.partitionColumns = partitionColumns
     LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
     (_: Long, _: DataFrame) => {}
@@ -170,20 +172,19 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     LastOptions.clear()
 
-    val query = df.writeStream
+    df.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("opt1", "5")
      .options(Map("opt2" -> "4"))
       .options(map)
       .option("checkpointLocation", newMetadataDir)
       .start()
+      .stop()
 
-    assert(LastOptions.parameters("opt1") == "5")
-    assert(LastOptions.parameters("opt2") == "4")
-    assert(LastOptions.parameters("opt3") == "3")
-    assert(LastOptions.parameters.contains("checkpointLocation"))
-
-    query.stop()
+    assert(LastOptions.sinkParameters("opt1") == "5")
+    assert(LastOptions.sinkParameters("opt2") == "4")
+    assert(LastOptions.sinkParameters("opt3") == "3")
+    assert(LastOptions.sinkParameters.contains("checkpointLocation"))
   }
 
   test("SPARK-32832: later option should override earlier options for load()") {
@@ -204,7 +205,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .load()
     assert(LastOptions.parameters.isEmpty)
 
-    val query = ds.writeStream
+    ds.writeStream
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .option("paTh", "1")
@@ -213,8 +214,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .option("patH", "4")
       .option("path", "5")
       .start()
-    assert(LastOptions.parameters("path") == "5")
-    query.stop()
+      .stop()
+    assert(LastOptions.sinkParameters("path") == "5")
   }
 
   test("partitioning") {
@@ -787,13 +788,13 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     withTempDir { checkpointPath =>
       withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true",
         SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) {
-        val query = df.writeStream
+        df.writeStream
           .format("org.apache.spark.sql.streaming.test")
          .option("path", "tmp4")
           .start("tmp5")
+          .stop()
         // The legacy behavior overwrites the path option.
-        assert(LastOptions.parameters("path") == "tmp5")
-        query.stop()
+        assert(LastOptions.sinkParameters("path") == "tmp5")
       }
     }
   }