2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
@@ -38,6 +38,8 @@ license: |

- In Spark 3.1, when `spark.sql.ansi.enabled` is false, Spark always returns null if the sum of a decimal type column overflows. In Spark 3.0 or earlier, in this case, the sum of a decimal type column may return null or an incorrect result, or may even fail at runtime (depending on the actual query plan execution).

+- In Spark 3.1, when loading a DataFrame, the `path` or `paths` option cannot coexist with `load()`'s path parameters. For example, `spark.read.format("csv").option("path", "/tmp").load("/tmp2")` or `spark.read.option("path", "/tmp").csv("/tmp2")` will throw `org.apache.spark.sql.AnalysisException`. In Spark 3.0 and below, the `path` option is overwritten if one path parameter is passed to `load()`, or added to the overall paths if multiple path parameters are passed to `load()`. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.pathOptionBehavior.enabled` to `true`.

## Upgrading from Spark SQL 3.0 to 3.0.1

- In Spark 3.0, the JSON datasource and the JSON function `schema_of_json` infer TimestampType from string values if they match the pattern defined by the JSON option `timestampFormat`. Since version 3.0.1, the timestamp type inference is disabled by default. Set the JSON option `inferTimestamp` to `true` to enable such type inference.
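As a quick sketch of the behavior the migration note above describes (hypothetical paths; assumes Parquet data exists under both):

```scala
// Spark 3.1 default: a "path" option plus a load() path parameter is rejected.
spark.read.format("parquet").option("path", "/data/a").load("/data/b")
// => org.apache.spark.sql.AnalysisException

// Opting back into the pre-3.1 behavior: the load() parameter overwrites the
// "path" option, so only /data/b is read.
spark.conf.set("spark.sql.legacy.pathOptionBehavior.enabled", "true")
spark.read.format("parquet").option("path", "/data/a").load("/data/b")
```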
12 changes: 12 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2712,6 +2712,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

+  val LEGACY_PATH_OPTION_BEHAVIOR =
+    buildConf("spark.sql.legacy.pathOptionBehavior.enabled")
+      .internal()
+      .doc("When true, \"path\" option is overwritten if one path parameter is passed to " +
+        "DataFrameReader.load(), or \"path\" option is added to the overall paths if multiple " +
+        "path parameters are passed to DataFrameReader.load()")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
/**
* Holds information about keys that have been deprecated.
*
@@ -3322,6 +3332,8 @@ class SQLConf extends Serializable with Logging {
def optimizeNullAwareAntiJoin: Boolean =
getConf(SQLConf.OPTIMIZE_NULL_AWARE_ANTI_JOIN)

+  def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)
+
/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
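The new entry is an ordinary per-session SQL conf, so it can be toggled at runtime; a minimal sketch (either form works in a spark-shell session):

```scala
// Programmatic form.
spark.conf.set("spark.sql.legacy.pathOptionBehavior.enabled", "true")

// SQL form; equivalent, also session-scoped.
spark.sql("SET spark.sql.legacy.pathOptionBehavior.enabled=true")

// Internally, Spark reads the flag through the accessor added above:
// sparkSession.sessionState.conf.legacyPathOptionBehavior
```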
30 changes: 25 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.csv._
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
@@ -229,7 +230,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
*/
def load(path: String): DataFrame = {
// force invocation of `load(...varargs...)`
option("path", path).load(Seq.empty: _*)
if (sparkSession.sessionState.conf.legacyPathOptionBehavior) {
option("path", path).load(Seq.empty: _*)
[Inline review comment, Contributor Author]: This is needed to restore the legacy behavior. Note that the legacy behavior is different for the following:

  • spark.read.option("path", path).parquet(path) => path is read twice
  • spark.read.format("parquet").option("path", path).load(path) => path is read once (option is overwritten)

+    } else {
+      load(Seq(path): _*)
+    }
}

/**
@@ -245,15 +250,30 @@
"read files of Hive data source directly.")
}

+    val legacyPathOptionBehavior = sparkSession.sessionState.conf.legacyPathOptionBehavior
+    if (!legacyPathOptionBehavior &&
+        (extraOptions.contains("path") || extraOptions.contains("paths")) && paths.nonEmpty) {
+      throw new AnalysisException("There is a 'path' or 'paths' option set and load() is called " +
+        "with path parameters. Either remove the path option if it's the same as the path " +
+        "parameter, or add it to the load() parameter if you do want to read multiple paths.")
+    }
+
+    val updatedPaths = if (!legacyPathOptionBehavior && paths.length == 1) {
+      option("path", paths.head)
+      Seq.empty
+    } else {
+      paths
+    }
+
DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).map { provider =>
val catalogManager = sparkSession.sessionState.catalogManager
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = sparkSession.sessionState.conf)
-      val pathsOption = if (paths.isEmpty) {
+      val pathsOption = if (updatedPaths.isEmpty) {
None
} else {
val objectMapper = new ObjectMapper()
Some("paths" -> objectMapper.writeValueAsString(paths.toArray))
Some("paths" -> objectMapper.writeValueAsString(updatedPaths.toArray))
}

val finalOptions = sessionOptions ++ extraOptions.originalMap ++ pathsOption
@@ -281,9 +301,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
sparkSession,
DataSourceV2Relation.create(table, catalog, ident, dsOptions))

case _ => loadV1Source(paths: _*)
case _ => loadV1Source(updatedPaths: _*)
}
}.getOrElse(loadV1Source(paths: _*))
}.getOrElse(loadV1Source(updatedPaths: _*))
}

private def loadV1Source(paths: String*) = {
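To make the review comment above concrete, a self-contained sketch of the two legacy behaviors (assumes a spark-shell session with the legacy flag enabled):

```scala
import java.nio.file.Files

spark.conf.set("spark.sql.legacy.pathOptionBehavior.enabled", "true")

// Write three rows of Parquet data to a temp directory.
val dir = Files.createTempDirectory("legacy-path-demo").toString
spark.range(3).write.mode("overwrite").parquet(dir)

// Built-in shortcut: parquet(dir) adds dir alongside the "path" option,
// so the same data is scanned twice.
spark.read.option("path", dir).parquet(dir).count()                 // 6

// Generic load(): a single path parameter overwrites the "path" option,
// so the data is scanned once.
spark.read.format("parquet").option("path", dir).load(dir).count()  // 3
```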
21 changes: 0 additions & 21 deletions sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -826,27 +826,6 @@ class FileBasedDataSourceSuite extends QueryTest
}
}

test("File table location should include both values of option `path` and `paths`") {
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
withTempPaths(3) { paths =>
paths.zipWithIndex.foreach { case (path, index) =>
Seq(index).toDF("a").write.mode("overwrite").parquet(path.getCanonicalPath)
}
val df = spark
.read
.option("path", paths.head.getCanonicalPath)
.parquet(paths(1).getCanonicalPath, paths(2).getCanonicalPath)
df.queryExecution.optimizedPlan match {
case PhysicalOperation(_, _, DataSourceV2ScanRelation(table: ParquetTable, _, _)) =>
assert(table.paths.toSet == paths.map(_.getCanonicalPath).toSet)
case _ =>
throw new AnalysisException("Can not match ParquetTable in the query.")
}
checkAnswer(df, Seq(0, 1, 2).map(Row(_)))
}
}
}

test("SPARK-31935: Hadoop file system config should be effective in data source options") {
Seq("parquet", "").foreach { format =>
withSQLConf(
56 changes: 54 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -224,15 +224,29 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
assert(LastOptions.parameters("opt3") == "3")
}

test("SPARK-32364: path argument of load function should override all existing options") {
test("SPARK-32364: later option should override earlier options") {
spark.read
.format("org.apache.spark.sql.test")
.option("paTh", "1")
.option("PATH", "2")
.option("Path", "3")
.option("patH", "4")
.load("5")
.option("path", "5")
.load()
assert(LastOptions.parameters("path") == "5")

+    withClue("SPARK-32516: legacy path option behavior") {
+      withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true") {
+        spark.read
+          .format("org.apache.spark.sql.test")
+          .option("paTh", "1")
+          .option("PATH", "2")
+          .option("Path", "3")
+          .option("patH", "4")
+          .load("5")
+        assert(LastOptions.parameters("path") == "5")
+      }
+    }
}

test("SPARK-32364: path argument of save function should override all existing options") {
@@ -1105,4 +1119,42 @@
}
}
}

test("SPARK-32516: 'path' or 'paths' option cannot coexist with load()'s path parameters") {
def verifyLoadFails(f: () => DataFrame): Unit = {
val e = intercept[AnalysisException](f())
assert(e.getMessage.contains(
"Either remove the path option if it's the same as the path parameter"))
}

val path = "/tmp"
verifyLoadFails(() => spark.read.option("path", path).parquet(path))
verifyLoadFails(() => spark.read.option("path", path).format("parquet").load(path))
verifyLoadFails(() => spark.read.option("paths", path).parquet(path))
verifyLoadFails(() => spark.read.option("paths", path).format("parquet").load(path))
}

test("SPARK-32516: legacy path option behavior in load()") {
withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true") {
withTempDir { dir =>
val path = dir.getCanonicalPath
Seq(1).toDF.write.mode("overwrite").parquet(path)

// When there is one path parameter to load(), "path" option is overwritten.
checkAnswer(spark.read.format("parquet").option("path", path).load(path), Row(1))

// When there are multiple path parameters to load(), "path" option is added.
checkAnswer(
spark.read.format("parquet").option("path", path).load(path, path),
Seq(Row(1), Row(1), Row(1)))

// When built-in datasource functions are invoked (e.g, `csv`, `parquet`, etc.),
// the path option is always added regardless of the number of path parameters.
checkAnswer(spark.read.option("path", path).parquet(path), Seq(Row(1), Row(1)))
checkAnswer(
spark.read.option("path", path).parquet(path, path),
Seq(Row(1), Row(1), Row(1)))
}
}
}
}