@@ -5545,6 +5545,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION =
+    buildConf("spark.sql.legacy.dataFrameWriterV2IgnorePathOption")
+      .internal()
+      .doc("When set to true, DataFrameWriterV2 ignores the 'path' option and always writes " +
+        "data to the default table location.")
Contributor:
Could we fix the bug? It means users might already use the 'path' option.

Contributor:
Got it. The bug has existed for about 3 years, so keeping it as a legacy behavior looks better.

+      .version("3.5.5")
Contributor Author:
I'll change it to 3.5.6 after the 3.5.5 RC1 vote passes.

Member:
> I'll change it to 3.5.6 after the 3.5.5 RC1 vote passes.

Ya, let's wait and see the vote result because it's still open.

+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
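For context, a minimal sketch of the behavior this flag toggles; the table name, format, and path below are illustrative, not part of the patch:

    // Legacy behavior (flag = true): the 'path' option is ignored and data
    // goes to the default table location. The conf is internal, so setting
    // it via spark.conf.set is shown purely for illustration.
    spark.conf.set("spark.sql.legacy.dataFrameWriterV2IgnorePathOption", "true")

    // With the flag at its default (false), this create() now stores the
    // table's JSON files under /tmp/t instead of the warehouse directory.
    spark.range(10).writeTo("t").using("json").option("path", "/tmp/t").create()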
@@ -28,10 +28,12 @@ import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedFunction, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.TableWritePrivilege._
 import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.IntegerType
 
 /**
@@ -146,25 +148,30 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])

   /** @inheritdoc */
   override def create(): Unit = {
-    val tableSpec = UnresolvedTableSpec(
-      properties = properties.toMap,
-      provider = provider,
-      optionExpression = OptionList(Seq.empty),
-      location = None,
-      comment = None,
-      collation = None,
-      serde = None,
-      external = false)
     runCommand(
       CreateTableAsSelect(
         UnresolvedIdentifier(tableName),
         partitioning.getOrElse(Seq.empty) ++ clustering,
         logicalPlan,
-        tableSpec,
+        buildTableSpec(),
         options.toMap,
         false))
   }
 
+  private def buildTableSpec(): UnresolvedTableSpec = {
+    val ignorePathOption = sparkSession.sessionState.conf.getConf(
+      SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION)
+    UnresolvedTableSpec(
+      properties = properties.toMap,
+      provider = provider,
+      optionExpression = OptionList(Seq.empty),
+      location = if (ignorePathOption) None else CaseInsensitiveMap(options.toMap).get("path"),
Contributor Author:
This is the actual change.

+      comment = None,
+      collation = None,
+      serde = None,
+      external = false)
+  }
+
   /** @inheritdoc */
   override def replace(): Unit = {
     internalReplace(orCreate = false)
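A side note on the CaseInsensitiveMap call above: write options are treated case-insensitively, so a plain options.toMap.get("path") would miss a key spelled "PATH". A quick sketch with an illustrative value:

    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

    val opts = CaseInsensitiveMap(Map("PATH" -> "/tmp/t2"))
    opts.get("path")                       // Some("/tmp/t2"): lookup ignores key case
    Map("PATH" -> "/tmp/t2").get("path")   // None: a plain Map is case-sensitive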
@@ -212,20 +219,11 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }
 
   private def internalReplace(orCreate: Boolean): Unit = {
-    val tableSpec = UnresolvedTableSpec(
-      properties = properties.toMap,
-      provider = provider,
-      optionExpression = OptionList(Seq.empty),
-      location = None,
-      comment = None,
-      collation = None,
-      serde = None,
-      external = false)
     runCommand(ReplaceTableAsSelect(
       UnresolvedIdentifier(tableName),
       partitioning.getOrElse(Seq.empty) ++ clustering,
       logicalPlan,
-      tableSpec,
+      buildTableSpec(),
       writeOptions = options.toMap,
       orCreate = orCreate))
   }
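Since replace() and createOrReplace() both funnel through internalReplace, they pick up the same path handling as create(). A minimal sketch, again with illustrative names:

    // Rewrites the existing table `t`, storing its data under the supplied
    // path instead of the default table location (unless the legacy flag is on).
    spark.range(10).writeTo("t").using("json").option("path", "/tmp/t").replace()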
@@ -839,4 +839,30 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
       condition = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
       parameters = Map("methodName" -> "`writeTo`"))
   }

test("SPARK-51281: DataFrameWriterV2 should respect the path option") {
def checkResults(df: DataFrame): Unit = {
checkAnswer(df, spark.range(10).toDF())
}

Seq(true, false).foreach { ignorePath =>
withSQLConf(SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION.key -> ignorePath.toString) {
withTable("t1", "t2") {
spark.range(10).writeTo("t1").using("json").create()
checkResults(spark.table("t1"))

withTempPath { p =>
val path = p.getCanonicalPath
spark.range(10).writeTo("t2").using("json").option("path", path).create()
checkResults(spark.table("t2"))
if (ignorePath) {
assert(!p.exists())
} else {
checkResults(spark.read.json(path))
}
}
}
}
}
}
 }