diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c905a52c4836b..b6f06f5989d2f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -156,6 +156,25 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     }
   }
 
+  def convertStorageFormat(storage: CatalogStorageFormat): CatalogStorageFormat = {
+    val serde = storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+
+    if (serde.contains("parquet")) {
+      val options = storage.properties + (ParquetOptions.MERGE_SCHEMA ->
+        SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
+      storage.copy(
+        serde = None,
+        properties = options
+      )
+    } else {
+      val options = storage.properties
+      storage.copy(
+        serde = None,
+        properties = options
+      )
+    }
+  }
+
   private def convertToLogicalRelation(
       relation: HiveTableRelation,
       options: Map[String, String],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 6a3de557e6e09..d1e222794a526 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils, InsertIntoDataSourceDirCommand}
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
@@ -186,6 +186,8 @@ object HiveAnalysis extends Rule[LogicalPlan] {
  * - When writing to non-partitioned Hive-serde Parquet/Orc tables
  * - When writing to partitioned Hive-serde Parquet/Orc tables when
  *   `spark.sql.hive.convertInsertingPartitionedTable` is true
+ * - When writing to a directory with Hive-serde
+ * - When writing to non-partitioned Hive-serde Parquet/ORC tables using CTAS
  * - When scanning Hive-serde Parquet/ORC tables
  *
  * This rule must be run before all other DDL post-hoc resolution rules, i.e.
@@ -198,11 +200,20 @@ case class RelationConversions(
   }
 
   private def isConvertible(tableMeta: CatalogTable): Boolean = {
-    val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+    isConvertible(tableMeta.storage)
+  }
+
+  private def isConvertible(storage: CatalogStorageFormat): Boolean = {
+    val serde = storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
     serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
       serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
   }
 
+  private def convertProvider(storage: CatalogStorageFormat): String = {
+    val serde = storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+    if (serde.contains("parquet")) "parquet" else "orc"
+  }
+
   private val metastoreCatalog = sessionCatalog.metastoreCatalog
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
@@ -230,6 +241,16 @@ case class RelationConversions(
         DDLUtils.checkTableColumns(tableDesc.copy(schema = query.schema))
         OptimizedCreateHiveTableAsSelectCommand(
           tableDesc, query, query.output.map(_.name), mode)
+
+      // INSERT OVERWRITE DIRECTORY with Hive-serde
+      case InsertIntoDir(_, storage, provider, query, overwrite)
+          if query.resolved && DDLUtils.isHiveTable(provider) &&
+            isConvertible(storage) && conf.getConf(HiveUtils.CONVERT_METASTORE_INSERT_DIR) =>
+        val outputPath = new Path(storage.locationUri.get)
+        if (overwrite) DDLUtils.verifyNotReadPath(query, outputPath)
+
+        InsertIntoDataSourceDirCommand(metastoreCatalog.convertStorageFormat(storage),
+          convertProvider(storage), query, overwrite)
     }
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 93a38e524ebdc..911cb98588d78 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -160,6 +160,15 @@ private[spark] object HiveUtils extends Logging {
     .booleanConf
     .createWithDefault(true)
 
+  val CONVERT_METASTORE_INSERT_DIR = buildConf("spark.sql.hive.convertMetastoreInsertDir")
+    .doc("When set to true, Spark will try to use built-in data source writer " +
+      "instead of Hive serde in INSERT OVERWRITE DIRECTORY. This flag is effective only if " +
+      "`spark.sql.hive.convertMetastoreParquet` or `spark.sql.hive.convertMetastoreOrc` is " +
+      "enabled respectively for Parquet and ORC formats")
+    .version("3.3.0")
+    .booleanConf
+    .createWithDefault(true)
+
   val HIVE_METASTORE_SHARED_PREFIXES = buildStaticConf("spark.sql.hive.metastore.sharedPrefixes")
     .doc("A comma separated list of class prefixes that should be loaded using the classloader " +
       "that is shared between Spark SQL and a specific version of Hive. An example of classes " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index c3c47c58b90be..5cede9e9edb4e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton, TestHiveSparkSession}
@@ -2927,37 +2927,41 @@ class HiveDDLSuite
     }
   }
 
   test("SPARK-33844, 37969: Insert overwrite directory should check schema too") {
-    withView("v") {
-      spark.range(1).createTempView("v")
-      withTempPath { path =>
-        Seq("PARQUET", "ORC").foreach { format =>
-          val e = intercept[SparkException] {
-            spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}' " +
-              s"STORED AS $format SELECT ID, if(1=1, 1, 0), abs(id), '^-' FROM v")
-          }.getCause.getMessage
-          assert(e.contains("Column name \"(IF((1 = 1), 1, 0))\" contains" +
-            " invalid character(s). Please use alias to rename it."))
+    withSQLConf(HiveUtils.CONVERT_METASTORE_INSERT_DIR.key -> "false") {
+      withView("v") {
+        spark.range(1).createTempView("v")
+        withTempPath { path =>
+          Seq("PARQUET", "ORC").foreach { format =>
+            val e = intercept[SparkException] {
+              spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}' " +
+                s"STORED AS $format SELECT ID, if(1=1, 1, 0), abs(id), '^-' FROM v")
+            }.getCause.getMessage
+            assert(e.contains("Column name \"(IF((1 = 1), 1, 0))\" contains" +
+              " invalid character(s). Please use alias to rename it."))
+          }
         }
       }
     }
   }
 
   test("SPARK-36201: Add check for inner field of parquet/orc schema") {
-    withView("v") {
-      spark.range(1).createTempView("v")
-      withTempPath { path =>
-        val e = intercept[SparkException] {
-          spark.sql(
-            s"""
-              |INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}'
-              |STORED AS PARQUET
-              |SELECT
-              |NAMED_STRUCT('ID', ID, 'IF(ID=1,ID,0)', IF(ID=1,ID,0), 'B', ABS(ID)) AS col1
-              |FROM v
+    withSQLConf(HiveUtils.CONVERT_METASTORE_INSERT_DIR.key -> "false") {
+      withView("v") {
+        spark.range(1).createTempView("v")
+        withTempPath { path =>
+          val e = intercept[SparkException] {
+            spark.sql(
+              s"""
+                |INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}'
+                |STORED AS PARQUET
+                |SELECT
+                |NAMED_STRUCT('ID', ID, 'IF(ID=1,ID,0)', IF(ID=1,ID,0), 'B', ABS(ID)) AS col1
+                |FROM v
              """.stripMargin)
-        }.getCause.getMessage
-        assert(e.contains("Column name \"IF(ID=1,ID,0)\" contains invalid character(s). " +
-          "Please use alias to rename it."))
+          }.getCause.getMessage
+          assert(e.contains("Column name \"IF(ID=1,ID,0)\" contains invalid character(s). " +
" + + "Please use alias to rename it.")) + } } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index d3f5d7613ace7..f2711db839913 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.execution.TestUncaughtExceptionHandler import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} -import org.apache.spark.sql.execution.command.LoadDataCommand +import org.apache.spark.sql.execution.command.{InsertIntoDataSourceDirCommand, LoadDataCommand} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} @@ -2654,6 +2654,46 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi } } } + + test("SPARK-38215: Hive Insert Dir should use data source if it is convertible") { + withTempView("p") { + Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p") + + Seq("orc", "parquet").foreach { format => + Seq(true, false).foreach { isConverted => + withSQLConf( + HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted", + HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") { + Seq(true, false).foreach { isConvertedCtas => + withSQLConf(HiveUtils.CONVERT_METASTORE_INSERT_DIR.key -> s"$isConvertedCtas") { + withTempDir { dir => + val df = sql( + s""" + |INSERT OVERWRITE LOCAL DIRECTORY '${dir.getAbsolutePath}' + |STORED AS $format + |SELECT 1 + """.stripMargin) + val insertIntoDSDir = df.queryExecution.analyzed.collect { + case _: InsertIntoDataSourceDirCommand => true + }.headOption + val insertIntoHiveDir = df.queryExecution.analyzed.collect { + case _: InsertIntoHiveDirCommand => true + }.headOption + if (isConverted && isConvertedCtas) { + assert(insertIntoDSDir.nonEmpty) + assert(insertIntoHiveDir.isEmpty) + } else { + assert(insertIntoDSDir.isEmpty) + assert(insertIntoHiveDir.nonEmpty) + } + } + } + } + } + } + } + } + } } @SlowHiveTest