@@ -156,6 +156,32 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}

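/**
 * Converts a Hive-serde `CatalogStorageFormat` for Parquet/ORC output into a descriptor that
 * Spark's built-in data source writers can consume: the Hive serde is dropped and, for Parquet,
 * the schema-merging option is propagated from the session conf.
 */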
def convertStorageFormat(storage: CatalogStorageFormat): CatalogStorageFormat = {
val serde = storage.serde.getOrElse("").toLowerCase(Locale.ROOT)

if (serde.contains("parquet")) {
val options = storage.properties + (ParquetOptions.MERGE_SCHEMA ->
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
storage.copy(
serde = None,
properties = options
)
} else {
val options = storage.properties
storage.copy(
serde = None,
properties = options
)
}
}

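For reviewers, a minimal sketch (not part of the patch) of what the conversion produces for a Hive Parquet storage descriptor; the serde class name and property values below are illustrative, and `metastoreCatalog` stands for the session's HiveMetastoreCatalog:

import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

// Hypothetical input: a Parquet storage descriptor as the Hive metastore would report it.
val hiveParquet = CatalogStorageFormat.empty.copy(
  serde = Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
  properties = Map("compression" -> "snappy"))

val converted = metastoreCatalog.convertStorageFormat(hiveParquet)
// converted.serde is None, and converted.properties carries the original entries plus
// "mergeSchema" (taken from spark.sql.hive.convertMetastoreParquetWithSchemaMerging),
// so the planner can hand the descriptor to the built-in Parquet writer.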
private def convertToLogicalRelation(
relation: HiveTableRelation,
options: Map[String, String],
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils, InsertIntoDataSourceDirCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
@@ -186,6 +186,8 @@ object HiveAnalysis extends Rule[LogicalPlan] {
* - When writing to non-partitioned Hive-serde Parquet/Orc tables
* - When writing to partitioned Hive-serde Parquet/Orc tables when
* `spark.sql.hive.convertInsertingPartitionedTable` is true
* - When writing to a directory with Hive-serde
* - When writing to non-partitioned Hive-serde Parquet/ORC tables using CTAS
Review comment (Contributor Author): @cloud-fan Updated the comment of this rule, and also added a comment about CTAS.

* - When scanning Hive-serde Parquet/ORC tables
*
* This rule must be run before all other DDL post-hoc resolution rules, i.e.
@@ -198,11 +200,20 @@ case class RelationConversions(
}

private def isConvertible(tableMeta: CatalogTable): Boolean = {
val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
isConvertible(tableMeta.storage)
}

private def isConvertible(storage: CatalogStorageFormat): Boolean = {
val serde = storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}

private def convertProvider(storage: CatalogStorageFormat): String = {
val serde = storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
if (serde.contains("parquet")) "parquet" else "orc"
}
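A quick illustration of the two helpers (serde class name assumed, not part of the patch): a descriptor is convertible only when its serde string matches a format whose convertMetastore flag is on, and convertProvider picks the built-in source accordingly:

import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

// Assuming an ORC Hive table's storage descriptor:
val orcStorage = CatalogStorageFormat.empty.copy(
  serde = Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
// isConvertible(orcStorage) holds only while spark.sql.hive.convertMetastoreOrc is enabled,
// and convertProvider(orcStorage) returns "orc"; a Parquet serde maps to "parquet".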

private val metastoreCatalog = sessionCatalog.metastoreCatalog

override def apply(plan: LogicalPlan): LogicalPlan = {
@@ -230,6 +241,16 @@ case class RelationConversions(
DDLUtils.checkTableColumns(tableDesc.copy(schema = query.schema))
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)

// INSERT OVERWRITE DIRECTORY with a convertible Hive serde
case InsertIntoDir(_, storage, provider, query, overwrite)
if query.resolved && DDLUtils.isHiveTable(provider) &&
isConvertible(storage) && conf.getConf(HiveUtils.CONVERT_METASTORE_INSERT_DIR) =>
val outputPath = new Path(storage.locationUri.get)
if (overwrite) DDLUtils.verifyNotReadPath(query, outputPath)

InsertIntoDataSourceDirCommand(metastoreCatalog.convertStorageFormat(storage),
convertProvider(storage), query, overwrite)
}
}
}
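To make the new case concrete, a sketch (path and query are illustrative) of a statement it matches; with spark.sql.hive.convertMetastoreInsertDir and spark.sql.hive.convertMetastoreParquet both enabled, the statement is planned as InsertIntoDataSourceDirCommand instead of InsertIntoHiveDirCommand:

spark.sql(
  """
    |INSERT OVERWRITE LOCAL DIRECTORY '/tmp/spark_insert_dir_demo'
    |STORED AS PARQUET
    |SELECT id FROM range(5)
  """.stripMargin)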
@@ -160,6 +160,15 @@ private[spark] object HiveUtils extends Logging {
.booleanConf
.createWithDefault(true)

val CONVERT_METASTORE_INSERT_DIR = buildConf("spark.sql.hive.convertMetastoreInsertDir")
.doc("When set to true, Spark will try to use built-in data source writer " +
"instead of Hive serde in INSERT OVERWRITE DIRECTORY. This flag is effective only if " +
"`spark.sql.hive.convertMetastoreParquet` or `spark.sql.hive.convertMetastoreOrc` is " +
"enabled respectively for Parquet and ORC formats")
.version("3.3.0")
.booleanConf
.createWithDefault(true)

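A brief sketch of opting out at the session level (key name taken from the definition above); with the flag off, INSERT OVERWRITE DIRECTORY keeps using the Hive serde writer:

spark.conf.set("spark.sql.hive.convertMetastoreInsertDir", "false")
// or, equivalently, from SQL:
spark.sql("SET spark.sql.hive.convertMetastoreInsertDir=false")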
val HIVE_METASTORE_SHARED_PREFIXES = buildStaticConf("spark.sql.hive.metastore.sharedPrefixes")
.doc("A comma separated list of class prefixes that should be loaded using the classloader " +
"that is shared between Spark SQL and a specific version of Hive. An example of classes " +
@@ -36,7 +36,7 @@ import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton, TestHiveSparkSession}
@@ -2927,37 +2927,41 @@ class HiveDDLSuite
}

test("SPARK-33844, 37969: Insert overwrite directory should check schema too") {
withView("v") {
spark.range(1).createTempView("v")
withTempPath { path =>
Seq("PARQUET", "ORC").foreach { format =>
val e = intercept[SparkException] {
spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}' " +
s"STORED AS $format SELECT ID, if(1=1, 1, 0), abs(id), '^-' FROM v")
}.getCause.getMessage
assert(e.contains("Column name \"(IF((1 = 1), 1, 0))\" contains" +
" invalid character(s). Please use alias to rename it."))
withSQLConf(HiveUtils.CONVERT_METASTORE_INSERT_DIR.key -> "false") {
withView("v") {
spark.range(1).createTempView("v")
withTempPath { path =>
Seq("PARQUET", "ORC").foreach { format =>
val e = intercept[SparkException] {
spark.sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}' " +
s"STORED AS $format SELECT ID, if(1=1, 1, 0), abs(id), '^-' FROM v")
}.getCause.getMessage
assert(e.contains("Column name \"(IF((1 = 1), 1, 0))\" contains" +
" invalid character(s). Please use alias to rename it."))
}
}
}
}
}

test("SPARK-36201: Add check for inner field of parquet/orc schema") {
withView("v") {
spark.range(1).createTempView("v")
withTempPath { path =>
val e = intercept[SparkException] {
spark.sql(
s"""
|INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}'
|STORED AS PARQUET
|SELECT
|NAMED_STRUCT('ID', ID, 'IF(ID=1,ID,0)', IF(ID=1,ID,0), 'B', ABS(ID)) AS col1
|FROM v
withSQLConf(HiveUtils.CONVERT_METASTORE_INSERT_DIR.key -> "false") {
withView("v") {
spark.range(1).createTempView("v")
withTempPath { path =>
val e = intercept[SparkException] {
spark.sql(
s"""
|INSERT OVERWRITE LOCAL DIRECTORY '${path.getCanonicalPath}'
|STORED AS PARQUET
|SELECT
|NAMED_STRUCT('ID', ID, 'IF(ID=1,ID,0)', IF(ID=1,ID,0), 'B', ABS(ID)) AS col1
|FROM v
""".stripMargin)
}.getCause.getMessage
assert(e.contains("Column name \"IF(ID=1,ID,0)\" contains invalid character(s). " +
"Please use alias to rename it."))
}.getCause.getMessage
assert(e.contains("Column name \"IF(ID=1,ID,0)\" contains invalid character(s). " +
"Please use alias to rename it."))
}
}
}
}
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.TestUncaughtExceptionHandler
import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.execution.command.LoadDataCommand
import org.apache.spark.sql.execution.command.{InsertIntoDataSourceDirCommand, LoadDataCommand}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
@@ -2654,6 +2654,46 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
}
}
}

test("SPARK-38215: Hive Insert Dir should use data source if it is convertible") {
withTempView("p") {
Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")

Seq("orc", "parquet").foreach { format =>
Seq(true, false).foreach { isConverted =>
withSQLConf(
HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
Seq(true, false).foreach { isConvertedCtas =>
withSQLConf(HiveUtils.CONVERT_METASTORE_INSERT_DIR.key -> s"$isConvertedCtas") {
withTempDir { dir =>
val df = sql(
s"""
|INSERT OVERWRITE LOCAL DIRECTORY '${dir.getAbsolutePath}'
|STORED AS $format
|SELECT 1
""".stripMargin)
val insertIntoDSDir = df.queryExecution.analyzed.collect {
case _: InsertIntoDataSourceDirCommand => true
}.headOption
val insertIntoHiveDir = df.queryExecution.analyzed.collect {
case _: InsertIntoHiveDirCommand => true
}.headOption
if (isConverted && isConvertedCtas) {
assert(insertIntoDSDir.nonEmpty)
assert(insertIntoHiveDir.isEmpty)
} else {
assert(insertIntoDSDir.isEmpty)
assert(insertIntoHiveDir.nonEmpty)
}
}
}
}
}
}
}
}
}
}
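Beyond asserting on the analyzed plan as above, a quick manual check is possible with EXPLAIN (directory path illustrative); the output names InsertIntoDataSourceDirCommand when the conversion applies and InsertIntoHiveDirCommand otherwise:

sql(
  """
    |EXPLAIN INSERT OVERWRITE DIRECTORY '/tmp/explain_demo'
    |STORED AS ORC
    |SELECT 1
  """.stripMargin).show(truncate = false)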

@SlowHiveTest