diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index b237a07c72d0..200abd08fb86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -53,11 +53,12 @@ private[sql] case class ParquetRelation(
   self: Product =>
 
   /** Schema derived from ParquetFile */
-  def parquetSchema: MessageType =
-    ParquetTypesConverter
-      .readMetaData(new Path(path), conf)
-      .getFileMetaData
-      .getSchema
+  def parquetSchema: MessageType = {
+    ParquetTypesConverter.readMetaData(new Path(path), conf)
+      .map(_.getFileMetaData.getSchema)
+      .getOrElse(
+        throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
+  }
 
   /** Attributes */
   override val output =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index fa37d1f2ae7e..37db13513684 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -48,7 +48,7 @@ private[parquet] case class ParquetTypeInfo(
   decimalMetadata: Option[DecimalMetadata] = None,
   length: Option[Int] = None)
 
-private[parquet] object ParquetTypesConverter extends Logging {
+private[sql] object ParquetTypesConverter extends Logging {
   def isPrimitiveType(ctype: DataType): Boolean =
     classOf[PrimitiveType] isAssignableFrom ctype.getClass
 
@@ -425,7 +425,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * @param configuration The Hadoop configuration to use.
    * @return The `ParquetMetadata` containing among other things the schema.
    */
-  def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = {
+  def readMetaData(
+      origPath: Path,
+      configuration: Option[Configuration]): Option[ParquetMetadata] = {
     if (origPath == null) {
       throw new IllegalArgumentException("Unable to read Parquet metadata: path is null")
     }
@@ -450,13 +452,11 @@ private[parquet] object ParquetTypesConverter extends Logging {
     // all data in a single Parquet file have the same schema, which is normally true.
     children
       // Try any non-"_metadata" file first...
-      .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)
+      .find(file => file.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE && !file.isDir)
       // ... and fallback to "_metadata" if no such file exists (which implies the Parquet file is
       // empty, thus normally the "_metadata" file is expected to be fairly small).
       .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE))
       .map(ParquetFileReader.readFooter(conf, _))
-      .getOrElse(
-        throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
   }
 
   /**
@@ -473,15 +473,19 @@ private[parquet] object ParquetTypesConverter extends Logging {
       origPath: Path,
       conf: Option[Configuration],
       isBinaryAsString: Boolean): Seq[Attribute] = {
+    val metaData = readMetaData(origPath, conf)
+    if (metaData.isEmpty) {
+      return Seq.empty
+    }
+
     val keyValueMetadata: java.util.Map[String, String] =
-      readMetaData(origPath, conf)
-        .getFileMetaData
-        .getKeyValueMetaData
+      metaData.map(_.getFileMetaData.getKeyValueMetaData).getOrElse(Map.empty[String, String])
+
     if (keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) {
       convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
     } else {
       val attributes = convertToAttributes(
-        readMetaData(origPath, conf).getFileMetaData.getSchema, isBinaryAsString)
+        readMetaData(origPath, conf).get.getFileMetaData.getSchema, isBinaryAsString)
       log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
       attributes
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 7ee4f3c1e93e..8a1a44c6d2e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -174,7 +174,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
 
     // test default compression codec
     rdd.saveAsParquetFile(path)
-    var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+    var actualCodec = ParquetTypesConverter
+      .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)).get
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
     assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
 
@@ -190,7 +191,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "UNCOMPRESSED")
 
     rdd.saveAsParquetFile(path)
-    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+    actualCodec = ParquetTypesConverter
+      .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)).get
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
     assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
 
@@ -206,7 +208,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "none")
 
     rdd.saveAsParquetFile(path)
-    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+    actualCodec = ParquetTypesConverter
+      .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)).get
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
     assert(actualCodec === "UNCOMPRESSED" :: Nil)
 
@@ -222,7 +225,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
 
     rdd.saveAsParquetFile(path)
-    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+    actualCodec = ParquetTypesConverter
+      .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)).get
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
     assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
 
@@ -238,7 +242,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "snappy")
 
     rdd.saveAsParquetFile(path)
-    actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+    actualCodec = ParquetTypesConverter
+      .readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)).get
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
     assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
 
@@ -341,7 +346,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
       path,
       TestSQLContext.sparkContext.hadoopConfiguration)
     assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
-    val metaData = ParquetTypesConverter.readMetaData(path, Some(ContextUtil.getConfiguration(job)))
+    val metaData =
+      ParquetTypesConverter.readMetaData(path, Some(ContextUtil.getConfiguration(job))).get
     assert(metaData != null)
     ParquetTestData
       .testData
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 56fc85239e1c..24b4006b565f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.types.StringType
 import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
 import org.apache.spark.sql.hive
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.parquet.{ParquetTypesConverter, ParquetRelation}
 import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}
 
 import scala.collection.JavaConversions._
@@ -104,6 +104,14 @@ private[hive] trait HiveStrategies {
           case a: AttributeReference => UnresolvedAttribute(a.name)
         })
 
+        val path = relation.hiveQlTable.getPath
+        val conf = hiveContext.sparkContext.hadoopConfiguration
+        val meta = ParquetTypesConverter.readMetaData(path, Some(conf))
+
+        if (meta.isEmpty) {
+          ParquetRelation.createEmpty(path.toString, relation.attributes, true, conf, hiveContext)
+        }
+
         if (relation.hiveQlTable.isPartitioned) {
           val rawPredicate = pruningPredicates.reduceOption(And).getOrElse(Literal(true))
           // Translate the predicate so that it automatically casts the input values to the correct
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 6f57fe895838..e35fd3e44ba4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -106,6 +106,13 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft
     compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames)
   }
 
+  test("SPARK-4552: query for empty parquet table get IllegalArgumentException") {
+    sql("CREATE TABLE parquet_test(key INT, value STRING)")
+    val result = sql("select count(*) from parquet_test limit 5").collect
+    assert(result.size == 1)
+    assert(result(0).getLong(0) == 0)
+  }
+
   private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) {
     var counter = 0
     (rddOne, rddTwo).zipped.foreach {
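
Note on the API change in this patch: `ParquetTypesConverter.readMetaData` now returns `Option[ParquetMetadata]` instead of throwing `IllegalArgumentException` when no Parquet footer can be found, so each caller decides how to handle an empty table directory. The sketch below is a minimal, hypothetical caller written against the new signature; it is not part of the patch, and `hadoopConf` and `tablePath` are placeholder values. Since the object becomes `private[sql]`, any real caller has to live inside the org.apache.spark.sql package, as the Hive strategy above does.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.parquet.ParquetTypesConverter

// Hypothetical inputs; a real caller sits inside org.apache.spark.sql
// because ParquetTypesConverter is private[sql] after this patch.
val hadoopConf = new Configuration()
val tablePath = new Path("/tmp/parquet_test")

ParquetTypesConverter.readMetaData(tablePath, Some(hadoopConf)) match {
  case Some(metadata) =>
    // Non-empty table: the Parquet schema is available exactly as before.
    val schema = metadata.getFileMetaData.getSchema
    println(s"Found Parquet schema:\n$schema")
  case None =>
    // Empty table directory: the old code threw IllegalArgumentException here;
    // the caller now picks a fallback (e.g. an empty relation, as HiveStrategies does).
    println(s"No Parquet metadata found at $tablePath")
}

Returning an Option pushes the empty-directory decision to each call site, which is why the test suites above switch to `.get` where the metadata is known to exist, while the Hive planner falls back to `ParquetRelation.createEmpty` when it is not.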