@@ -5545,6 +5545,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)

+  val LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION =
+    buildConf("spark.sql.legacy.dataFrameWriterV2IgnorePathOption")
+      .internal()
+      .doc("When set to true, DataFrameWriterV2 ignores the 'path' option and always writes " +
+        "data to the default table location.")
Contributor: Could we fix the bug? It means users might already use the 'path' option.

Contributor: I got it. The bug has existed for about 3 years, so keeping it as the legacy behavior looks better.

+      .version("4.0.0")
Member: +1 for 3.5.5. BTW, @cloud-fan, if we want to include this, we need to change this to 3.5.5.

+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
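
For context, a minimal sketch of the user-facing switch this flag controls (the table name, format, and path below are illustrative):

  // Default (false): the 'path' option is honored, so the data lands under
  // /tmp/t and the table's location points there.
  spark.range(10).writeTo("t").using("json").option("path", "/tmp/t").create()

  // Legacy (true): the 'path' option is ignored and the data is written to
  // the default table location, matching the pre-fix behavior.
  spark.conf.set("spark.sql.legacy.dataFrameWriterV2IgnorePathOption", "true")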
@@ -28,10 +28,12 @@ import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedFunction, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.TableWritePrivilege._
 import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.IntegerType

 /**
@@ -146,25 +148,30 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])

   /** @inheritdoc */
   override def create(): Unit = {
-    val tableSpec = UnresolvedTableSpec(
-      properties = properties.toMap,
-      provider = provider,
-      optionExpression = OptionList(Seq.empty),
-      location = None,
-      comment = None,
-      collation = None,
-      serde = None,
-      external = false)
     runCommand(
       CreateTableAsSelect(
         UnresolvedIdentifier(tableName),
         partitioning.getOrElse(Seq.empty) ++ clustering,
         logicalPlan,
-        tableSpec,
+        buildTableSpec(),
         options.toMap,
         false))
   }

+  private def buildTableSpec(): UnresolvedTableSpec = {
+    val ignorePathOption = sparkSession.sessionState.conf.getConf(
+      SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION)
+    UnresolvedTableSpec(
+      properties = properties.toMap,
+      provider = provider,
+      optionExpression = OptionList(Seq.empty),
+      location = if (ignorePathOption) None else CaseInsensitiveMap(options.toMap).get("path"),
Contributor (Author): This is the actual change.

+      comment = None,
+      collation = None,
+      serde = None,
+      external = false)
+  }
+
   /** @inheritdoc */
   override def replace(): Unit = {
     internalReplace(orCreate = false)
@@ -212,20 +219,11 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }

   private def internalReplace(orCreate: Boolean): Unit = {
-    val tableSpec = UnresolvedTableSpec(
-      properties = properties.toMap,
-      provider = provider,
-      optionExpression = OptionList(Seq.empty),
-      location = None,
-      comment = None,
-      collation = None,
-      serde = None,
-      external = false)
     runCommand(ReplaceTableAsSelect(
       UnresolvedIdentifier(tableName),
       partitioning.getOrElse(Seq.empty) ++ clustering,
       logicalPlan,
-      tableSpec,
+      buildTableSpec(),
       writeOptions = options.toMap,
       orCreate = orCreate))
   }
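
A small sketch of what the CaseInsensitiveMap wrapper in buildTableSpec buys: the 'path' key is found regardless of how the caller cased it (the map contents here are illustrative):

  import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

  val opts = CaseInsensitiveMap(Map("PATH" -> "/tmp/t"))
  assert(opts.get("path").contains("/tmp/t")) // "PATH", "Path", and "path" all match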
@@ -839,4 +839,26 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
       condition = "CALL_ON_STREAMING_DATASET_UNSUPPORTED",
       parameters = Map("methodName" -> "`writeTo`"))
   }
+
+  test("SPARK-51281: create/replace file source tables") {
Member: Question: are we testing replace here? Why not just the name of the JIRA?

+    Seq(true, false).foreach { ignorePath =>
+      withSQLConf(SQLConf.LEGACY_DF_WRITER_V2_IGNORE_PATH_OPTION.key -> ignorePath.toString) {
+        withTable("t1", "t2") {
+          spark.range(10).writeTo("t1").using("json").create()
+          checkAnswer(spark.table("t1"), spark.range(10).toDF())
Member (@dongjoon-hyun, Feb 24, 2025): Nit: spark.range(10).toDF() seems to be repeated three times. Shall we define one to reuse during this unit test?


+          withTempPath { p =>
+            val path = p.getCanonicalPath
+            spark.range(10).writeTo("t2").using("json").option("path", path).create()
+            checkAnswer(spark.table("t2"), spark.range(10).toDF())
+            if (ignorePath) {
+              assert(!p.exists())
+            } else {
+              checkAnswer(spark.read.json(path), spark.range(10).toDF())
+            }
+          }
+        }
+      }
+    }
+  }
}
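
One way to address the nit above would be to hoist the repeated expected result into a single value inside the test body (the name 'expected' is hypothetical, and the three assertions are compressed here for illustration):

  val expected = spark.range(10).toDF()
  checkAnswer(spark.table("t1"), expected)
  checkAnswer(spark.table("t2"), expected)
  if (!ignorePath) checkAnswer(spark.read.json(path), expected)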