From feff1e417353b944e050de1acdbf7bd92f727feb Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 21 Feb 2025 16:43:11 +0800 Subject: [PATCH 1/7] DataFrameWriterV2 should respect the path option --- .../apache/spark/sql/internal/SQLConf.scala | 9 +++++ .../spark/sql/classic/DataFrameWriterV2.scala | 38 +++++++++---------- .../spark/sql/DataFrameWriterV2Suite.scala | 22 +++++++++++ 3 files changed, 49 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5b42c157e598..f53e07cc9f92 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -5545,6 +5545,15 @@ object SQLConf { .booleanConf .createWithDefault(false) + val LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION = + buildConf("spark.sql.legacy.dataFrameWriterV2IgnorePathOption") + .internal() + .doc("When set to true, DataFrameWriterV2 ignores the 'path' option and always write data " + + "to the default table location.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + /** * Holds information about keys that have been deprecated. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala index e4efee93d2a0..5dee09175839 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala @@ -28,10 +28,12 @@ import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedFunction, UnresolvedIdentifier, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.TableWritePrivilege._ import org.apache.spark.sql.connector.expressions._ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.IntegerType /** @@ -146,25 +148,30 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) /** @inheritdoc */ override def create(): Unit = { - val tableSpec = UnresolvedTableSpec( - properties = properties.toMap, - provider = provider, - optionExpression = OptionList(Seq.empty), - location = None, - comment = None, - collation = None, - serde = None, - external = false) runCommand( CreateTableAsSelect( UnresolvedIdentifier(tableName), partitioning.getOrElse(Seq.empty) ++ clustering, logicalPlan, - tableSpec, + buildTableSpec(), options.toMap, false)) } + private def buildTableSpec(): UnresolvedTableSpec = { + val ignorePathOption = sparkSession.sessionState.conf.getConf( + SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION) + UnresolvedTableSpec( + properties = properties.toMap, + provider = provider, + optionExpression = OptionList(Seq.empty), + location = if (ignorePathOption) None else CaseInsensitiveMap(options.toMap).get("path"), + comment = None, + collation = None, + serde = None, + external = false) + } + /** @inheritdoc */ override def replace(): Unit = { internalReplace(orCreate = false) @@ -212,20 +219,11 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) } private def internalReplace(orCreate: Boolean): Unit = { - val tableSpec = UnresolvedTableSpec( - properties = properties.toMap, - provider = provider, - optionExpression = OptionList(Seq.empty), - location = None, - comment = None, - collation = None, - serde = None, - external = false) runCommand(ReplaceTableAsSelect( UnresolvedIdentifier(tableName), partitioning.getOrElse(Seq.empty) ++ clustering, logicalPlan, - tableSpec, + buildTableSpec(), writeOptions = options.toMap, orCreate = orCreate)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index b7ac6af22a20..a0c5fc262564 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -839,4 +839,26 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo condition = "CALL_ON_STREAMING_DATASET_UNSUPPORTED", parameters = Map("methodName" -> "`writeTo`")) } + + test("create/replace file source tables") { + Seq(true, false).foreach { ignorePath => + withSQLConf(SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION.key -> ignorePath.toString) { + withTable("t1", "t2") { + spark.range(10).writeTo("t1").using("json").create() + checkAnswer(spark.table("t1"), spark.range(10).toDF()) + + withTempPath { p => + val path = p.getCanonicalPath + spark.range(10).writeTo("t2").using("json").option("path", path).create() + checkAnswer(spark.table("t2"), spark.range(10).toDF()) + if (ignorePath) { + assert(!p.exists()) + } else { + checkAnswer(spark.read.json(path), spark.range(10).toDF()) + } + } + } + } + } + } } From 215a606de6e2f2966007520789c24a559b5be561 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 21 Feb 2025 16:48:55 +0800 Subject: [PATCH 2/7] Update sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala --- .../scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index a0c5fc262564..81ef200b042f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -840,7 +840,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo parameters = Map("methodName" -> "`writeTo`")) } - test("create/replace file source tables") { + test("SPARK-51281: create/replace file source tables") { Seq(true, false).foreach { ignorePath => withSQLConf(SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION.key -> ignorePath.toString) { withTable("t1", "t2") { From 88299aff6cda6f55b9938c3d3363ec8495f73a3e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 24 Feb 2025 10:44:37 +0800 Subject: [PATCH 3/7] Apply suggestions from code review --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f53e07cc9f92..e9bba9fd372d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -5550,7 +5550,7 @@ object SQLConf { .internal() .doc("When set to true, DataFrameWriterV2 ignores the 'path' option and always write data " + "to the default table location.") - .version("4.0.0") + .version("3.5.5") .booleanConf .createWithDefault(false) From 8f1298494928c75ccdf4f84c3c13df2293214ff5 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 24 Feb 2025 21:22:46 +0800 Subject: [PATCH 4/7] Update DataFrameWriterV2Suite.scala --- .../org/apache/spark/sql/DataFrameWriterV2Suite.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index 81ef200b042f..ec86c29986da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -841,20 +841,24 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo } test("SPARK-51281: create/replace file source tables") { + def checkResults(df: DataFrame): Unit = { + checkAnswer(df, spark.range(10).toDF()) + } + Seq(true, false).foreach { ignorePath => withSQLConf(SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION.key -> ignorePath.toString) { withTable("t1", "t2") { spark.range(10).writeTo("t1").using("json").create() - checkAnswer(spark.table("t1"), spark.range(10).toDF()) + checkResults(spark.table("t1")) withTempPath { p => val path = p.getCanonicalPath spark.range(10).writeTo("t2").using("json").option("path", path).create() - checkAnswer(spark.table("t2"), spark.range(10).toDF()) + checkResults(spark.table("t2")) if (ignorePath) { assert(!p.exists()) } else { - checkAnswer(spark.read.json(path), spark.range(10).toDF()) + checkResults(spark.read.json(path)) } } } From d2cc6e5c3524e1c2ebf182c9d1ddf329723be647 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 25 Feb 2025 13:28:46 +0800 Subject: [PATCH 5/7] Update DataFrameWriterV2Suite.scala --- .../scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index ec86c29986da..bfa53dc3da1c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -844,7 +844,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo def checkResults(df: DataFrame): Unit = { checkAnswer(df, spark.range(10).toDF()) } - + Seq(true, false).foreach { ignorePath => withSQLConf(SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION.key -> ignorePath.toString) { withTable("t1", "t2") { From f046b2e66dbb0490f1ce88ca0e2e24c6d3871741 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 26 Feb 2025 18:42:08 +0800 Subject: [PATCH 6/7] Update sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala --- .../scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index bfa53dc3da1c..41d4d4dec8b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -840,7 +840,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo parameters = Map("methodName" -> "`writeTo`")) } - test("SPARK-51281: create/replace file source tables") { + test("SPARK-51281: DataFrameWriterV2 should respect the path option") { def checkResults(df: DataFrame): Unit = { checkAnswer(df, spark.range(10).toDF()) } From 87028833bf645005d3b28a0c6c7ccb18a47103ec Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 27 Feb 2025 17:55:56 +0800 Subject: [PATCH 7/7] Update sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e9bba9fd372d..984f4da20170 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -5550,7 +5550,7 @@ object SQLConf { .internal() .doc("When set to true, DataFrameWriterV2 ignores the 'path' option and always write data " + "to the default table location.") - .version("3.5.5") + .version("3.5.6") .booleanConf .createWithDefault(false)