
Commit dff4031

[SPARK-51281][SQL] DataFrameWriterV2 should respect the path option
Unlike `DataFrameWriter.saveAsTable`, which explicitly reads the "path" option and treats it as the table location, `DataFrameWriterV2` treats "path" as an ordinary option with no real effect. This PR fixes that and adds a legacy config to restore the old behavior.

Why are the changes needed: bug fix.
Does this PR introduce any user-facing change: yes, `DataFrameWriterV2` can now correctly write data to the specified path for file source tables.
How was this patch tested: new test.
Was this patch authored or co-authored using generative AI tooling: no.

Closes #50040 from cloud-fan/prop.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit a3671e5)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 1ecdcae commit dff4031
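For illustration only (not part of the commit): a minimal sketch of the user-facing behavior this change fixes, assuming an active SparkSession named `spark`; the table names and paths below are hypothetical.

// DataFrameWriter (v1) has always honored the "path" option via saveAsTable:
spark.range(10).write.format("json").option("path", "/tmp/v1_demo").saveAsTable("v1_demo")

// DataFrameWriterV2 used to silently ignore "path"; with this fix the data for
// v2_demo lands under /tmp/v2_demo instead of the default table location.
spark.range(10)
  .writeTo("v2_demo")
  .using("json")
  .option("path", "/tmp/v2_demo")
  .create()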

File tree

3 files changed: +53 -18 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
@@ -4424,6 +4424,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION =
+    buildConf("spark.sql.legacy.dataFrameWriterV2IgnorePathOption")
+      .internal()
+      .doc("When set to true, DataFrameWriterV2 ignores the 'path' option and always write data " +
+        "to the default table location.")
+      .version("3.5.6")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
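For reference (not part of the diff): the new flag is an internal legacy escape hatch. A minimal sketch of restoring the old behavior at runtime, assuming an active SparkSession named `spark`:

// With this set to true, DataFrameWriterV2 goes back to ignoring the "path"
// option and writing to the default table location.
spark.conf.set("spark.sql.legacy.dataFrameWriterV2IgnorePathOption", "true")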

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala

Lines changed: 18 additions & 18 deletions
@@ -24,10 +24,12 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OptionList, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.TableWritePrivilege._
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.IntegerType
 
 /**
@@ -109,24 +111,30 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }
 
   override def create(): Unit = {
-    val tableSpec = UnresolvedTableSpec(
-      properties = properties.toMap,
-      provider = provider,
-      optionExpression = OptionList(Seq.empty),
-      location = None,
-      comment = None,
-      serde = None,
-      external = false)
     runCommand(
       CreateTableAsSelect(
         UnresolvedIdentifier(tableName),
         partitioning.getOrElse(Seq.empty),
         logicalPlan,
-        tableSpec,
+        buildTableSpec(),
         options.toMap,
         false))
   }
 
+  private def buildTableSpec(): UnresolvedTableSpec = {
+    val ignorePathOption = sparkSession.sessionState.conf.getConf(
+      SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION)
+    UnresolvedTableSpec(
+      properties = properties.toMap,
+      provider = provider,
+      optionExpression = OptionList(Seq.empty),
+      location = if (ignorePathOption) None else CaseInsensitiveMap(options.toMap).get("path"),
+      comment = None,
+      serde = None,
+      external = false)
+  }
+
+  /** @inheritdoc */
   override def replace(): Unit = {
     internalReplace(orCreate = false)
   }
@@ -202,19 +210,11 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }
 
   private def internalReplace(orCreate: Boolean): Unit = {
-    val tableSpec = UnresolvedTableSpec(
-      properties = properties.toMap,
-      provider = provider,
-      optionExpression = OptionList(Seq.empty),
-      location = None,
-      comment = None,
-      serde = None,
-      external = false)
     runCommand(ReplaceTableAsSelect(
       UnresolvedIdentifier(tableName),
       partitioning.getOrElse(Seq.empty),
       logicalPlan,
-      tableSpec,
+      buildTableSpec(),
      writeOptions = options.toMap,
      orCreate = orCreate))
   }
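A note on the lookup above (illustration only, not part of the diff): wrapping `options.toMap` in `CaseInsensitiveMap` makes the "path" lookup case-insensitive, so an option set as "PATH" or "Path" resolves to the same table location. A small sketch with made-up values:

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

val opts = CaseInsensitiveMap(Map("PATH" -> "/tmp/t2"))  // hypothetical writer options
assert(opts.get("path").contains("/tmp/t2"))             // key case is ignored
assert(opts.get("Path").contains("/tmp/t2"))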

sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala

Lines changed: 26 additions & 0 deletions
@@ -789,4 +789,30 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
       errorClass = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
       parameters = Map("methodName" -> "`writeTo`"))
   }
+
+  test("SPARK-51281: DataFrameWriterV2 should respect the path option") {
+    def checkResults(df: DataFrame): Unit = {
+      checkAnswer(df, spark.range(10).toDF())
+    }
+
+    Seq(true, false).foreach { ignorePath =>
+      withSQLConf(SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION.key -> ignorePath.toString) {
+        withTable("t1", "t2") {
+          spark.range(10).writeTo("t1").using("json").create()
+          checkResults(spark.table("t1"))
+
+          withTempPath { p =>
+            val path = p.getCanonicalPath
+            spark.range(10).writeTo("t2").using("json").option("path", path).create()
+            checkResults(spark.table("t2"))
+            if (ignorePath) {
+              assert(!p.exists())
+            } else {
+              checkResults(spark.read.json(path))
+            }
+          }
+        }
+      }
+    }
+  }
 }
