diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index e1faecedd20e..096481f68275 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -820,6 +820,14 @@ object DDLUtils {
     table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
   }
 
+  def readHiveTable(table: CatalogTable): HiveTableRelation = {
+    HiveTableRelation(
+      table,
+      // Hive table columns are always nullable.
+      table.dataSchema.asNullable.toAttributes,
+      table.partitionSchema.asNullable.toAttributes)
+  }
+
   /**
    * Throws a standard error for actions that require partitionProvider = hive.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index b304e2da6e1c..b5cf8c9515bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -244,27 +244,19 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     })
   }
 
-  private def readHiveTable(table: CatalogTable): LogicalPlan = {
-    HiveTableRelation(
-      table,
-      // Hive table columns are always nullable.
-      table.dataSchema.asNullable.toAttributes,
-      table.partitionSchema.asNullable.toAttributes)
-  }
-
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
         if DDLUtils.isDatasourceTable(tableMeta) =>
       i.copy(table = readDataSourceTable(tableMeta))
 
     case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
-      i.copy(table = readHiveTable(tableMeta))
+      i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
     case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
       readDataSourceTable(tableMeta)
 
     case UnresolvedCatalogRelation(tableMeta) =>
-      readHiveTable(tableMeta)
+      DDLUtils.readHiveTable(tableMeta)
   }
 }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 5823548a8063..03f4b8d83e35 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import java.util.Locale
+
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.Striped
@@ -29,6 +31,8 @@
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
 import org.apache.spark.sql.types._
@@ -113,7 +117,44 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
-  def convertToLogicalRelation(
+  // Return true for Apache ORC and Hive ORC-related configuration names.
+  // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
+  private def isOrcProperty(key: String) =
+    key.startsWith("orc.") || key.contains(".orc.")
+
+  private def isParquetProperty(key: String) =
+    key.startsWith("parquet.") || key.contains(".parquet.")
+
+  def convert(relation: HiveTableRelation): LogicalRelation = {
+    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+
+    // Consider table and storage properties. For properties existing in both sides, storage
+    // properties will supersede table properties.
+    if (serde.contains("parquet")) {
+      val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
+        relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
+        SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
+      convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
+    } else {
+      val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
+        relation.tableMeta.storage.properties
+      if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
+        convertToLogicalRelation(
+          relation,
+          options,
+          classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
+          "orc")
+      } else {
+        convertToLogicalRelation(
+          relation,
+          options,
+          classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
+          "orc")
+      }
+    }
+  }
+
+  private def convertToLogicalRelation(
       relation: HiveTableRelation,
       options: Map[String, String],
       fileFormatClass: Class[_ <: FileFormat],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 07ee10540431..8a5ab188a949 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -31,8 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
-import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
+import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -181,49 +180,17 @@ case class RelationConversions(
     conf: SQLConf,
     sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
   private def isConvertible(relation: HiveTableRelation): Boolean = {
-    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-    serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
-      serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
+    isConvertible(relation.tableMeta)
   }
 
-  // Return true for Apache ORC and Hive ORC-related configuration names.
-  // Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
-  private def isOrcProperty(key: String) =
-    key.startsWith("orc.") || key.contains(".orc.")
-
-  private def isParquetProperty(key: String) =
-    key.startsWith("parquet.") || key.contains(".parquet.")
-
-  private def convert(relation: HiveTableRelation): LogicalRelation = {
-    val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
-
-    // Consider table and storage properties. For properties existing in both sides, storage
-    // properties will supersede table properties.
-    if (serde.contains("parquet")) {
-      val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
-        relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
-        conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
-      sessionCatalog.metastoreCatalog
-        .convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
-    } else {
-      val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
-        relation.tableMeta.storage.properties
-      if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
-        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-          relation,
-          options,
-          classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
-          "orc")
-      } else {
-        sessionCatalog.metastoreCatalog.convertToLogicalRelation(
-          relation,
-          options,
-          classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
-          "orc")
-      }
-    }
+  private def isConvertible(tableMeta: CatalogTable): Boolean = {
+    val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+    serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
+      serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
+  private val metastoreCatalog = sessionCatalog.metastoreCatalog
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan resolveOperators {
       // Write path
@@ -231,12 +198,21 @@ case class RelationConversions(
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
         if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
           !r.isPartitioned && isConvertible(r) =>
-          InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
+          InsertIntoTable(metastoreCatalog.convert(r), partition,
+            query, overwrite, ifPartitionNotExists)
 
       // Read path
       case relation: HiveTableRelation
           if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
-        convert(relation)
+        metastoreCatalog.convert(relation)
+
+      // CTAS
+      case CreateTable(tableDesc, mode, Some(query))
+          if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
+            isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
+        DDLUtils.checkDataColNames(tableDesc)
+        OptimizedCreateHiveTableAsSelectCommand(
+          tableDesc, query, query.output.map(_.name), mode)
     }
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 66067704195d..b60d4c71f594 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -110,6 +110,14 @@ private[spark] object HiveUtils extends Logging {
     .booleanConf
     .createWithDefault(true)
 
+  val CONVERT_METASTORE_CTAS = buildConf("spark.sql.hive.convertMetastoreCtas")
+    .doc("When set to true, Spark will try to use the built-in data source writer " +
+      "instead of Hive serde in CTAS. This flag is effective only if " +
+      "`spark.sql.hive.convertMetastoreParquet` or `spark.sql.hive.convertMetastoreOrc` is " +
+      "enabled for Parquet and ORC formats, respectively.")
+    .booleanConf
+    .createWithDefault(true)
+
   val HIVE_METASTORE_SHARED_PREFIXES = buildConf("spark.sql.hive.metastore.sharedPrefixes")
     .doc("A comma separated list of class prefixes that should be loaded using the classloader " +
       "that is shared between Spark SQL and a specific version of Hive. An example of classes " +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index fd1e931ee0c7..608f21e72625 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -20,32 +20,26 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
+import org.apache.spark.sql.hive.HiveSessionCatalog
 
+trait CreateHiveTableAsSelectBase extends DataWritingCommand {
+  val tableDesc: CatalogTable
+  val query: LogicalPlan
+  val outputColumnNames: Seq[String]
+  val mode: SaveMode
 
-/**
- * Create table and insert the query result into it.
- *
- * @param tableDesc the Table Describe, which may contain serde, storage handler etc.
- * @param query the query whose result will be insert into the new relation
- * @param mode SaveMode
- */
-case class CreateHiveTableAsSelectCommand(
-    tableDesc: CatalogTable,
-    query: LogicalPlan,
-    outputColumnNames: Seq[String],
-    mode: SaveMode)
-  extends DataWritingCommand {
-
-  private val tableIdentifier = tableDesc.identifier
+  protected val tableIdentifier = tableDesc.identifier
 
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (catalog.tableExists(tableIdentifier)) {
+    val tableExists = catalog.tableExists(tableIdentifier)
+
+    if (tableExists) {
       assert(mode != SaveMode.Overwrite,
         s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
 
@@ -57,15 +51,8 @@ case class CreateHiveTableAsSelectCommand(
       return Seq.empty
     }
 
-      // For CTAS, there is no static partition values to insert.
-      val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
-      InsertIntoHiveTable(
-        tableDesc,
-        partition,
-        query,
-        overwrite = false,
-        ifPartitionNotExists = false,
-        outputColumnNames = outputColumnNames).run(sparkSession, child)
+      val command = getWritingCommand(catalog, tableDesc, tableExists = true)
+      command.run(sparkSession, child)
     } else {
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while data
@@ -77,15 +64,8 @@ case class CreateHiveTableAsSelectCommand(
       try {
         // Read back the metadata of the table which was created just now.
         val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
-        // For CTAS, there is no static partition values to insert.
-        val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap
-        InsertIntoHiveTable(
-          createdTableMeta,
-          partition,
-          query,
-          overwrite = true,
-          ifPartitionNotExists = false,
-          outputColumnNames = outputColumnNames).run(sparkSession, child)
+        val command = getWritingCommand(catalog, createdTableMeta, tableExists = false)
+        command.run(sparkSession, child)
       } catch {
         case NonFatal(e) =>
           // drop the created table.
@@ -97,9 +77,89 @@
     Seq.empty[Row]
   }
 
+  // Returns `DataWritingCommand` which actually writes data into the table.
+  def getWritingCommand(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable,
+      tableExists: Boolean): DataWritingCommand
+
   override def argString: String = {
     s"[Database:${tableDesc.database}, " +
     s"TableName: ${tableDesc.identifier.table}, " +
     s"InsertIntoHiveTable]"
   }
 }
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be inserted into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def getWritingCommand(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable,
+      tableExists: Boolean): DataWritingCommand = {
+    // For CTAS, there are no static partition values to insert.
+    val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
+    InsertIntoHiveTable(
+      tableDesc,
+      partition,
+      query,
+      overwrite = if (tableExists) false else true,
+      ifPartitionNotExists = false,
+      outputColumnNames = outputColumnNames)
+  }
+}
+
+/**
+ * Create table and insert the query result into it. This creates a Hive table but inserts
+ * the query result into it by using a data source.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be inserted into the new relation
+ * @param mode SaveMode
+ */
+case class OptimizedCreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends CreateHiveTableAsSelectBase {
+
+  override def getWritingCommand(
+      catalog: SessionCatalog,
+      tableDesc: CatalogTable,
+      tableExists: Boolean): DataWritingCommand = {
+    val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
+    val hiveTable = DDLUtils.readHiveTable(tableDesc)
+
+    val hadoopRelation = metastoreCatalog.convert(hiveTable) match {
+      case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
+      case _ => throw new AnalysisException(s"$tableIdentifier should be converted to " +
+        "HadoopFsRelation.")
+    }
+
+    InsertIntoHadoopFsRelationCommand(
+      hadoopRelation.location.rootPaths.head,
+      Map.empty, // We don't support converting a partitioned table.
+      false,
+      Seq.empty, // We don't support converting a partitioned table.
+      hadoopRelation.bucketSpec,
+      hadoopRelation.fileFormat,
+      hadoopRelation.options,
+      query,
+      if (tableExists) mode else SaveMode.Overwrite,
+      Some(tableDesc),
+      Some(hadoopRelation.location),
+      query.output.map(_.name))
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index e5c9df05d567..470c6a342b4d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -92,4 +92,18 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton
       }
     }
   }
+
+  test("SPARK-25271: write empty map into hive parquet table") {
+    import testImplicits._
+
+    Seq(Map(1 -> "a"), Map.empty[Int, String]).toDF("m").createOrReplaceTempView("p")
+    withTempView("p") {
+      val targetTable = "targetTable"
+      withTable(targetTable) {
+        sql(s"CREATE TABLE $targetTable STORED AS PARQUET AS SELECT m FROM p")
+        checkAnswer(sql(s"SELECT m FROM $targetTable"),
+          Row(Map(1 -> "a")) :: Row(Map.empty[Int, String]) :: Nil)
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index fab2a27cdef1..6acf44606cbb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2276,6 +2276,46 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("SPARK-25271: Hive ctas commands should use data source if it is convertible") {
+    withTempView("p") {
+      Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")
+
+      Seq("orc", "parquet").foreach { format =>
+        Seq(true, false).foreach { isConverted =>
+          withSQLConf(
+            HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
+            HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
+            Seq(true, false).foreach { isConvertedCtas =>
+              withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> s"$isConvertedCtas") {
+
+                val targetTable = "targetTable"
+                withTable(targetTable) {
+                  val df = sql(s"CREATE TABLE $targetTable STORED AS $format AS SELECT id FROM p")
+                  checkAnswer(sql(s"SELECT id FROM $targetTable"),
+                    Row(1) :: Row(2) :: Row(3) :: Nil)
+
+                  val ctasDSCommand = df.queryExecution.analyzed.collect {
+                    case _: OptimizedCreateHiveTableAsSelectCommand => true
+                  }.headOption
+                  val ctasCommand = df.queryExecution.analyzed.collect {
+                    case _: CreateHiveTableAsSelectCommand => true
+                  }.headOption
+
+                  if (isConverted && isConvertedCtas) {
+                    assert(ctasDSCommand.nonEmpty)
+                    assert(ctasCommand.isEmpty)
+                  } else {
+                    assert(ctasDSCommand.isEmpty)
+                    assert(ctasCommand.nonEmpty)
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 
   test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not correct") {
     withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
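
For anyone who wants to exercise the new code path by hand, here is a minimal sketch (not part of the patch; it assumes a Hive-enabled SparkSession bound to `spark` in a build that includes this change, and the table names are only illustrative):

    // spark.sql.hive.convertMetastoreCtas is on by default; together with the existing
    // convertMetastoreParquet/convertMetastoreOrc flags it routes a Parquet/ORC CTAS through
    // OptimizedCreateHiveTableAsSelectCommand and the built-in data source writer.
    spark.conf.set("spark.sql.hive.convertMetastoreCtas", "true")
    spark.sql("CREATE TABLE ctas_converted STORED AS PARQUET AS SELECT 1 AS id")

    // Turning the flag off falls back to the old plan: CreateHiveTableAsSelectCommand backed by
    // InsertIntoHiveTable, i.e. the Hive serde write path.
    spark.conf.set("spark.sql.hive.convertMetastoreCtas", "false")
    spark.sql("CREATE TABLE ctas_serde STORED AS PARQUET AS SELECT 1 AS id")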