diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 96c4ac310fba6..25ca186c65f04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -286,7 +286,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    */
   def save(path: String): Unit = {
     if (!df.sparkSession.sessionState.conf.legacyPathOptionBehavior &&
-        extraOptions.contains("path") && path.nonEmpty) {
+        extraOptions.contains("path")) {
       throw new AnalysisException("There is a 'path' option set and save() is called with a path " +
         "parameter. Either remove the path option, or call save() without the parameter. " +
         s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 4064df0b04d12..0c17dafeed683 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -241,7 +241,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    */
   def load(path: String): DataFrame = {
     if (!sparkSession.sessionState.conf.legacyPathOptionBehavior &&
-        extraOptions.contains("path") && path.nonEmpty) {
+        extraOptions.contains("path")) {
       throw new AnalysisException("There is a 'path' option set and load() is called with a path" +
         "parameter. Either remove the path option, or call load() without the parameter. " +
         s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 45250c50a970e..491eef4e2dc5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -268,7 +268,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    */
   def start(path: String): StreamingQuery = {
     if (!df.sparkSession.sessionState.conf.legacyPathOptionBehavior &&
-        extraOptions.contains("path") && path.nonEmpty) {
+        extraOptions.contains("path")) {
       throw new AnalysisException("There is a 'path' option set and start() is called with a " +
         "path parameter. Either remove the path option, or call start() without the parameter. " +
         s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 8f34106d3d8f5..f04deb242ba5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -712,7 +712,9 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     }
 
     verifyLoadFails(spark.readStream.option("path", "tmp1").parquet("tmp2"))
+    verifyLoadFails(spark.readStream.option("path", "tmp1").parquet(""))
     verifyLoadFails(spark.readStream.option("path", "tmp1").format("parquet").load("tmp2"))
+    verifyLoadFails(spark.readStream.option("path", "tmp1").format("parquet").load(""))
 
     withClue("SPARK-32516: legacy behavior") {
       withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true") {
@@ -731,24 +733,35 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .format("org.apache.spark.sql.streaming.test")
       .load("tmp1")
 
-    val e = intercept[AnalysisException] {
+    def verifyStartFails(f: => StreamingQuery): Unit = {
+      val e = intercept[AnalysisException](f)
+      assert(e.getMessage.contains(
+        "Either remove the path option, or call start() without the parameter"))
+    }
+
+    verifyStartFails(
       df.writeStream
         .format("org.apache.spark.sql.streaming.test")
         .option("path", "tmp2")
-        .start("tmp3")
-        .stop()
-    }
-    assert(e.getMessage.contains(
-      "Either remove the path option, or call start() without the parameter"))
+        .start("tmp3"))
+    verifyStartFails(
+      df.writeStream
+        .format("org.apache.spark.sql.streaming.test")
+        .option("path", "tmp2")
+        .start(""))
 
     withClue("SPARK-32516: legacy behavior") {
-      withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true") {
-        spark.readStream
-          .format("org.apache.spark.sql.streaming.test")
-          .option("path", "tmp4")
-          .load("tmp5")
-        // The legacy behavior overwrites the path option.
-        assert(LastOptions.parameters("path") == "tmp5")
+      withTempDir { checkpointPath =>
+        withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true",
+          SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) {
+          val query = df.writeStream
+            .format("org.apache.spark.sql.streaming.test")
+            .option("path", "tmp4")
+            .start("tmp5")
+          // The legacy behavior overwrites the path option.
+          assert(LastOptions.parameters("path") == "tmp5")
+          query.stop()
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index c84d361024309..c4ca85d6237b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -1135,17 +1135,21 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
   }
 
   test("SPARK-32516: 'path' or 'paths' option cannot coexist with load()'s path parameters") {
-    def verifyLoadFails(f: () => DataFrame): Unit = {
-      val e = intercept[AnalysisException](f())
+    def verifyLoadFails(f: => DataFrame): Unit = {
+      val e = intercept[AnalysisException](f)
       assert(e.getMessage.contains(
         "Either remove the path option if it's the same as the path parameter"))
     }
 
     val path = "/tmp"
-    verifyLoadFails(() => spark.read.option("path", path).parquet(path))
-    verifyLoadFails(() => spark.read.option("path", path).format("parquet").load(path))
-    verifyLoadFails(() => spark.read.option("paths", path).parquet(path))
-    verifyLoadFails(() => spark.read.option("paths", path).format("parquet").load(path))
+    verifyLoadFails(spark.read.option("path", path).parquet(path))
+    verifyLoadFails(spark.read.option("path", path).parquet(""))
+    verifyLoadFails(spark.read.option("path", path).format("parquet").load(path))
+    verifyLoadFails(spark.read.option("path", path).format("parquet").load(""))
+    verifyLoadFails(spark.read.option("paths", path).parquet(path))
+    verifyLoadFails(spark.read.option("paths", path).parquet(""))
+    verifyLoadFails(spark.read.option("paths", path).format("parquet").load(path))
+    verifyLoadFails(spark.read.option("paths", path).format("parquet").load(""))
   }
 
   test("SPARK-32516: legacy path option behavior in load()") {
@@ -1182,6 +1186,8 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     val df = Seq(1).toDF
     val path = "tmp"
     verifyLoadFails(df.write.option("path", path).parquet(path))
+    verifyLoadFails(df.write.option("path", path).parquet(""))
    verifyLoadFails(df.write.option("path", path).format("parquet").save(path))
+    verifyLoadFails(df.write.option("path", path).format("parquet").save(""))
   }
 }
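
Note: dropping `path.nonEmpty` from the three guards means an empty path argument no longer bypasses the conflict check, which is exactly what the new `parquet("")` / `load("")` / `save("")` / `start("")` test cases pin down. Below is a minimal sketch of the enforced behavior, not part of the patch; it assumes a live SparkSession named `spark` with the default (non-legacy) path option behavior, and the `expectPathConflict` helper is hypothetical, standing in for ScalaTest's `intercept[AnalysisException]`:

    import org.apache.spark.sql.AnalysisException

    val df = spark.range(1).toDF

    // Run f and check that the path-conflict AnalysisException is thrown;
    // a stand-in for ScalaTest's intercept[AnalysisException].
    def expectPathConflict(f: => Unit): Unit =
      try { f; sys.error("expected AnalysisException") }
      catch {
        case e: AnalysisException =>
          assert(e.getMessage.contains("There is a 'path' option set"))
      }

    // A non-empty path parameter conflicting with the 'path' option
    // already threw before this patch.
    expectPathConflict { df.write.option("path", "/tmp/out").parquet("/tmp/other") }

    // An empty path argument used to slip past the guard via path.nonEmpty;
    // after this patch it throws as well.
    expectPathConflict { df.write.option("path", "/tmp/out").parquet("") }

The test helpers are also switched from explicit thunks (`f: () => DataFrame`) to by-name parameters (`f: => DataFrame`), so call sites can pass the failing expression directly instead of wrapping it in `() => ...`.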