@@ -820,6 +820,14 @@ object DDLUtils {
table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
}

def readHiveTable(table: CatalogTable): HiveTableRelation = {
HiveTableRelation(
table,
// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
}
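
A minimal sketch, separate from the diff, of the nullability relaxation this shared helper relies on; the schema and field name below are purely illustrative:

import org.apache.spark.sql.types._

// Hive treats every column as nullable, so the relation built above discards
// any NOT NULL flags recorded in the catalog schema. Roughly what asNullable
// does is copy the schema with nullable = true on every field.
val dataSchema = StructType(Seq(StructField("id", LongType, nullable = false)))
val relaxed = StructType(dataSchema.map(_.copy(nullable = true)))
assert(relaxed("id").nullable)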

/**
* Throws a standard error for actions that require partitionProvider = hive.
*/
@@ -244,27 +244,19 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
})
}

private def readHiveTable(table: CatalogTable): LogicalPlan = {
HiveTableRelation(
table,
// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
}

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
i.copy(table = readDataSourceTable(tableMeta))

case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
i.copy(table = readHiveTable(tableMeta))
i.copy(table = DDLUtils.readHiveTable(tableMeta))

case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
readDataSourceTable(tableMeta)

case UnresolvedCatalogRelation(tableMeta) =>
readHiveTable(tableMeta)
DDLUtils.readHiveTable(tableMeta)
}
}
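
A small sketch of the predicate this rule branches on, mirroring the isDatasourceTable context line near the top of the diff; HIVE_PROVIDER is the constant assumed there:

import java.util.Locale

// A table counts as a data source table only if it declares a provider and
// that provider is not "hive"; Hive tables, and tables with no provider at
// all, fall through to DDLUtils.readHiveTable.
val HIVE_PROVIDER = "hive"
def hasDatasourceProvider(provider: Option[String]): Boolean =
  provider.exists(_.toLowerCase(Locale.ROOT) != HIVE_PROVIDER)

assert(hasDatasourceProvider(Some("parquet")))
assert(!hasDatasourceProvider(Some("HIVE")))
assert(!hasDatasourceProvider(None))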

@@ -17,6 +17,8 @@

package org.apache.spark.sql.hive

import java.util.Locale

import scala.util.control.NonFatal

import com.google.common.util.concurrent.Striped
@@ -29,6 +31,8 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
import org.apache.spark.sql.types._

@@ -113,7 +117,44 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}

def convertToLogicalRelation(
// Return true for Apache ORC and Hive ORC-related configuration names.
// Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
private def isOrcProperty(key: String) =
key.startsWith("orc.") || key.contains(".orc.")

private def isParquetProperty(key: String) =
key.startsWith("parquet.") || key.contains(".parquet.")

def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)

// Consider table and storage properties. For properties present on both sides, the storage
// properties supersede the table properties.
if (serde.contains("parquet")) {
val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
relation.tableMeta.storage.properties
if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
"orc")
}
}
}
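
A sketch, independent of the classes above, of the option merging convert performs: ++ keeps the right-hand value on key collisions, which is why the storage properties supersede the table properties; the property keys below are illustrative:

// Illustrative property maps only, not read from a real table.
val tableProps   = Map("parquet.compression" -> "gzip", "owner" -> "etl")
val storageProps = Map("parquet.compression" -> "snappy")

val merged = tableProps.filterKeys(_.startsWith("parquet.")).toMap ++ storageProps
assert(merged("parquet.compression") == "snappy")   // storage side wins
assert(!merged.contains("owner"))                   // non-Parquet keys are filtered out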

private def convertToLogicalRelation(
relation: HiveTableRelation,
options: Map[String, String],
fileFormatClass: Class[_ <: FileFormat],
@@ -31,8 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}

@@ -181,62 +180,39 @@ case class RelationConversions(
conf: SQLConf,
sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
private def isConvertible(relation: HiveTableRelation): Boolean = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
isConvertible(relation.tableMeta)
}

// Return true for Apache ORC and Hive ORC-related configuration names.
// Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
private def isOrcProperty(key: String) =
key.startsWith("orc.") || key.contains(".orc.")

private def isParquetProperty(key: String) =
key.startsWith("parquet.") || key.contains(".parquet.")

private def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)

// Consider table and storage properties. For properties existing in both sides, storage
// properties will supersede table properties.
if (serde.contains("parquet")) {
val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
relation.tableMeta.storage.properties
if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
"orc")
}
}
private def isConvertible(tableMeta: CatalogTable): Boolean = {
val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}
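
For reference, a sketch of the serde strings this check matches; the class names below are the standard Hive serde implementations, listed only to illustrate the substring test:

import java.util.Locale

// The usual Hive serde class names contain "parquet" or "orc", which is all
// the check above relies on.
val parquetSerde = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
val orcSerde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"
assert(parquetSerde.toLowerCase(Locale.ROOT).contains("parquet"))
assert(orcSerde.toLowerCase(Locale.ROOT).contains("orc"))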

private val metastoreCatalog = sessionCatalog.metastoreCatalog

override def apply(plan: LogicalPlan): LogicalPlan = {
plan resolveOperators {
// Write path
case InsertIntoTable(r: HiveTableRelation, partition, query, overwrite, ifPartitionNotExists)
// Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
!r.isPartitioned && isConvertible(r) =>
InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
InsertIntoTable(metastoreCatalog.convert(r), partition,
query, overwrite, ifPartitionNotExists)

// Read path
case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
convert(relation)
metastoreCatalog.convert(relation)

// CTAS
case CreateTable(tableDesc, mode, Some(query))
if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
DDLUtils.checkDataColNames(tableDesc)
Contributor: why do we need this?

Member Author: In HiveAnalysis, when transforming CreateTable to CreateHiveTableAsSelectCommand, it has this too. checkDataColNames checks whether any invalid character is used in a column name.

OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)
}
}
}
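
A hedged usage sketch, assuming a Hive-enabled SparkSession named spark with the conversion flags at their defaults; it shows the kind of statement the new CreateTable case targets, and the exact EXPLAIN output is left unspecified since it varies between versions:

// A non-partitioned Parquet CTAS should now be planned through
// OptimizedCreateHiveTableAsSelectCommand rather than the Hive SerDe writer.
spark.sql("CREATE TABLE ctas_demo STORED AS PARQUET AS SELECT 1 AS id")
spark.sql("EXPLAIN CREATE TABLE ctas_demo2 STORED AS PARQUET AS SELECT 1 AS id").show(false)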
@@ -110,6 +110,14 @@ private[spark] object HiveUtils extends Logging {
.booleanConf
.createWithDefault(true)

val CONVERT_METASTORE_CTAS = buildConf("spark.sql.hive.convertMetastoreCtas")
.doc("When set to true, Spark will try to use built-in data source writer " +
"instead of Hive serde in CTAS. This flag is effective only if " +
"`spark.sql.hive.convertMetastoreParquet` or `spark.sql.hive.convertMetastoreOrc` is " +
"enabled respectively for Parquet and ORC formats")
.booleanConf
.createWithDefault(true)
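
A usage sketch, assuming a Hive-enabled SparkSession named spark: turning the new flag off falls back to the Hive SerDe writer for CTAS while leaving the Parquet/ORC read-path conversions untouched:

// Keep reads converted to the built-in file sources, but let Hive write
// CTAS results.
spark.conf.set("spark.sql.hive.convertMetastoreCtas", "false")
spark.sql("CREATE TABLE legacy_ctas STORED AS PARQUET AS SELECT 1 AS id")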

val HIVE_METASTORE_SHARED_PREFIXES = buildConf("spark.sql.hive.metastore.sharedPrefixes")
.doc("A comma separated list of class prefixes that should be loaded using the classloader " +
"that is shared between Spark SQL and a specific version of Hive. An example of classes " +
@@ -20,32 +20,26 @@ package org.apache.spark.sql.hive.execution
import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.HiveSessionCatalog

trait CreateHiveTableAsSelectBase extends DataWritingCommand {
val tableDesc: CatalogTable
val query: LogicalPlan
val outputColumnNames: Seq[String]
val mode: SaveMode

/**
* Create table and insert the query result into it.
*
* @param tableDesc the Table Describe, which may contain serde, storage handler etc.
* @param query the query whose result will be insert into the new relation
* @param mode SaveMode
*/
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode)
extends DataWritingCommand {

private val tableIdentifier = tableDesc.identifier
protected val tableIdentifier = tableDesc.identifier

override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
Contributor: Some more thoughts: CreateHiveTableAsSelectCommand just runs another command, so we will not get any metrics for this plan node. That is fine if we use the Hive writer, as we indeed can't get any metrics (the writing is done by Hive). However, if we can convert and use Spark's native writer, we do have metrics. I think a better fix is to replace Hive CTAS with data source CTAS during optimization.

Member Author: I think the table metadata created by data source CTAS and Hive CTAS are different?

Contributor: Then how about we create a special Hive CTAS command that follows the data source CTAS command but creates a Hive table?

Member Author: I also thought about that. But then we would have two Hive CTAS commands. Is that acceptable to you?

Contributor: I'm OK with that, since we do have two different ways to do Hive CTAS.

Member Author: I created a Hive CTAS command that writes through the data source path.

val catalog = sparkSession.sessionState.catalog
if (catalog.tableExists(tableIdentifier)) {
val tableExists = catalog.tableExists(tableIdentifier)

if (tableExists) {
assert(mode != SaveMode.Overwrite,
s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")

@@ -57,15 +51,8 @@ case class CreateHiveTableAsSelectCommand(
return Seq.empty
}

// For CTAS, there is no static partition values to insert.
val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
InsertIntoHiveTable(
tableDesc,
partition,
query,
overwrite = false,
ifPartitionNotExists = false,
outputColumnNames = outputColumnNames).run(sparkSession, child)
val command = getWritingCommand(catalog, tableDesc, tableExists = true)
command.run(sparkSession, child)
} else {
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
@@ -77,15 +64,8 @@
try {
// Read back the metadata of the table which was created just now.
val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
// For CTAS, there is no static partition values to insert.
val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap
InsertIntoHiveTable(
createdTableMeta,
partition,
query,
overwrite = true,
ifPartitionNotExists = false,
outputColumnNames = outputColumnNames).run(sparkSession, child)
val command = getWritingCommand(catalog, createdTableMeta, tableExists = false)
command.run(sparkSession, child)
} catch {
case NonFatal(e) =>
// drop the created table.
@@ -97,9 +77,89 @@
Seq.empty[Row]
}

// Returns `DataWritingCommand` which actually writes data into the table.
def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand

override def argString: String = {
s"[Database:${tableDesc.database}, " +
s"TableName: ${tableDesc.identifier.table}, " +
s"InsertIntoHiveTable]"
}
}

/**
* Create table and insert the query result into it.
*
* @param tableDesc the table description, which may contain serde, storage handler etc.
* @param query the query whose result will be inserted into the new relation
* @param mode SaveMode
*/
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode)
extends CreateHiveTableAsSelectBase {

override def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand = {
// For CTAS, there is no static partition values to insert.
val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
InsertIntoHiveTable(
tableDesc,
partition,
query,
overwrite = if (tableExists) false else true,
ifPartitionNotExists = false,
outputColumnNames = outputColumnNames)
}
}

/**
* Create table and insert the query result into it. This command creates a Hive table, but
* writes the query result into it with the built-in data source writer.
*
* @param tableDesc the table description, which may contain serde, storage handler etc.
* @param query the query whose result will be inserted into the new relation
* @param mode SaveMode
*/
case class OptimizedCreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode)
extends CreateHiveTableAsSelectBase {

override def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand = {
val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
val hiveTable = DDLUtils.readHiveTable(tableDesc)

val hadoopRelation = metastoreCatalog.convert(hiveTable) match {
case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
case _ => throw new AnalysisException(s"$tableIdentifier should be converted to " +
"HadoopFsRelation.")
}

InsertIntoHadoopFsRelationCommand(
hadoopRelation.location.rootPaths.head,
Map.empty, // Converting partitioned tables is not supported.
false,
Seq.empty, // Converting partitioned tables is not supported.
hadoopRelation.bucketSpec,
hadoopRelation.fileFormat,
hadoopRelation.options,
query,
if (tableExists) mode else SaveMode.Overwrite,
Some(tableDesc),
Some(hadoopRelation.location),
query.output.map(_.name))
}
}
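
A minimal sketch of the save-mode choice made above; only the conditional is reproduced, not the full command:

import org.apache.spark.sql.SaveMode

// For a table that already exists, the user's SaveMode is passed through;
// a table freshly created by this CTAS is always written with Overwrite,
// since it starts out empty.
def effectiveMode(tableExists: Boolean, mode: SaveMode): SaveMode =
  if (tableExists) mode else SaveMode.Overwrite

assert(effectiveMode(tableExists = true, SaveMode.Append) == SaveMode.Append)
assert(effectiveMode(tableExists = false, SaveMode.ErrorIfExists) == SaveMode.Overwrite)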