Commit a3671e5
[SPARK-51281][SQL] DataFrameWriterV2 should respect the path option
### What changes were proposed in this pull request?

Unlike `DataFrameWriter.saveAsTable`, which explicitly reads the "path" option and treats it as the table location, `DataFrameWriterV2` treated "path" as an ordinary option with no real effect. This PR fixes that, and adds a legacy config to restore the old behavior.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

Yes. `DataFrameWriterV2` now correctly writes data to the specified path for file-source tables.

### How was this patch tested?

New test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#50040 from cloud-fan/prop.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 412da42 · commit a3671e5
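For context, here is a minimal sketch of the user-facing behavior this commit fixes. The session, table name, and path below are hypothetical, for illustration only:

```scala
// Assumes an active SparkSession named `spark` and any writable scratch path.
val path = "/tmp/spark-51281-demo"

// Before this fix, DataFrameWriterV2 silently ignored the "path" option and
// wrote the table data to the default warehouse location; with the fix, the
// option becomes the table location for file-source tables.
spark.range(10)
  .writeTo("demo_table")
  .using("json")
  .option("path", path)
  .create()

// The data now lands under `path` and can be read back directly.
spark.read.json(path).show()
```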

File tree: 3 files changed (+53, -20 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
@@ -5566,6 +5566,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION =
+    buildConf("spark.sql.legacy.dataFrameWriterV2IgnorePathOption")
+      .internal()
+      .doc("When set to true, DataFrameWriterV2 ignores the 'path' option and always writes data " +
+        "to the default table location.")
+      .version("3.5.6")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
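Users who depend on the old behavior can opt back in via this new flag. A minimal sketch, assuming an active `SparkSession` named `spark` (the config is internal, but can still be set at runtime):

```scala
// Defaults to false, i.e. the new, fixed behavior.
spark.conf.set("spark.sql.legacy.dataFrameWriterV2IgnorePathOption", "true")

// With the flag enabled, the "path" option is ignored again and the data is
// written to the default table location rather than /tmp/ignored.
spark.range(10)
  .writeTo("t")
  .using("json")
  .option("path", "/tmp/ignored")
  .create()
```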

sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala

Lines changed: 18 additions & 20 deletions
@@ -28,10 +28,12 @@ import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedFunction, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.TableWritePrivilege._
 import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.IntegerType
 
 /**
@@ -146,25 +148,30 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
 
   /** @inheritdoc */
   override def create(): Unit = {
-    val tableSpec = UnresolvedTableSpec(
-      properties = properties.toMap,
-      provider = provider,
-      optionExpression = OptionList(Seq.empty),
-      location = None,
-      comment = None,
-      collation = None,
-      serde = None,
-      external = false)
     runCommand(
       CreateTableAsSelect(
         UnresolvedIdentifier(tableName),
         partitioning.getOrElse(Seq.empty) ++ clustering,
         logicalPlan,
-        tableSpec,
+        buildTableSpec(),
         options.toMap,
         false))
   }
 
+  private def buildTableSpec(): UnresolvedTableSpec = {
+    val ignorePathOption = sparkSession.sessionState.conf.getConf(
+      SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION)
+    UnresolvedTableSpec(
+      properties = properties.toMap,
+      provider = provider,
+      optionExpression = OptionList(Seq.empty),
+      location = if (ignorePathOption) None else CaseInsensitiveMap(options.toMap).get("path"),
+      comment = None,
+      collation = None,
+      serde = None,
+      external = false)
+  }
+
   /** @inheritdoc */
   override def replace(): Unit = {
     internalReplace(orCreate = false)
@@ -212,20 +219,11 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }
 
   private def internalReplace(orCreate: Boolean): Unit = {
-    val tableSpec = UnresolvedTableSpec(
-      properties = properties.toMap,
-      provider = provider,
-      optionExpression = OptionList(Seq.empty),
-      location = None,
-      comment = None,
-      collation = None,
-      serde = None,
-      external = false)
     runCommand(ReplaceTableAsSelect(
       UnresolvedIdentifier(tableName),
       partitioning.getOrElse(Seq.empty) ++ clustering,
       logicalPlan,
-      tableSpec,
+      buildTableSpec(),
       writeOptions = options.toMap,
      orCreate = orCreate))
   }
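The location lookup goes through `CaseInsensitiveMap`, so the option key matches however the caller capitalizes it. A small illustration of that property (it uses Spark's internal catalyst utility, so treat it as a sketch rather than public API):

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// "PATH", "Path", and "path" all resolve to the same entry.
val opts = CaseInsensitiveMap(Map("PATH" -> "/tmp/t"))
assert(opts.get("path").contains("/tmp/t"))
assert(opts.get("Path").contains("/tmp/t"))
```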

sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala

Lines changed: 26 additions & 0 deletions
@@ -839,4 +839,30 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
       condition = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
       parameters = Map("methodName" -> "`writeTo`"))
   }
+
+  test("SPARK-51281: DataFrameWriterV2 should respect the path option") {
+    def checkResults(df: DataFrame): Unit = {
+      checkAnswer(df, spark.range(10).toDF())
+    }
+
+    Seq(true, false).foreach { ignorePath =>
+      withSQLConf(SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION.key -> ignorePath.toString) {
+        withTable("t1", "t2") {
+          spark.range(10).writeTo("t1").using("json").create()
+          checkResults(spark.table("t1"))
+
+          withTempPath { p =>
+            val path = p.getCanonicalPath
+            spark.range(10).writeTo("t2").using("json").option("path", path).create()
+            checkResults(spark.table("t2"))
+            if (ignorePath) {
+              assert(!p.exists())
+            } else {
+              checkResults(spark.read.json(path))
+            }
+          }
+        }
+      }
+    }
+  }
 }
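For comparison, the v1 API already honored the option, which is the behavior this test now pins down for v2. A hedged sketch of the v1 counterpart, again assuming a session `spark` and a hypothetical path:

```scala
// DataFrameWriter (v1) resolves the "path" option to the table location when
// creating a file-source table; this PR brings DataFrameWriterV2 in line.
spark.range(10)
  .write
  .format("json")
  .option("path", "/tmp/spark-51281-v1")
  .saveAsTable("t_v1")
```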
