diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index ee48baa59c7a..6b76cc3e37bd 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2659,7 +2659,7 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
   # It makes sure that we can omit path argument in write.df API and then it calls
   # DataFrameWriter.save() without path.
   expect_error(write.df(df, source = "csv"),
-               "Error in save : illegal argument - Expected exactly one path to be specified")
+               "Error in save : illegal argument - 'path' is not specified")
   expect_error(write.json(df, jsonPath),
                "Error in json : analysis error - path file:.*already exists")
   expect_error(write.text(df, jsonPath),
@@ -2684,8 +2684,7 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume
   # It makes sure that we can omit path argument in read.df API and then it calls
   # DataFrameWriter.load() without path.
   expect_error(read.df(source = "json"),
-               paste("Error in loadDF : analysis error - Unable to infer schema for JSON at .",
-                     "It must be specified manually"))
+               paste("Error in loadDF : illegal argument - 'path' is not specified"))
   expect_error(read.df("arbitrary_path"), "Error in loadDF : analysis error - Path does not exist")
   expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist")
   expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 65422f1495f0..878520ed9f25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
@@ -322,6 +323,9 @@ case class DataSource(
         val equality = sparkSession.sessionState.conf.resolver
         StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
       }.orElse {
+        if (allPaths.isEmpty && !format.isInstanceOf[TextFileFormat]) {
+          throw new IllegalArgumentException("'path' is not specified")
+        }
         format.inferSchema(
           sparkSession,
           caseInsensitiveOptions,
@@ -369,6 +373,8 @@ case class DataSource(
         val path = new Path(allPaths.head)
         val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
         path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      } else if (allPaths.length < 1) {
+        throw new IllegalArgumentException("'path' is not specified")
       } else {
         throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
           s"got: ${allPaths.mkString(", ")}")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index d0fd23605bea..55e062da2328 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -40,7 +40,7 @@ private[parquet] class ParquetOptions(
     if (!shortParquetCompressionCodecNames.contains(codecName)) {
       val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
       throw new IllegalArgumentException(s"Codec [$codecName] " +
-        s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
+        s"is not available. Known codecs are ${availableCodecs.mkString(", ")}.")
     }
     shortParquetCompressionCodecNames(codecName).name()
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 491ff72337a8..b8858a392b8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -345,6 +345,32 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
+  test("save API - empty path or illegal path") {
+    var e = intercept[IllegalArgumentException] {
+      spark.range(1).coalesce(1).write.format("csv").save()
+    }.getMessage
+    assert(e.contains("'path' is not specified"))
+
+    e = intercept[IllegalArgumentException] {
+      spark.range(1).coalesce(1).write.csv("")
+    }.getMessage
+    assert(e.contains("Can not create a Path from an empty string"))
+  }
+
+  test("load API - empty path") {
+    val expectedErrorMsg = "'path' is not specified"
+    var e = intercept[IllegalArgumentException] {
+      spark.read.csv()
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
+
+    e = intercept[IllegalArgumentException] {
+      spark.read.format("csv").load()
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
+  }
+
+
   test("save csv with quote") {
     withTempDir { dir =>
       val csvDir = new File(dir, "csv").getCanonicalPath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 456052f79afc..0e850c59fbef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1693,6 +1693,42 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     df.collect()
   }
 
+  test("save API - empty path or illegal path") {
+    var e = intercept[IllegalArgumentException] {
+      spark.range(1).coalesce(1).write.format("json").save()
+    }.getMessage
+    assert(e.contains("'path' is not specified"))
+
+    e = intercept[IllegalArgumentException] {
+      spark.range(1).coalesce(1).write.json("")
+    }.getMessage
+    assert(e.contains("Can not create a Path from an empty string"))
+  }
+
+  test("load API - empty path") {
+    val expectedErrorMsg = "'path' is not specified"
+    var e = intercept[IllegalArgumentException] {
+      spark.read.json()
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
+
+    e = intercept[IllegalArgumentException] {
+      spark.read.format("json").load()
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
+  }
+
+  test("illegal compression") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val df = spark.range(0, 10)
+      val e = intercept[IllegalArgumentException] {
+        df.write.option("compression", "illegal").mode("overwrite").format("json").save(path)
+      }.getMessage
+      assert(e.contains("Codec [illegal] is not available. Known codecs are"))
+    }
+  }
+
   test("Write dates correctly with dateFormat option") {
     val customSchema = new StructType(Array(StructField("date", DateType, true)))
     withTempDir { dir =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 580eade4b141..6c835ccd4513 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -426,6 +426,42 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("save API - empty path or illegal path") {
+    var e = intercept[IllegalArgumentException] {
+      spark.range(1).coalesce(1).write.format("parquet").save()
+    }.getMessage
+    assert(e.contains("'path' is not specified"))
+
+    e = intercept[IllegalArgumentException] {
+      spark.range(1).coalesce(1).write.parquet("")
+    }.getMessage
+    assert(e.contains("Can not create a Path from an empty string"))
+  }
+
+  test("load API - empty path") {
+    val expectedErrorMsg = "'path' is not specified"
+    var e = intercept[IllegalArgumentException] {
+      spark.read.parquet()
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
+
+    e = intercept[IllegalArgumentException] {
+      spark.read.format("parquet").load()
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
+  }
+
+  test("illegal compression") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val df = spark.range(0, 10)
+      val e = intercept[IllegalArgumentException] {
+        df.write.option("compression", "illegal").mode("overwrite").format("parquet").save(path)
+      }.getMessage
+      assert(e.contains("Codec [illegal] is not available. Known codecs are"))
+    }
+  }
+
   test("SPARK-6315 regression test") {
     // Spark 1.1 and prior versions write Spark schema as case class string into Parquet metadata.
     // This has been deprecated by JSON format since 1.2. Notice that, 1.3 further refactored data
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index d11c2acb815d..d9f1c7ae3902 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -154,6 +154,35 @@ class TextSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("save API - empty path or illegal path") {
+    var e = intercept[IllegalArgumentException] {
+      spark.range(1).coalesce(1).write.format("text").save()
+    }.getMessage
+    assert(e.contains("'path' is not specified"))
+
+    e = intercept[IllegalArgumentException] {
+      spark.range(1).coalesce(1).write.text("")
+    }.getMessage
+    assert(e.contains("Can not create a Path from an empty string"))
+  }
+
+  test("illegal compression") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val df = spark.range(0, 10).selectExpr("CAST(id AS STRING) AS s")
+      val e = intercept[IllegalArgumentException] {
+        df.write.option("compression", "illegal").mode("overwrite").format("text").save(path)
+      }.getMessage
+      assert(e.contains("Codec [illegal] is not available. Known codecs are"))
+    }
+  }
+
+  test("load API - empty path") {
+    val res = Seq.empty[String].toDF("value")
+    checkAnswer(spark.read.text(), res)
+    checkAnswer(spark.read.textFile().toDF(), res)
+  }
+
   private def testFile: String = {
     Thread.currentThread().getContextClassLoader.getResource("test-data/text-suite.txt").toString
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 89ec162c8ed5..1299ad503b4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -307,10 +307,10 @@ class CatalogSuite
   }
 
   test("createExternalTable should fail if path is not given for file-based data source") {
-    val e = intercept[AnalysisException] {
+    val e = intercept[IllegalArgumentException] {
      spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String])
     }
-    assert(e.message.contains("Unable to infer schema"))
+    assert(e.getMessage.contains("'path' is not specified"))
 
     val e2 = intercept[AnalysisException] {
       spark.catalog.createExternalTable(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index a7fda0109856..90364cbcb689 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -272,15 +272,34 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     }
   }
 
+  test("illegal format name") {
+    val e = intercept[ClassNotFoundException] {
+      spark.read.format("illegal").load("/test")
+    }
+    assert(e.getMessage.contains("Failed to find data source: illegal"))
+  }
+
+  test("empty partitionBy") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val input = spark.range(10).toDF()
+      input.write.format("parquet").mode("overwrite").partitionBy().save(path)
+      val output = spark.read.parquet(path)
+      checkAnswer(input, output)
+    }
+  }
+
   test("prevent all column partitioning") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath
-      intercept[AnalysisException] {
+      var e = intercept[AnalysisException] {
         spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
-      }
-      intercept[AnalysisException] {
+      }.getMessage
+      assert(e.contains("Cannot use all columns for partition columns"))
+      e = intercept[AnalysisException] {
         spark.range(10).write.format("csv").mode("overwrite").partitionBy("id").save(path)
-      }
+      }.getMessage
+      assert(e.contains("Cannot use all columns for partition columns"))
       spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)
     }
   }
@@ -396,9 +415,10 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     val schema = df.schema
 
     // Reader, without user specified schema
-    intercept[AnalysisException] {
+    val e = intercept[IllegalArgumentException] {
       testRead(spark.read.json(), Seq.empty, schema)
-    }
+    }.getMessage
+    assert(e.contains("'path' is not specified"))
     testRead(spark.read.json(dir), data, schema)
     testRead(spark.read.json(dir, dir), data ++ data, schema)
     testRead(spark.read.json(Seq(dir, dir): _*), data ++ data, schema)
@@ -422,9 +442,10 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     val schema = df.schema
 
     // Reader, without user specified schema
-    intercept[AnalysisException] {
+    val e = intercept[IllegalArgumentException] {
       testRead(spark.read.parquet(), Seq.empty, schema)
-    }
+    }.getMessage
+    assert(e.contains("'path' is not specified"))
     testRead(spark.read.parquet(dir), data, schema)
     testRead(spark.read.parquet(dir, dir), data ++ data, schema)
     testRead(spark.read.parquet(Seq(dir, dir): _*), data ++ data, schema)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index c2a126d3bf9c..78e348cc8b39 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -41,7 +41,7 @@ private[orc] class OrcOptions(@transient private val parameters: Map[String, Str
     if (!shortOrcCompressionCodecNames.contains(codecName)) {
       val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
       throw new IllegalArgumentException(s"Codec [$codecName] " +
-        s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
+        s"is not available. Known codecs are ${availableCodecs.mkString(", ")}.")
     }
     shortOrcCompressionCodecNames(codecName)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c50f92e783c8..c0d4635c9888 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -544,18 +544,18 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   }
 
   test("path required error") {
-    assert(
-      intercept[AnalysisException] {
-        sparkSession.catalog.createExternalTable(
-          "createdJsonTable",
-          "org.apache.spark.sql.json",
-          Map.empty[String, String])
-
-        table("createdJsonTable")
-      }.getMessage.contains("Unable to infer schema"),
-      "We should complain that path is not specified.")
+    withTable("createdJsonTable") {
+      assert(
+        intercept[IllegalArgumentException] {
+          sparkSession.catalog.createExternalTable(
+            "createdJsonTable",
+            "org.apache.spark.sql.json",
+            Map.empty[String, String])
-
-    sql("DROP TABLE IF EXISTS createdJsonTable")
+
+          table("createdJsonTable")
+        }.getMessage.contains("'path' is not specified"),
+        "We should complain that path is not specified.")
+    }
   }
 
   test("scan a parquet table created through a CTAS statement") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 0f37cd7bf365..ae4f1eda8004 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -21,16 +21,19 @@ import java.io.File
 
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 case class OrcData(intField: Int, stringField: String)
 
-abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
-  import spark._
+abstract class OrcSuite extends QueryTest
+  with TestHiveSingleton with SQLTestUtils with BeforeAndAfterAll {
+  import spark.implicits._
 
   var orcTableDir: File = null
   var orcTableAsDir: File = null
@@ -146,6 +149,98 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
 
     sql("DROP TABLE IF EXISTS orcNullValues")
   }
+
+  test("prevent all column partitioning") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val e = intercept[AnalysisException] {
+        spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
+      }.getMessage
+      assert(e.contains("Cannot use all columns for partition columns"))
+    }
+  }
+
+  test("save API - empty path or illegal path") {
+    var e = intercept[IllegalArgumentException] {
+      spark.range(1).coalesce(1).write.format("orc").save()
+    }.getMessage
+    assert(e.contains("'path' is not specified"))
+
+    e = intercept[IllegalArgumentException] {
+      spark.range(1).coalesce(1).write.orc("")
+    }.getMessage
+    assert(e.contains("Can not create a Path from an empty string"))
+  }
+
+  test("load API - empty path") {
+    val expectedErrorMsg = "'path' is not specified"
+    var e = intercept[IllegalArgumentException] {
+      spark.read.orc()
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
+
+    e = intercept[IllegalArgumentException] {
+      spark.read.format("orc").load().show()
+    }.getMessage
+    assert(e.contains(expectedErrorMsg))
+  }
+
+  test("illegal compression") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val df = spark.range(0, 10)
+      val e = intercept[IllegalArgumentException] {
+        df.write.option("compression", "illegal").mode("overwrite").format("orc").save(path)
+      }.getMessage
+      assert(e.contains("Codec [illegal] is not available. Known codecs are"))
+    }
+  }
+
+  test("orc - API") {
+    val userSchema = new StructType().add("s", StringType)
+    val data = Seq("1", "2", "3")
+    val dir = Utils.createTempDir(namePrefix = "input").getCanonicalPath
+
+    // Writer
+    spark.createDataset(data).toDF("str").write.mode(SaveMode.Overwrite).orc(dir)
+    val df = spark.read.orc(dir)
+    checkAnswer(df, spark.createDataset(data).toDF())
+    val schema = df.schema
+
+    // Reader, without user specified schema
+    intercept[IllegalArgumentException] {
+      testRead(spark.read.orc(), Seq.empty, schema)
+    }
+    testRead(spark.read.orc(dir), data, schema)
+    testRead(spark.read.orc(dir, dir), data ++ data, schema)
+    testRead(spark.read.orc(Seq(dir, dir): _*), data ++ data, schema)
+    // Test explicit calls to single arg method - SPARK-16009
+    testRead(Option(dir).map(spark.read.orc).get, data, schema)
+
+    // Reader, with user specified schema, report an exception as schema in file different
+    // from user schema.
+    testRead(spark.read.schema(userSchema).orc(), Seq.empty, userSchema)
+    var e = intercept[SparkException] {
+      testRead(spark.read.schema(userSchema).orc(dir), Seq.empty, userSchema)
+    }.getMessage
+    assert(e.contains("Field \"s\" does not exist"))
+    e = intercept[SparkException] {
+      testRead(spark.read.schema(userSchema).orc(dir, dir), Seq.empty, userSchema)
+    }.getMessage
+    assert(e.contains("Field \"s\" does not exist"))
+    e = intercept[SparkException] {
+      testRead(spark.read.schema(userSchema).orc(Seq(dir, dir): _*), Seq.empty, userSchema)
+    }.getMessage
+    assert(e.contains("Field \"s\" does not exist"))
+  }
+
+  private def testRead(
+      df: => DataFrame,
+      expectedResult: Seq[String],
+      expectedSchema: StructType): Unit = {
+    checkAnswer(df, spark.createDataset(expectedResult).toDF())
+    assert(df.schema === expectedSchema)
+  }
 }
 
 class OrcSourceSuite extends OrcSuite {