diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 181f470b2a10..be43ac67e0ed 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -191,7 +191,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       defaultSource: FileFormat,
       fileFormatClass: Class[_ <: FileFormat],
       fileType: String): LogicalRelation = {
-    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+    val metastoreSchema = metastoreRelation.schema
     val tableIdentifier =
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
     val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
@@ -237,21 +237,24 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           new Path(metastoreRelation.catalogTable.storage.locationUri.get),
           partitionSpec)
 
-        val inferredSchema = if (fileType.equals("parquet")) {
-          val inferredSchema =
-            defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
-          inferredSchema.map { inferred =>
-            ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
-          }.getOrElse(metastoreSchema)
-        } else {
-          defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
+        val schema = fileType match {
+          case "parquet" =>
+            val inferredSchema =
+              defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
+            inferredSchema.map { inferred =>
+              ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred)
+            }.getOrElse(metastoreSchema)
+          case "orc" =>
+            metastoreSchema
+          case _ =>
+            throw new RuntimeException(s"Cannot convert a $fileType to a data source table")
         }
 
         val relation = HadoopFsRelation(
           sparkSession = sparkSession,
           location = fileCatalog,
           partitionSchema = partitionSchema,
-          dataSchema = inferredSchema,
+          dataSchema = schema,
           bucketSpec = bucketSpec,
           fileFormat = defaultSource,
           options = options)
@@ -281,7 +284,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             DataSource(
               sparkSession = sparkSession,
               paths = paths,
-              userSpecifiedSchema = Some(metastoreRelation.schema),
+              userSpecifiedSchema = Some(metastoreSchema),
               bucketSpec = bucketSpec,
               options = options,
               className = fileType).resolveRelation(),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index b13878d57860..ec57a7c6846d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.orc
 
+import java.io.File
 import java.nio.charset.StandardCharsets
 
 import org.scalatest.BeforeAndAfterAll
@@ -372,6 +373,40 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
+  test("support empty orc table when converting hive serde table to data source table") {
+    withSQLConf((HiveUtils.CONVERT_METASTORE_ORC.key, "true")) {
+      withTable("empty_orc_partitioned") {
+        sql(
+          """
+            |CREATE TABLE empty_orc_partitioned(key INT, value STRING)
+            |PARTITIONED BY (p INT) STORED AS ORC
+          """.stripMargin)
+
+        val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
+
+        // Query empty table
+        checkAnswer(
+          sql("SELECT key, value FROM empty_orc_partitioned"),
+          emptyDF)
+      }
+
+      withTable("empty_orc") {
+        sql(
+          """
+            |CREATE TABLE empty_orc(key INT, value STRING)
+            |STORED AS ORC
+          """.stripMargin)
+
+        val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
+
+        // Query empty table
+        checkAnswer(
+          sql("SELECT key, value FROM empty_orc"),
+          emptyDF)
+      }
+    }
+  }
+
   test("SPARK-10623 Enable ORC PPD") {
     withTempPath { dir =>
       withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {