2 changes: 1 addition & 1 deletion docs/sql-migration-guide.md
@@ -38,7 +38,7 @@ license: |

- In Spark 3.1, when `spark.sql.ansi.enabled` is false, Spark always returns null if the sum of a decimal type column overflows. In Spark 3.0 or earlier, in that case, the sum of a decimal type column may return null or an incorrect result, or may even fail at runtime (depending on the actual query plan execution).
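  As a hedged Scala illustration of that rule (not part of this file's diff; a SparkSession named `spark` is assumed, and the literal is chosen to sit at the decimal(38,0) maximum so the sum overflows):

  ```scala
  import org.apache.spark.sql.functions.{expr, sum}

  // Two values at the decimal(38,0) maximum; their sum cannot be represented in the result type.
  val decimals = spark.range(2)
    .select(expr("cast('99999999999999999999999999999999999999' as decimal(38,0))").as("d"))
  decimals.agg(sum("d")).show()
  // Spark 3.1 with spark.sql.ansi.enabled=false (the default): the overflowing sum is returned as null.
  ```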

- In Spark 3.1, when loading a dataframe, `path` or `paths` option cannot coexist with `load()`'s path parameters. For example, `spark.read.format("csv").option("path", "/tmp").load("/tmp2")` or `spark.read.option("path", "/tmp").csv("/tmp2")` will throw `org.apache.spark.sql.AnalysisException`. In Spark version 3.0 and below, `path` option is overwritten if one path parameter is passed to `load()`, or `path` option is added to the overall paths if multiple path parameters are passed to `load()`. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.pathOptionBehavior.enabled` to `true`.
- In Spark 3.1, the `path` option cannot coexist with path parameter(s) passed to the following methods: `DataFrameReader.load()`, `DataFrameWriter.save()`, `DataStreamReader.load()`, or `DataStreamWriter.start()`. In addition, the `paths` option cannot coexist with `DataFrameReader.load()`'s path parameters. For example, `spark.read.format("csv").option("path", "/tmp").load("/tmp2")` or `spark.read.option("path", "/tmp").csv("/tmp2")` will throw `org.apache.spark.sql.AnalysisException`. In Spark version 3.0 and below, the `path` option is overwritten if one path parameter is passed to the above methods, and the `path` option is added to the overall paths if multiple path parameters are passed to `DataFrameReader.load()`. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.pathOptionBehavior.enabled` to `true`.
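  For illustration (not part of this file's diff), a minimal Scala sketch of the new check and the legacy escape hatch, assuming a SparkSession named `spark` and placeholder paths:

  ```scala
  // Spark 3.1 default: a "path" option combined with load()'s path argument is rejected.
  spark.read.format("csv").option("path", "/tmp").load("/tmp2")
  // => org.apache.spark.sql.AnalysisException

  // Restoring the pre-3.1 behavior: the load() argument overwrites the "path" option.
  spark.conf.set("spark.sql.legacy.pathOptionBehavior.enabled", "true")
  spark.read.format("csv").option("path", "/tmp").load("/tmp2")   // reads only /tmp2
  ```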

## Upgrading from Spark SQL 3.0 to 3.0.1

19 changes: 12 additions & 7 deletions python/pyspark/sql/tests/test_streaming.py
@@ -68,9 +68,12 @@ def test_stream_read_options(self):
    def test_stream_read_options_overwrite(self):
        bad_schema = StructType([StructField("test", IntegerType(), False)])
        schema = StructType([StructField("data", StringType(), False)])
        df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \
            .schema(bad_schema)\
            .load(path='python/test_support/sql/streaming', schema=schema, format='text')
        # SPARK-32516 disables the overwrite behavior by default.
        with self.sql_conf({"spark.sql.legacy.pathOptionBehavior.enabled": True}):
            df = self.spark.readStream.format('csv')\
                .option('path', 'python/test_support/sql/fake')\
                .schema(bad_schema)\
                .load(path='python/test_support/sql/streaming', schema=schema, format='text')
        self.assertTrue(df.isStreaming)
        self.assertEqual(df.schema.simpleString(), "struct<data:string>")

@@ -110,10 +113,12 @@ def test_stream_save_options_overwrite(self):
        chk = os.path.join(tmpPath, 'chk')
        fake1 = os.path.join(tmpPath, 'fake1')
        fake2 = os.path.join(tmpPath, 'fake2')
        q = df.writeStream.option('checkpointLocation', fake1)\
            .format('memory').option('path', fake2) \
            .queryName('fake_query').outputMode('append') \
            .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
        # SPARK-32516 disables the overwrite behavior by default.
        with self.sql_conf({"spark.sql.legacy.pathOptionBehavior.enabled": True}):
            q = df.writeStream.option('checkpointLocation', fake1)\
                .format('memory').option('path', fake2) \
                .queryName('fake_query').outputMode('append') \
                .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)

        try:
            self.assertEqual(q.name, 'this_query')
@@ -2716,8 +2716,9 @@ object SQLConf {
buildConf("spark.sql.legacy.pathOptionBehavior.enabled")
.internal()
.doc("When true, \"path\" option is overwritten if one path parameter is passed to " +
"DataFramerReader.load(), or \"path\" option is added to the overall paths if multiple " +
"path parameters are passed to DataFramerReader.load()")
"DataFrameReader.load(), DataFrameWriter.save(), DataStreamReader.load(), or " +
"DataStreamWriter.start(). Also, \"path\" option is added to the overall paths if " +
"multiple path parameters are passed to DataFrameReader.load()")
.version("3.1.0")
.booleanConf
.createWithDefault(false)
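To make the documented legacy semantics concrete, a hedged sketch (not part of the diff; paths are placeholders, and the multi-path merge case applies only to `DataFrameReader.load()` with several arguments):

```scala
// With the legacy flag enabled, Spark reproduces the pre-3.1 path handling.
spark.conf.set("spark.sql.legacy.pathOptionBehavior.enabled", "true")

// One path argument: load()'s argument overwrites the "path" option, so only /data/b is read.
spark.read.format("parquet").option("path", "/data/a").load("/data/b")

// Multiple path arguments: the "path" option is added to the overall paths,
// so /data/a, /data/b and /data/c are all read.
spark.read.format("parquet").option("path", "/data/a").load("/data/b", "/data/c")
```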
@@ -255,7 +255,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
(extraOptions.contains("path") || extraOptions.contains("paths")) && paths.nonEmpty) {
throw new AnalysisException("There is a 'path' or 'paths' option set and load() is called " +
"with path parameters. Either remove the path option if it's the same as the path " +
"parameter, or add it to the load() parameter if you do want to read multiple paths.")
"parameter, or add it to the load() parameter if you do want to read multiple paths. " +
s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
}

val updatedPaths = if (!legacyPathOptionBehavior && paths.length == 1) {
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
@@ -284,6 +285,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def save(path: String): Unit = {
if (!df.sparkSession.sessionState.conf.legacyPathOptionBehavior &&
extraOptions.contains("path") && path.nonEmpty) {
[Inline review comments]

Contributor:
The path here is a String, do we really need to check path.nonEmpty?

Contributor Author:
Good catch. I think we need to remove the check. The output is more confusing since the path option is also set:

scala> Seq(1).toDF.write.option("path", "/tmp/path1").parquet("")
java.lang.IllegalArgumentException: Can not create a Path from an empty string
  at org.apache.hadoop.fs.Path.checkPathArg(Path.java:168)

I will create a PR for this.

Contributor Author:
Created #29697

throw new AnalysisException("There is a 'path' option set and save() is called with a path " +
"parameter. Either remove the path option, or call save() without the parameter. " +
s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
}
this.extraOptions += ("path" -> path)
save()
}
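For the writer side, a hedged usage sketch of the conflict this check rejects (not part of the diff; output paths are placeholders, and a SparkSession named `spark` is assumed):

```scala
import spark.implicits._  // for Seq(...).toDF

// Spark 3.1 default: a "path" option plus a path passed to save()/parquet() is rejected.
Seq(1, 2, 3).toDF("id").write.option("path", "/tmp/out1").parquet("/tmp/out2")
// => org.apache.spark.sql.AnalysisException: There is a 'path' option set and save() is called with a path parameter...
```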
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDataSourceV2}
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -239,6 +240,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* @since 2.0.0
*/
def load(path: String): DataFrame = {
if (!sparkSession.sessionState.conf.legacyPathOptionBehavior &&
extraOptions.contains("path") && path.nonEmpty) {
throw new AnalysisException("There is a 'path' option set and load() is called with a path" +
"parameter. Either remove the path option, or call load() without the parameter. " +
s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
}
option("path", path).load()
}

@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDataSourceV2}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
@@ -266,6 +267,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
* @since 2.0.0
*/
def start(path: String): StreamingQuery = {
if (!df.sparkSession.sessionState.conf.legacyPathOptionBehavior &&
extraOptions.contains("path") && path.nonEmpty) {
throw new AnalysisException("There is a 'path' option set and start() is called with a " +
"path parameter. Either remove the path option, or call start() without the parameter. " +
s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
}
option("path", path).start()
}
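Analogously for streaming, a hedged sketch of the reader and writer checks (not part of the diff; source, sink and checkpoint paths are placeholders):

```scala
// Both of the following throw AnalysisException under the Spark 3.1 default.
spark.readStream.format("text").option("path", "/in1").load("/in2")

val streamingDf = spark.readStream.format("text").load("/in")
streamingDf.writeStream
  .format("parquet")
  .option("checkpointLocation", "/chk")
  .option("path", "/out1")
  .start("/out2")
```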

@@ -109,6 +109,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
}

class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
import testImplicits._

private def newMetadataDir =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
@@ -435,7 +436,6 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
}

private def testMemorySinkCheckpointRecovery(chkLoc: String, provideInWriter: Boolean): Unit = {
import testImplicits._
val ms = new MemoryStream[Int](0, sqlContext)
val df = ms.toDF().toDF("a")
val tableName = "test"
@@ -703,4 +703,53 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
queries.foreach(_.stop())
}
}

test("SPARK-32516: 'path' cannot coexist with load()'s path parameter") {
def verifyLoadFails(f: => DataFrame): Unit = {
val e = intercept[AnalysisException](f)
assert(e.getMessage.contains(
"Either remove the path option, or call load() without the parameter"))
}

verifyLoadFails(spark.readStream.option("path", "tmp1").parquet("tmp2"))
verifyLoadFails(spark.readStream.option("path", "tmp1").format("parquet").load("tmp2"))

withClue("SPARK-32516: legacy behavior") {
withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true") {
spark.readStream
.format("org.apache.spark.sql.streaming.test")
.option("path", "tmp1")
.load("tmp2")
[Inline review comments]

Member:
This could be flaky. The tmp2 directory could be non-empty and contain illegal data.

Contributor Author:
This is using org.apache.spark.sql.streaming.test as a format, which has a no-op implementation:

/** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */
class DefaultSource extends StreamSourceProvider with StreamSinkProvider {

This datasource is being used throughout this test suite with non-existent dirs:

test("stream paths") {
  val df = spark.readStream
    .format("org.apache.spark.sql.streaming.test")
    .option("checkpointLocation", newMetadataDir)
    .load("/test")

Am I missing something? Thanks!

Contributor:
You are right, I think this is not an issue here.

// The legacy behavior overwrites the path option.
assert(LastOptions.parameters("path") == "tmp2")
}
}
}

test("SPARK-32516: 'path' cannot coexist with start()'s path parameter") {
val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")
.load("tmp1")

val e = intercept[AnalysisException] {
df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("path", "tmp2")
.start("tmp3")
.stop()
}
assert(e.getMessage.contains(
"Either remove the path option, or call start() without the parameter"))

withClue("SPARK-32516: legacy behavior") {
withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true") {
spark.readStream
.format("org.apache.spark.sql.streaming.test")
.option("path", "tmp4")
.load("tmp5")
// The legacy behavior overwrites the path option.
assert(LastOptions.parameters("path") == "tmp5")
}
}
}
}
@@ -224,7 +224,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
assert(LastOptions.parameters("opt3") == "3")
}

test("SPARK-32364: later option should override earlier options") {
test("SPARK-32364: later option should override earlier options for load()") {
spark.read
.format("org.apache.spark.sql.test")
.option("paTh", "1")
@@ -249,15 +249,29 @@
}
}

test("SPARK-32364: path argument of save function should override all existing options") {
test("SPARK-32364: later option should override earlier options for save()") {
Seq(1).toDF.write
.format("org.apache.spark.sql.test")
.option("paTh", "1")
.option("PATH", "2")
.option("Path", "3")
.option("patH", "4")
.save("5")
.option("path", "5")
.save()
assert(LastOptions.parameters("path") == "5")

withClue("SPARK-32516: legacy path option behavior") {
withSQLConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key -> "true") {
Seq(1).toDF.write
.format("org.apache.spark.sql.test")
.option("paTh", "1")
.option("PATH", "2")
.option("Path", "3")
.option("patH", "4")
.save("5")
assert(LastOptions.parameters("path") == "5")
}
}
}

test("pass partitionBy as options") {
@@ -1157,4 +1171,17 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
}
}
}

test("SPARK-32516: 'path' option cannot coexist with save()'s path parameter") {
def verifyLoadFails(f: => Unit): Unit = {
val e = intercept[AnalysisException](f)
assert(e.getMessage.contains(
"Either remove the path option, or call save() without the parameter"))
}

val df = Seq(1).toDF
val path = "tmp"
verifyLoadFails(df.write.option("path", path).parquet(path))
verifyLoadFails(df.write.option("path", path).format("parquet").save(path))
}
}