Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/pyspark/sql/readwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ def load(self, path=None, format=None, schema=None, **options):
or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
:param options: all other string options

>>> df = spark.read.load('python/test_support/sql/parquet_partitioned', opt1=True,
... opt2=1, opt3='str')
>>> df = spark.read.format("parquet").load('python/test_support/sql/parquet_partitioned',
... opt1=True, opt2=1, opt3='str')

@dongjoon-hyun dongjoon-hyun Mar 1, 2018

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike the other things, there is some difference from the original semantics.
As an alternative approach, we can add the following if we need to keep the original spark.read.load.

spark.conf.set("spark.sql.sources.default", "parquet")

>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ object SQLConf {
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
.doc("The default data source to use in input/output.")
.stringConf
.createWithDefault("parquet")
.createWithDefault("orc")

@dongjoon-hyun dongjoon-hyun Mar 1, 2018

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a testing purpose during reviews.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change it back?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. It's back now, @gatorsmile .


val CONVERT_CTAS = buildConf("spark.sql.hive.convertCTAS")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2150,7 +2150,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

test("data source table created in InMemoryCatalog should be able to read/write") {
withTable("tbl") {
sql("CREATE TABLE tbl(i INT, j STRING) USING parquet")
val provider = spark.sessionState.conf.defaultDataSourceName

@HyukjinKwon HyukjinKwon Mar 3, 2018

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm .. how about just explicitly setting spark.sql.sources.default to parquet in all places rather than using the default? If it's set to, for example, text, this test becomes failed. I thought it's a bit odd that a test is dependent on a default value.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is SQLQuerySuite. The test case is correctly testing its purpose. Every data source have its own capability and limitation. Your example is only text data source's limitation supporting a single column schema, isn't it? For the other csv/json/orc/parquet will pass this specific test.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far, the purpose of this PR is setting once in SQLConf.scala to order to test a new data source to find out the limitation instead of touching every data suite.

BTW, spark.sql.sources.default=parquet doesn't help this existing code because the SQL has a fixed string USING parquet.

sql(s"CREATE TABLE tbl(i INT, j STRING) USING $provider")
checkAnswer(sql("SELECT i, j FROM tbl"), Nil)

Seq(1 -> "a", 2 -> "b").toDF("i", "j").write.mode("overwrite").insertInto("tbl")
Expand Down Expand Up @@ -2476,7 +2477,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
withTempDir { dir =>
val parquetDir = new File(dir, "parquet").getCanonicalPath
spark.range(10).withColumn("_col", $"id").write.partitionBy("_col").save(parquetDir)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the data format may not be parquet, maybe the directory name should be more generic, like dataDir.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for review, @bersprockets .

spark.read.parquet(parquetDir)
spark.read.load(parquetDir)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,10 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-22673: InMemoryRelation should utilize existing stats of the plan to be cached") {
withSQLConf("spark.sql.cbo.enabled" -> "true") {
// This test case depends on the size of parquet in statistics.
withSQLConf(
SQLConf.CBO_ENABLED.key -> "true",
SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "parquet") {
withTempPath { workDir =>
withTable("table1") {
val workDirPath = workDir.getAbsolutePath
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,15 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")

val e = intercept[AnalysisException] {
Seq(5 -> "e").toDF("i", "j").write.mode("append").format("json").saveAsTable("t1")
val format = if (spark.sessionState.conf.defaultDataSourceName.equalsIgnoreCase("json")) {
"orc"
} else {
"json"
}
Seq(5 -> "e").toDF("i", "j").write.mode("append").format(format).saveAsTable("t1")
}
assert(e.message.contains("The format of the existing table default.t1 is " +
"`ParquetFileFormat`. It doesn't match the specified format `JsonFileFormat`."))
assert(e.message.contains("The format of the existing table default.t1 is "))
assert(e.message.contains("It doesn't match the specified format"))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,15 +739,15 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
withTempPath { dir =>
df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
val fields = schema.map(f => Column(f.name).cast(f.dataType))
checkAnswer(spark.read.load(dir.toString).select(fields: _*), row)
checkAnswer(spark.read.parquet(dir.toString).select(fields: _*), row)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is ParquetPartitionDiscoverySuite, parquet is more proper than load.

}

withTempPath { dir =>
df.write.option(DateTimeUtils.TIMEZONE_OPTION, "GMT")
.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
val fields = schema.map(f => Column(f.name).cast(f.dataType))
checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "GMT")
.load(dir.toString).select(fields: _*), row)
.parquet(dir.toString).select(fields: _*), row)
}
}

Expand Down Expand Up @@ -781,15 +781,15 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
withTempPath { dir =>
df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
val fields = schema.map(f => Column(f.name))
checkAnswer(spark.read.load(dir.toString).select(fields: _*), row)
checkAnswer(spark.read.parquet(dir.toString).select(fields: _*), row)
}

withTempPath { dir =>
df.write.option(DateTimeUtils.TIMEZONE_OPTION, "GMT")
.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
val fields = schema.map(f => Column(f.name))
checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "GMT")
.load(dir.toString).select(fields: _*), row)
.parquet(dir.toString).select(fields: _*), row)
}
}

Expand Down Expand Up @@ -1095,7 +1095,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha

withTempPath { path =>
df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
checkAnswer(spark.read.load(path.getAbsolutePath), df)
checkAnswer(spark.read.parquet(path.getAbsolutePath), df)
}
}

Expand All @@ -1104,7 +1104,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
val df = Seq((1, "2014-01-01"), (2, "2016-01-01"), (3, "2015-01-01 00:01:00")).toDF("i", "ts")
df.write.format("parquet").partitionBy("ts").save(path.getAbsolutePath)
checkAnswer(
spark.read.load(path.getAbsolutePath),
spark.read.parquet(path.getAbsolutePath),
Row(1, Timestamp.valueOf("2014-01-01 00:00:00")) ::
Row(2, Timestamp.valueOf("2016-01-01 00:00:00")) ::
Row(3, Timestamp.valueOf("2015-01-01 00:01:00")) :: Nil)
Expand All @@ -1114,7 +1114,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
val df = Seq((1, "1"), (2, "3"), (3, "2" * 30)).toDF("i", "decimal")
df.write.format("parquet").partitionBy("decimal").save(path.getAbsolutePath)
checkAnswer(
spark.read.load(path.getAbsolutePath),
spark.read.parquet(path.getAbsolutePath),
Row(1, BigDecimal("1")) ::
Row(2, BigDecimal("3")) ::
Row(3, BigDecimal("2" * 30)) :: Nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,8 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
"and a same-name temp view exist") {
withTable("same_name") {
withTempView("same_name") {
sql("CREATE TABLE same_name(id LONG) USING parquet")
val format = spark.sessionState.conf.defaultDataSourceName
sql(s"CREATE TABLE same_name(id LONG) USING $format")
spark.range(10).createTempView("same_name")
spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name")
checkAnswer(spark.table("same_name"), spark.range(10).toDF())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}

test("Pre insert nullability check (ArrayType)") {
withTable("arrayInParquet") {
withTable("array") {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good, maybe in a future cleanup, to replace all these repeating string literals (e.g, "array" 7 times, "map" 7 times) with a variable name.

{
val df = (Tuple1(Seq(Int.box(1), null: Integer)) :: Nil).toDF("a")
val expectedSchema =
Expand All @@ -604,9 +604,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(df.schema === expectedSchema)

df.write
.format("parquet")
.mode(SaveMode.Overwrite)
.saveAsTable("arrayInParquet")
.saveAsTable("array")
}

{
Expand All @@ -621,25 +620,24 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(df.schema === expectedSchema)

df.write
.format("parquet")
.mode(SaveMode.Append)
.insertInto("arrayInParquet")
.insertInto("array")
}

(Tuple1(Seq(4, 5)) :: Nil).toDF("a")
.write
.mode(SaveMode.Append)
.saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
.saveAsTable("array") // This one internally calls df2.insertInto.

(Tuple1(Seq(Int.box(6), null: Integer)) :: Nil).toDF("a")
.write
.mode(SaveMode.Append)
.saveAsTable("arrayInParquet")
.saveAsTable("array")

sparkSession.catalog.refreshTable("arrayInParquet")
sparkSession.catalog.refreshTable("array")

checkAnswer(
sql("SELECT a FROM arrayInParquet"),
sql("SELECT a FROM array"),
Row(ArrayBuffer(1, null)) ::
Row(ArrayBuffer(2, 3)) ::
Row(ArrayBuffer(4, 5)) ::
Expand All @@ -648,7 +646,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}

test("Pre insert nullability check (MapType)") {
withTable("mapInParquet") {
withTable("map") {
{
val df = (Tuple1(Map(1 -> (null: Integer))) :: Nil).toDF("a")
val expectedSchema =
Expand All @@ -661,9 +659,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(df.schema === expectedSchema)

df.write
.format("parquet")
.mode(SaveMode.Overwrite)
.saveAsTable("mapInParquet")
.saveAsTable("map")
}

{
Expand All @@ -678,27 +675,24 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(df.schema === expectedSchema)

df.write
.format("parquet")
.mode(SaveMode.Append)
.insertInto("mapInParquet")
.insertInto("map")
}

(Tuple1(Map(4 -> 5)) :: Nil).toDF("a")
.write
.format("parquet")
.mode(SaveMode.Append)
.saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
.saveAsTable("map") // This one internally calls df2.insertInto.

(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
.write
.format("parquet")
.mode(SaveMode.Append)
.saveAsTable("mapInParquet")
.saveAsTable("map")

sparkSession.catalog.refreshTable("mapInParquet")
sparkSession.catalog.refreshTable("map")

checkAnswer(
sql("SELECT a FROM mapInParquet"),
sql("SELECT a FROM map"),
Row(Map(1 -> null)) ::
Row(Map(2 -> 3)) ::
Row(Map(4 -> 5)) ::
Expand Down Expand Up @@ -852,52 +846,52 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
(from to to).map(i => i -> s"str$i").toDF("c1", "c2")
}

withTable("insertParquet") {
createDF(0, 9).write.format("parquet").saveAsTable("insertParquet")
withTable("t") {
createDF(0, 9).write.saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
sql("SELECT p.c1, p.c2 FROM t p WHERE p.c1 > 5"),
(6 to 9).map(i => Row(i, s"str$i")))

intercept[AnalysisException] {
createDF(10, 19).write.format("parquet").saveAsTable("insertParquet")
createDF(10, 19).write.saveAsTable("t")
}

createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
createDF(10, 19).write.mode(SaveMode.Append).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
sql("SELECT p.c1, p.c2 FROM t p WHERE p.c1 > 5"),
(6 to 19).map(i => Row(i, s"str$i")))

createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
createDF(20, 29).write.mode(SaveMode.Append).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 25"),
(6 to 24).map(i => Row(i, s"str$i")))

intercept[AnalysisException] {
createDF(30, 39).write.saveAsTable("insertParquet")
createDF(30, 39).write.saveAsTable("t")
}

createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet")
createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 35"),
(6 to 34).map(i => Row(i, s"str$i")))

createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet")
createDF(40, 49).write.mode(SaveMode.Append).insertInto("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 45"),
(6 to 44).map(i => Row(i, s"str$i")))

createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet")
createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"),
sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 51 AND p.c1 < 55"),
(52 to 54).map(i => Row(i, s"str$i")))
createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet")
createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p"),
sql("SELECT p.c1, c2 FROM t p"),
(50 to 59).map(i => Row(i, s"str$i")))

createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet")
createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("t")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p"),
sql("SELECT p.c1, c2 FROM t p"),
(70 to 79).map(i => Row(i, s"str$i")))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about why the test named "SPARK-8156:create table to specific database by 'use dbname'" still has a hard-coded format of parquet. Is it testing functionality that is orthogonal to the format maybe?

I changed the hard-coded format to json, orc, and csv, and each time that test passed.

Similarly with
Suite: org.apache.spark.sql.sources.SaveLoadSuite
Test: SPARK-23459: Improve error message when specified unknown column in partition columns

@dongjoon-hyun dongjoon-hyun Mar 9, 2018

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is because this PR minimally changed only the test case causing failures. We cannot generalize all test cases at an one-shot huge PR for all modules. That will make it difficult to backport the other commits. The main goal of this PR is improving test-ability for new data sources.

For example, although SPARK-8156:create table to specific database by 'use dbname' writes to parquet, but reads with SQL, not by read.load. So, it doesn't fail. That's not generalized test case, but also not too much malicious.

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ class PartitionProviderCompatibilitySuite
spark.range(5).selectExpr("id as fieldOne", "id as partCol").write
.partitionBy("partCol")
.mode("overwrite")
.parquet(dir.getAbsolutePath)
.save(dir.getAbsolutePath)

spark.sql(s"""
|create table $tableName (fieldOne long, partCol int)
|using parquet
|using ${spark.sessionState.conf.defaultDataSourceName}
|options (path "${dir.toURI}")
|partitioned by (partCol)""".stripMargin)
}
Expand Down Expand Up @@ -358,7 +358,7 @@ class PartitionProviderCompatibilitySuite
try {
spark.sql(s"""
|create table test (id long, P1 int, P2 int)
|using parquet
|using ${spark.sessionState.conf.defaultDataSourceName}
|options (path "${base.toURI}")
|partitioned by (P1, P2)""".stripMargin)
spark.sql(s"alter table test add partition (P1=0, P2=0) location '${a.toURI}'")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ class PartitionedTablePerfStatsSuite
import spark.implicits._
Seq(1).toDF("a").write.mode("overwrite").save(dir.getAbsolutePath)
HiveCatalogMetrics.reset()
spark.read.parquet(dir.getAbsolutePath)
spark.read.load(dir.getAbsolutePath)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1658,8 +1658,8 @@ class HiveDDLSuite
Seq(5 -> "e").toDF("i", "j")
.write.format("hive").mode("append").saveAsTable("t1")
}
assert(e.message.contains("The format of the existing table default.t1 is " +
"`ParquetFileFormat`. It doesn't match the specified format `HiveFileFormat`."))
assert(e.message.contains("The format of the existing table default.t1 is "))
assert(e.message.contains("It doesn't match the specified format `HiveFileFormat`."))
}
}

Expand Down Expand Up @@ -1709,11 +1709,12 @@ class HiveDDLSuite
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tblName)).schema.map(_.name)
}

val provider = spark.sessionState.conf.defaultDataSourceName
withTable("t", "t1", "t2", "t3", "t4", "t5", "t6") {
sql("CREATE TABLE t(a int, b int, c int, d int) USING parquet PARTITIONED BY (d, b)")
sql(s"CREATE TABLE t(a int, b int, c int, d int) USING $provider PARTITIONED BY (d, b)")
assert(getTableColumns("t") == Seq("a", "c", "d", "b"))

sql("CREATE TABLE t1 USING parquet PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d")
sql(s"CREATE TABLE t1 USING $provider PARTITIONED BY (d, b) AS SELECT 1 a, 1 b, 1 c, 1 d")
assert(getTableColumns("t1") == Seq("a", "c", "d", "b"))

Seq((1, 1, 1, 1)).toDF("a", "b", "c", "d").write.partitionBy("d", "b").saveAsTable("t2")
Expand All @@ -1723,7 +1724,7 @@ class HiveDDLSuite
val dataPath = new File(new File(path, "d=1"), "b=1").getCanonicalPath
Seq(1 -> 1).toDF("a", "c").write.save(dataPath)

sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.toURI}'")
sql(s"CREATE TABLE t3 USING $provider LOCATION '${path.toURI}'")
assert(getTableColumns("t3") == Seq("a", "c", "d", "b"))
}

Expand Down
Loading