diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 3b66694556af9..0e03eac410985 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -38,7 +38,7 @@ license: |
 
   - In Spark 3.1, when `spark.sql.ansi.enabled` is false, Spark always returns null if the sum of decimal type column overflows. In Spark 3.0 or earlier, in the case, the sum of decimal type column may return null or incorrect result, or even fails at runtime (depending on the actual query plan execution).
 
-  - In Spark 3.1, when loading a dataframe, `path` or `paths` option cannot coexist with `load()`'s path parameters. For example, `spark.read.format("csv").option("path", "/tmp").load("/tmp2")` or `spark.read.option("path", "/tmp").csv("/tmp2")` will throw `org.apache.spark.sql.AnalysisException`. In Spark version 3.0 and below, `path` option is overwritten if one path parameter is passed to `load()`, or `path` option is added to the overall paths if multiple path parameters are passed to `load()`. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.pathOptionBehavior.enabled` to `true`.
+  - In Spark 3.1, `path` option cannot coexist when the following methods are called with path parameter(s): `DataFrameReader.load()`, `DataFrameWriter.save()`, `DataStreamReader.load()`, or `DataStreamWriter.start()`. In addition, `paths` option cannot coexist for `DataFrameReader.load()`. For example, `spark.read.format("csv").option("path", "/tmp").load("/tmp2")` or `spark.read.option("path", "/tmp").csv("/tmp2")` will throw `org.apache.spark.sql.AnalysisException`. In Spark version 3.0 and below, `path` option is overwritten if one path parameter is passed to above methods; `path` option is added to the overall paths if multiple path parameters are passed to `DataFrameReader.load()`. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.pathOptionBehavior.enabled` to `true`.
 
 ## Upgrading from Spark SQL 3.0 to 3.0.1
 
diff --git a/python/pyspark/sql/tests/test_streaming.py b/python/pyspark/sql/tests/test_streaming.py
index caac67d7efdf3..34ff92b323c73 100644
--- a/python/pyspark/sql/tests/test_streaming.py
+++ b/python/pyspark/sql/tests/test_streaming.py
@@ -68,9 +68,12 @@ def test_stream_read_options(self):
     def test_stream_read_options_overwrite(self):
         bad_schema = StructType([StructField("test", IntegerType(), False)])
         schema = StructType([StructField("data", StringType(), False)])
-        df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \
-            .schema(bad_schema)\
-            .load(path='python/test_support/sql/streaming', schema=schema, format='text')
+        # SPARK-32516 disables the overwrite behavior by default.
+        with self.sql_conf({"spark.sql.legacy.pathOptionBehavior.enabled": True}):
+            df = self.spark.readStream.format('csv')\
+                .option('path', 'python/test_support/sql/fake')\
+                .schema(bad_schema)\
+                .load(path='python/test_support/sql/streaming', schema=schema, format='text')
         self.assertTrue(df.isStreaming)
         self.assertEqual(df.schema.simpleString(), "struct<data:string>")
 
@@ -110,10 +113,12 @@ def test_stream_save_options_overwrite(self):
         chk = os.path.join(tmpPath, 'chk')
         fake1 = os.path.join(tmpPath, 'fake1')
         fake2 = os.path.join(tmpPath, 'fake2')
-        q = df.writeStream.option('checkpointLocation', fake1)\
-            .format('memory').option('path', fake2) \
-            .queryName('fake_query').outputMode('append') \
-            .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
+        # SPARK-32516 disables the overwrite behavior by default.
+        with self.sql_conf({"spark.sql.legacy.pathOptionBehavior.enabled": True}):
+            q = df.writeStream.option('checkpointLocation', fake1)\
+                .format('memory').option('path', fake2) \
+                .queryName('fake_query').outputMode('append') \
+                .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
 
         try:
             self.assertEqual(q.name, 'this_query')
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3e82b8e12df02..47cd3c7d62a72 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2716,8 +2716,9 @@ object SQLConf {
     buildConf("spark.sql.legacy.pathOptionBehavior.enabled")
       .internal()
       .doc("When true, \"path\" option is overwritten if one path parameter is passed to " +
-        "DataFramerReader.load(), or \"path\" option is added to the overall paths if multiple " +
-        "path parameters are passed to DataFramerReader.load()")
+        "DataFrameReader.load(), DataFrameWriter.save(), DataStreamReader.load(), or " +
+        "DataStreamWriter.start(). Also, \"path\" option is added to the overall paths if " +
+        "multiple path parameters are passed to DataFrameReader.load()")
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 8d3a7eea05c77..5ffff20853180 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -255,7 +255,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         (extraOptions.contains("path") || extraOptions.contains("paths")) && paths.nonEmpty) {
       throw new AnalysisException("There is a 'path' or 'paths' option set and load() is called " +
         "with path parameters. Either remove the path option if it's the same as the path " +
-        "parameter, or add it to the load() parameter if you do want to read multiple paths.")
+        "parameter, or add it to the load() parameter if you do want to read multiple paths. " +
" + + s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.") } val updatedPaths = if (!legacyPathOptionBehavior && paths.length == 1) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index f463166a9f268..2da8814d66aea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation} import org.apache.spark.sql.execution.datasources.v2._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.StructType @@ -284,6 +285,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.4.0 */ def save(path: String): Unit = { + if (!df.sparkSession.sessionState.conf.legacyPathOptionBehavior && + extraOptions.contains("path") && path.nonEmpty) { + throw new AnalysisException("There is a 'path' option set and save() is called with a path " + + "parameter. Either remove the path option, or call save() without the parameter. " + + s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.") + } this.extraOptions += ("path" -> path) save() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 2b0db4381c6e4..6122b96c9a0bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDataSourceV2} import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.StreamSourceProvider import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -239,6 +240,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * @since 2.0.0 */ def load(path: String): DataFrame = { + if (!sparkSession.sessionState.conf.legacyPathOptionBehavior && + extraOptions.contains("path") && path.nonEmpty) { + throw new AnalysisException("There is a 'path' option set and load() is called with a path" + + "parameter. Either remove the path option, or call load() without the parameter. 
" + + s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.") + } option("path", path).load() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 1d0ca4d9453a5..45250c50a970e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDataSourceV2} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -266,6 +267,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * @since 2.0.0 */ def start(path: String): StreamingQuery = { + if (!df.sparkSession.sessionState.conf.legacyPathOptionBehavior && + extraOptions.contains("path") && path.nonEmpty) { + throw new AnalysisException("There is a 'path' option set and start() is called with a " + + "path parameter. Either remove the path option, or call start() without the parameter. " + + s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.") + } option("path", path).start() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index f9fc540c2ab80..8f34106d3d8f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -109,6 +109,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { } class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { + import testImplicits._ private def newMetadataDir = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath @@ -435,7 +436,6 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { } private def testMemorySinkCheckpointRecovery(chkLoc: String, provideInWriter: Boolean): Unit = { - import testImplicits._ val ms = new MemoryStream[Int](0, sqlContext) val df = ms.toDF().toDF("a") val tableName = "test" @@ -703,4 +703,53 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { queries.foreach(_.stop()) } } + + test("SPARK-32516: 'path' cannot coexist with load()'s path parameter") { + def verifyLoadFails(f: => DataFrame): Unit = { + val e = intercept[AnalysisException](f) + assert(e.getMessage.contains( + "Either remove the path option, or call load() without the parameter")) + } + + verifyLoadFails(spark.readStream.option("path", "tmp1").parquet("tmp2")) + verifyLoadFails(spark.readStream.option("path", "tmp1").format("parquet").load("tmp2")) + + withClue("SPARK-32516: legacy behavior") { + withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true") { + spark.readStream + .format("org.apache.spark.sql.streaming.test") + .option("path", "tmp1") + .load("tmp2") + // The legacy behavior overwrites the path option. 
+        assert(LastOptions.parameters("path") == "tmp2")
+      }
+    }
+  }
+
+  test("SPARK-32516: 'path' cannot coexist with start()'s path parameter") {
+    val df = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load("tmp1")
+
+    val e = intercept[AnalysisException] {
+      df.writeStream
+        .format("org.apache.spark.sql.streaming.test")
+        .option("path", "tmp2")
+        .start("tmp3")
+        .stop()
+    }
+    assert(e.getMessage.contains(
+      "Either remove the path option, or call start() without the parameter"))
+
+    withClue("SPARK-32516: legacy behavior") {
+      withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true") {
+        spark.readStream
+          .format("org.apache.spark.sql.streaming.test")
+          .option("path", "tmp4")
+          .load("tmp5")
+        // The legacy behavior overwrites the path option.
+        assert(LastOptions.parameters("path") == "tmp5")
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 85036ac8476fe..c84d361024309 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -224,7 +224,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     assert(LastOptions.parameters("opt3") == "3")
   }
 
-  test("SPARK-32364: later option should override earlier options") {
+  test("SPARK-32364: later option should override earlier options for load()") {
     spark.read
       .format("org.apache.spark.sql.test")
       .option("paTh", "1")
@@ -249,15 +249,29 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     }
   }
 
-  test("SPARK-32364: path argument of save function should override all existing options") {
+  test("SPARK-32364: later option should override earlier options for save()") {
     Seq(1).toDF.write
       .format("org.apache.spark.sql.test")
       .option("paTh", "1")
       .option("PATH", "2")
       .option("Path", "3")
       .option("patH", "4")
-      .save("5")
+      .option("path", "5")
+      .save()
     assert(LastOptions.parameters("path") == "5")
+
+    withClue("SPARK-32516: legacy path option behavior") {
+      withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true") {
+        Seq(1).toDF.write
+          .format("org.apache.spark.sql.test")
+          .option("paTh", "1")
+          .option("PATH", "2")
+          .option("Path", "3")
+          .option("patH", "4")
+          .save("5")
+        assert(LastOptions.parameters("path") == "5")
+      }
+    }
   }
 
   test("pass partitionBy as options") {
@@ -1157,4 +1171,17 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
       }
     }
   }
+
+  test("SPARK-32516: 'path' option cannot coexist with save()'s path parameter") {
+    def verifyLoadFails(f: => Unit): Unit = {
+      val e = intercept[AnalysisException](f)
+      assert(e.getMessage.contains(
+        "Either remove the path option, or call save() without the parameter"))
+    }
+
+    val df = Seq(1).toDF
+    val path = "tmp"
+    verifyLoadFails(df.write.option("path", path).parquet(path))
+    verifyLoadFails(df.write.option("path", path).format("parquet").save(path))
+  }
 }
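
A minimal, hypothetical sketch of how the behavior change in this patch surfaces to end users. It is not part of the patch itself; the local session setup and the `/tmp/out1` and `/tmp/out2` paths are assumptions made purely for illustration.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object PathOptionBehaviorExample {
  def main(args: Array[String]): Unit = {
    // Assumed local session; any Spark 3.1 session would behave the same way.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("path-option-check")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(1, 2, 3).toDF("id")

    // With the new default (legacy flag unset), a 'path' option may no longer
    // coexist with save()'s path parameter, so this write fails.
    try {
      df.write.option("path", "/tmp/out1").parquet("/tmp/out2")
    } catch {
      case e: AnalysisException => println(s"Expected failure: ${e.getMessage}")
    }

    // Opting back into the pre-3.1 behavior: the path parameter overwrites the option.
    spark.conf.set("spark.sql.legacy.pathOptionBehavior.enabled", "true")
    df.write.mode("overwrite").option("path", "/tmp/out1").parquet("/tmp/out2") // writes to /tmp/out2

    spark.stop()
  }
}
```

With the flag left at its default (`false`), the first write throws the `AnalysisException` added in `DataFrameWriter.save`; setting it to `true` restores the Spark 3.0 behavior in which the path parameter silently overwrites the `path` option.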