diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5b42c157e598..984f4da20170 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -5545,6 +5545,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION =
+    buildConf("spark.sql.legacy.dataFrameWriterV2IgnorePathOption")
+      .internal()
+      .doc("When set to true, DataFrameWriterV2 ignores the 'path' option and always writes " +
+        "data to the default table location.")
+      .version("3.5.6")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala
index e4efee93d2a0..5dee09175839 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala
@@ -28,10 +28,12 @@ import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedFunction, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.TableWritePrivilege._
 import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.IntegerType
 
 /**
@@ -146,25 +148,30 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
 
   /** @inheritdoc */
   override def create(): Unit = {
-    val tableSpec = UnresolvedTableSpec(
-      properties = properties.toMap,
-      provider = provider,
-      optionExpression = OptionList(Seq.empty),
-      location = None,
-      comment = None,
-      collation = None,
-      serde = None,
-      external = false)
     runCommand(
       CreateTableAsSelect(
         UnresolvedIdentifier(tableName),
         partitioning.getOrElse(Seq.empty) ++ clustering,
         logicalPlan,
-        tableSpec,
+        buildTableSpec(),
         options.toMap,
         false))
   }
 
+  private def buildTableSpec(): UnresolvedTableSpec = {
+    val ignorePathOption = sparkSession.sessionState.conf.getConf(
+      SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION)
+    UnresolvedTableSpec(
+      properties = properties.toMap,
+      provider = provider,
+      optionExpression = OptionList(Seq.empty),
+      location = if (ignorePathOption) None else CaseInsensitiveMap(options.toMap).get("path"),
+      comment = None,
+      collation = None,
+      serde = None,
+      external = false)
+  }
+
   /** @inheritdoc */
   override def replace(): Unit = {
     internalReplace(orCreate = false)
@@ -212,20 +219,11 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }
 
   private def internalReplace(orCreate: Boolean): Unit = {
-    val tableSpec = UnresolvedTableSpec(
-      properties = properties.toMap,
-      provider = provider,
-      optionExpression = OptionList(Seq.empty),
-      location = None,
-      comment = None,
-      collation = None,
-      serde = None,
-      external = false)
     runCommand(ReplaceTableAsSelect(
       UnresolvedIdentifier(tableName),
       partitioning.getOrElse(Seq.empty) ++ clustering,
       logicalPlan,
-      tableSpec,
+      buildTableSpec(),
       writeOptions = options.toMap,
       orCreate = orCreate))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index b7ac6af22a20..41d4d4dec8b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -839,4 +839,30 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
       condition = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
       parameters = Map("methodName" -> "`writeTo`"))
   }
+
+  test("SPARK-51281: DataFrameWriterV2 should respect the path option") {
+    def checkResults(df: DataFrame): Unit = {
+      checkAnswer(df, spark.range(10).toDF())
+    }
+
+    Seq(true, false).foreach { ignorePath =>
+      withSQLConf(SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION.key -> ignorePath.toString) {
+        withTable("t1", "t2") {
+          spark.range(10).writeTo("t1").using("json").create()
+          checkResults(spark.table("t1"))
+
+          withTempPath { p =>
+            val path = p.getCanonicalPath
+            spark.range(10).writeTo("t2").using("json").option("path", path).create()
+            checkResults(spark.table("t2"))
+            if (ignorePath) {
+              assert(!p.exists())
+            } else {
+              checkResults(spark.read.json(path))
+            }
+          }
+        }
+      }
+    }
+  }
 }
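
For reference, a minimal usage sketch of the behavior this patch changes (assumes a live SparkSession with the patch applied; the table name and path below are illustrative, not taken from the patch):

    // Default after this patch: the 'path' option is honored, so the created
    // table's location is the user-supplied path rather than the default
    // warehouse location.
    spark.range(10)
      .writeTo("demo_table")               // hypothetical table name
      .using("json")
      .option("path", "/tmp/demo_table")   // hypothetical path; data lands here
      .create()

    // Opting back into the old behavior: with the legacy flag enabled, the
    // 'path' option is ignored and subsequent create()/replace() calls write
    // to the default table location.
    spark.conf.set("spark.sql.legacy.dataFrameWriterV2IgnorePathOption", "true")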