@@ -28,11 +28,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
import org.apache.spark.sql.types._

@@ -48,14 +44,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
private def tableRelationCache = sparkSession.sessionState.catalog.tableRelationCache
import HiveMetastoreCatalog._

private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase

def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
QualifiedTableName(
tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase,
tableIdent.table.toLowerCase)
}

/** These locks guard against multiple attempts to instantiate a table, which wastes memory. */
private val tableCreationLocks = Striped.lazyWeakLock(100)

@@ -68,11 +56,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}

def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
// Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
val dbLocation = sparkSession.sharedState.externalCatalog.getDatabase(dbName).locationUri
new Path(new Path(dbLocation), tblName).toString
// For testing only
private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
val key = QualifiedTableName(
table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
table.table.toLowerCase)
tableRelationCache.getIfPresent(key)
}

private def getCached(
@@ -122,7 +111,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}

private def convertToLogicalRelation(
def convertToLogicalRelation(
relation: CatalogRelation,
options: Map[String, String],
fileFormatClass: Class[_ <: FileFormat],
@@ -273,78 +262,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
case NonFatal(ex) =>
logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex)
}

/**
* When scanning or writing to non-partitioned Metastore Parquet tables, convert them to Parquet
* data source relations for better performance.
*/
object ParquetConversions extends Rule[LogicalPlan] {
private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = {
relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)
}

private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = {
val fileFormatClass = classOf[ParquetFileFormat]
val mergeSchema = sessionState.conf.getConf(
HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString)

convertToLogicalRelation(relation, options, fileFormatClass, "parquet")
}

override def apply(plan: LogicalPlan): LogicalPlan = {
plan transformUp {
// Write path
case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Parquet data source (yet).
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
!r.isPartitioned && shouldConvertMetastoreParquet(r) =>
InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)

// Read path
case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
shouldConvertMetastoreParquet(relation) =>
convertToParquetRelation(relation)
}
}
}

/**
* When scanning Metastore ORC tables, convert them to ORC data source relations
* for better performance.
*/
object OrcConversions extends Rule[LogicalPlan] {
private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = {
relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
sessionState.conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}

private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = {
val fileFormatClass = classOf[OrcFileFormat]
val options = Map[String, String]()

convertToLogicalRelation(relation, options, fileFormatClass, "orc")
}

override def apply(plan: LogicalPlan): LogicalPlan = {
plan transformUp {
// Write path
case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
// Inserting into partitioned table is not supported in Orc data source (yet).
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
!r.isPartitioned && shouldConvertMetastoreOrc(r) =>
InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)

// Read path
case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) &&
shouldConvertMetastoreOrc(relation) =>
convertToOrcRelation(relation)
}
}
}
}


private[hive] object HiveMetastoreCatalog {
def mergeWithMetastoreSchema(
metastoreSchema: StructType,
@@ -26,14 +26,12 @@ import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DecimalType, DoubleType}
@@ -43,7 +41,7 @@ import org.apache.spark.util.Utils
private[sql] class HiveSessionCatalog(
externalCatalog: HiveExternalCatalog,
globalTempViewManager: GlobalTempViewManager,
private val metastoreCatalog: HiveMetastoreCatalog,
val metastoreCatalog: HiveMetastoreCatalog,
functionRegistry: FunctionRegistry,
conf: SQLConf,
hadoopConf: Configuration,
@@ -58,25 +56,6 @@ private[sql] class HiveSessionCatalog(
parser,
functionResourceLoader) {

// ----------------------------------------------------------------
// | Methods and fields for interacting with HiveMetastoreCatalog |
// ----------------------------------------------------------------

// These 2 rules must be run before all other DDL post-hoc resolution rules, i.e.
// `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions

def hiveDefaultTableFilePath(name: TableIdentifier): String = {
metastoreCatalog.hiveDefaultTableFilePath(name)
}

// For testing only
private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
val key = metastoreCatalog.getQualifiedTableName(table)
tableRelationCache.getIfPresent(key)
}

override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
makeFunctionBuilder(funcName, Utils.classForName(className))
}
@@ -75,8 +75,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session

override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
new DetermineTableStats(session) +:
catalog.ParquetConversions +:
catalog.OrcConversions +:
RelationConversions(conf, catalog) +:
PreprocessTableCreation(session) +:
PreprocessTableInsertion(conf) +:
DataSourceAnalysis(conf) +:
@@ -18,7 +18,6 @@
package org.apache.spark.sql.hive

import java.io.IOException
import java.net.URI

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.StatsSetupConst
@@ -31,9 +30,11 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}


/**
@@ -170,6 +171,55 @@ object HiveAnalysis extends Rule[LogicalPlan] {
}
}

/**
* Relation conversion from metastore relations to data source relations for better performance.
*
* - When writing to non-partitioned Hive-serde Parquet/ORC tables
* - When scanning Hive-serde Parquet/ORC tables
*
* This rule must be run before all other DDL post-hoc resolution rules, i.e.
* `PreprocessTableCreation`, `PreprocessTableInsertion`, `DataSourceAnalysis` and `HiveAnalysis`.
*/
case class RelationConversions(
conf: SQLConf,
sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
private def isConvertible(relation: CatalogRelation): Boolean = {
(relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") &&
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET)) ||
(relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") &&
conf.getConf(HiveUtils.CONVERT_METASTORE_ORC))
}

private def convert(relation: CatalogRelation): LogicalRelation = {
if (relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet")) {
val options = Map(ParquetOptions.MERGE_SCHEMA ->
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = Map[String, String]()
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
}
}

override def apply(plan: LogicalPlan): LogicalPlan = {
plan transformUp {
// Write path
case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
// Inserting into a partitioned table is not supported by the Parquet/ORC data source (yet).
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
!r.isPartitioned && isConvertible(r) =>
InsertIntoTable(convert(r), partition, query, overwrite, ifNotExists)

// Read path
case relation: CatalogRelation
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
convert(relation)
}
}
}

private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SparkPlanner =>
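The conversions performed by `RelationConversions` fire only when the `HiveUtils.CONVERT_METASTORE_PARQUET` / `CONVERT_METASTORE_ORC` flags checked in `isConvertible` are enabled. A minimal sketch, assuming the standard `spark.sql.hive.*` key names behind those flags, of toggling them at the session level (not part of this change):

// Sketch only: session-level switches that RelationConversions consults via HiveUtils.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("relation-conversion-demo") // hypothetical app name
  .enableHiveSupport()
  .getOrCreate()

// Scan/insert Hive-serde Parquet tables through the Parquet data source path.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
// Merge schemas across Parquet part-files when converting; this becomes the
// MERGE_SCHEMA option passed in convert().
spark.conf.set("spark.sql.hive.convertMetastoreParquet.mergeSchema", "false")
// Apply the same conversion to Hive-serde ORC tables.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")

With both flags off, `isConvertible` returns false and reads/writes stay on the Hive SerDe path.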
@@ -72,8 +72,7 @@ public void setUp() throws IOException {
path.delete();
}
HiveSessionCatalog catalog = (HiveSessionCatalog) sqlContext.sessionState().catalog();
hiveManagedPath = new Path(
catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
hiveManagedPath = new Path(catalog.defaultTablePath(new TableIdentifier("javaSavedTable")));
fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
fs.delete(hiveManagedPath, true);

@@ -413,7 +413,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}
// Table lookup will make the table cached.
spark.table(tableIndent)
statsBeforeUpdate = catalog.getCachedDataSourceTable(tableIndent)
statsBeforeUpdate = catalog.metastoreCatalog.getCachedDataSourceTable(tableIndent)
.asInstanceOf[LogicalRelation].catalogTable.get.stats.get

sql(s"INSERT INTO $tableName SELECT 2")
@@ -423,7 +423,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
}
spark.table(tableIndent)
statsAfterUpdate = catalog.getCachedDataSourceTable(tableIndent)
statsAfterUpdate = catalog.metastoreCatalog.getCachedDataSourceTable(tableIndent)
.asInstanceOf[LogicalRelation].catalogTable.get.stats.get
}
(statsBeforeUpdate, statsAfterUpdate)
@@ -449,8 +449,9 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}
}

private def getCachedDataSourceTable(id: TableIdentifier): LogicalPlan = {
sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCachedDataSourceTable(id)
private def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
sessionState.catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
.getCachedDataSourceTable(table)
}

test("Caching converted data source Parquet Relations") {