@@ -820,6 +820,14 @@ object DDLUtils {
table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
}

def readHiveTable(table: CatalogTable): HiveTableRelation = {
HiveTableRelation(
table,
// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
}

/**
* Throws a standard error for actions that require partitionProvider = hive.
*/
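The "Hive table columns are always nullable" comment is the reason this helper builds the relation's attributes from a relaxed schema rather than trusting the metastore's nullability flags. A minimal sketch of the effect, assuming only the spark-catalyst types are on the classpath (asNullable itself is Spark-internal, so the relaxation is spelled out by hand for a flat schema):

import org.apache.spark.sql.types._

// Hypothetical metastore schema that claims "id" is NOT NULL.
val metastoreSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType, nullable = true)))

// Equivalent to asNullable for a flat schema: every field becomes nullable.
val relaxed = StructType(metastoreSchema.map(_.copy(nullable = true)))
assert(relaxed.forall(_.nullable))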
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
@@ -244,27 +243,19 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
})
}

private def readHiveTable(table: CatalogTable): LogicalPlan = {
HiveTableRelation(
table,
// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
}

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
i.copy(table = readDataSourceTable(tableMeta))

case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
i.copy(table = readHiveTable(tableMeta))
i.copy(table = DDLUtils.readHiveTable(tableMeta))

case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
readDataSourceTable(tableMeta)

case UnresolvedCatalogRelation(tableMeta) =>
readHiveTable(tableMeta)
DDLUtils.readHiveTable(tableMeta)
}
}

@@ -17,6 +17,8 @@

package org.apache.spark.sql.hive

import java.util.Locale

import scala.util.control.NonFatal

import com.google.common.util.concurrent.Striped
@@ -29,6 +31,8 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
import org.apache.spark.sql.types._

@@ -113,7 +117,44 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}

def convertToLogicalRelation(
// Return true for Apache ORC and Hive ORC-related configuration names.
// Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
private def isOrcProperty(key: String) =
key.startsWith("orc.") || key.contains(".orc.")

private def isParquetProperty(key: String) =
key.startsWith("parquet.") || key.contains(".parquet.")

def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)

// Consider table and storage properties. For properties existing in both sides, storage
// properties will supersede table properties.
if (serde.contains("parquet")) {
val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
relation.tableMeta.storage.properties
if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
"orc")
}
}
}

private def convertToLogicalRelation(
relation: HiveTableRelation,
options: Map[String, String],
fileFormatClass: Class[_ <: FileFormat],
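The precedence comment in convert ("storage properties will supersede table properties") follows from how the two maps are combined. A small stand-alone illustration with made-up property values; only the merge semantics are the point, since ++ keeps the right-hand value on key collisions and, for Parquet, the mergeSchema entry is appended last:

// Hypothetical property maps standing in for tableMeta.properties and storage.properties.
val tableProps   = Map("parquet.compression" -> "gzip", "owner" -> "etl")
val storageProps = Map("parquet.compression" -> "snappy")

val isParquetKey = (k: String) => k.startsWith("parquet.") || k.contains(".parquet.")

// Non-Parquet keys are filtered out, the storage-level value wins for
// "parquet.compression", and the schema-merging flag is added at the end.
val options = tableProps.filterKeys(isParquetKey).toMap ++ storageProps + ("mergeSchema" -> "false")
// options == Map("parquet.compression" -> "snappy", "mergeSchema" -> "false")

Which ORC reader the relation converts to is then decided by SQLConf.ORC_IMPLEMENTATION (spark.sql.orc.impl), i.e. the new native reader versus the Hive one.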
@@ -31,8 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}

@@ -181,62 +180,39 @@ case class RelationConversions(
conf: SQLConf,
sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
private def isConvertible(relation: HiveTableRelation): Boolean = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
isConvertible(relation.tableMeta)
}

// Return true for Apache ORC and Hive ORC-related configuration names.
// Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
private def isOrcProperty(key: String) =
key.startsWith("orc.") || key.contains(".orc.")

private def isParquetProperty(key: String) =
key.startsWith("parquet.") || key.contains(".parquet.")

private def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)

// Consider table and storage properties. For properties existing in both sides, storage
// properties will supersede table properties.
if (serde.contains("parquet")) {
val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
relation.tableMeta.storage.properties
if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
"orc")
}
}
private def isConvertible(tableMeta: CatalogTable): Boolean = {
val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}

private val metastoreCatalog = sessionCatalog.metastoreCatalog

override def apply(plan: LogicalPlan): LogicalPlan = {
plan resolveOperators {
// Write path
case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
// Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
!r.isPartitioned && isConvertible(r) =>
InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
InsertIntoTable(metastoreCatalog.convert(r), partition,
query, overwrite, ifPartitionNotExists)

// Read path
case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
convert(relation)
metastoreCatalog.convert(relation)

// CTAS
case CreateTable(tableDesc, mode, Some(query))
if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
isConvertible(tableDesc) =>
Member:
Add an internal SQL conf here? The perf impact is huge. It could be better or worse.
Also add it to the migration guide and explain the behavior changes.

Contributor:
It's not a new optimization... It's an optimization we dropped in 2.3 by mistake.
I'm fine to add a config with default value true.

Member Author:
Hmm, the optimization is already controlled by configs like HiveUtils.CONVERT_METASTORE_ORC and HiveUtils.CONVERT_METASTORE_PARQUET. Do we need another config for it?

Contributor:
I don't mind adding HiveUtils.CONVERT_METASTORE_ORC_CTAS; maybe we can do it in a followup?

Member Author:
OK.

Member:
Since the regression was already introduced, we need to add a conf and migration guide.

Contributor:
We usually don't write a migration guide for perf optimizations. Otherwise it's annoying to write one for each optimization and ask users to turn it off if something goes wrong. I think we only do that when there are known issues.

Member:
This is not only a perf optimization. It uses a different write path, so different limits or bugs might be exposed after this code change. We should just make the community aware of the change.
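For reference, the followup config discussed above would presumably follow the existing CONVERT_METASTORE_* pattern in HiveUtils. The name, doc text, and default below are assumptions, not part of this PR:

// Hypothetical followup sketch, using the same buildConf pattern as
// HiveUtils.CONVERT_METASTORE_PARQUET / CONVERT_METASTORE_ORC.
val CONVERT_METASTORE_CTAS = buildConf("spark.sql.hive.convertMetastoreCtas")
  .doc("When true, convert CTAS on Parquet/ORC Hive serde tables to use the " +
    "built-in data source write path instead of the Hive serde writer.")
  .booleanConf
  .createWithDefault(true)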

DDLUtils.checkDataColNames(tableDesc)
Contributor:
Why do we need this?

Member Author:
In HiveAnalysis, when transforming CreateTable to CreateHiveTableAsSelectCommand, it has this too. checkDataColNames checks whether any invalid character is used in a column name.
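To make that concrete: the check lets a converted CTAS fail analysis up front instead of writing files the serde cannot read back. An assumed spark-shell illustration; the exact error text is a guess:

// Column name containing a character the Parquet/Hive serde path rejects.
spark.sql("CREATE TABLE bad_names STORED AS PARQUET AS SELECT 1 AS `a,b`")
// Expected (approximately): org.apache.spark.sql.AnalysisException:
//   Attribute name "a,b" contains invalid character(s) ...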

OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)
}
}
}