@@ -392,119 +392,121 @@ private[hive] class HiveClientImpl(
       dbName: String,
       tableName: String): Option[CatalogTable] = withHiveState {
     logDebug(s"Looking up $dbName.$tableName")
-    getRawTableOption(dbName, tableName).map { h =>
+    getRawTableOption(dbName, tableName).map(convertHiveTableToCatalogTable)
+  }
+
+  private def convertHiveTableToCatalogTable(h: HiveTable): CatalogTable = {
     // Note: Hive separates partition columns and the schema, but for us the
     // partition columns are part of the schema
     val cols = h.getCols.asScala.map(fromHiveColumn)
     val partCols = h.getPartCols.asScala.map(fromHiveColumn)
     val schema = StructType(cols ++ partCols)
 
     val bucketSpec = if (h.getNumBuckets > 0) {
       val sortColumnOrders = h.getSortCols.asScala
       // Currently Spark only supports columns to be sorted in ascending order
       // but Hive can support both ascending and descending order. If all the columns
       // are sorted in ascending order, only then propagate the sortedness information
       // to downstream processing / optimizations in Spark
       // TODO: In future we can have Spark support columns sorted in descending order
       val allAscendingSorted = sortColumnOrders.forall(_.getOrder == HIVE_COLUMN_ORDER_ASC)
 
       val sortColumnNames = if (allAscendingSorted) {
         sortColumnOrders.map(_.getCol)
       } else {
         Seq.empty
       }
       Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala, sortColumnNames))
     } else {
       None
     }
 
     // Skew spec and storage handler can't be mapped to CatalogTable (yet)
     val unsupportedFeatures = ArrayBuffer.empty[String]
 
     if (!h.getSkewedColNames.isEmpty) {
       unsupportedFeatures += "skewed columns"
     }
 
     if (h.getStorageHandler != null) {
       unsupportedFeatures += "storage handler"
     }
 
     if (h.getTableType == HiveTableType.VIRTUAL_VIEW && partCols.nonEmpty) {
       unsupportedFeatures += "partitioned view"
     }
 
     val properties = Option(h.getParameters).map(_.asScala.toMap).orNull
 
     // Hive-generated Statistics are also recorded in ignoredProperties
     val ignoredProperties = scala.collection.mutable.Map.empty[String, String]
     for (key <- HiveStatisticsProperties; value <- properties.get(key)) {
       ignoredProperties += key -> value
     }
 
     val excludedTableProperties = HiveStatisticsProperties ++ Set(
       // The property value of "comment" is moved to the dedicated field "comment"
       "comment",
       // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
       // in the function toHiveTable.
       "EXTERNAL"
     )
 
     val filteredProperties = properties.filterNot {
       case (key, _) => excludedTableProperties.contains(key)
     }
     val comment = properties.get("comment")
 
     CatalogTable(
       identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
       tableType = h.getTableType match {
         case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL
         case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED
         case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIEW
         case unsupportedType =>
           val tableTypeStr = unsupportedType.toString.toLowerCase(Locale.ROOT).replace("_", " ")
           throw new AnalysisException(s"Hive $tableTypeStr is not supported.")
       },
       schema = schema,
       partitionColumnNames = partCols.map(_.name),
       // If the table is written by Spark, we will put bucketing information in table properties,
       // and will always overwrite the bucket spec in hive metastore by the bucketing information
       // in table properties. This means, if we have bucket spec in both hive metastore and
       // table properties, we will trust the one in table properties.
       bucketSpec = bucketSpec,
       owner = Option(h.getOwner).getOrElse(""),
       createTime = h.getTTable.getCreateTime.toLong * 1000,
       lastAccessTime = h.getLastAccessTime.toLong * 1000,
       storage = CatalogStorageFormat(
         locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI),
         // To avoid ClassNotFound exception, we try our best to not get the format class, but get
         // the class name directly. However, for non-native tables, there is no interface to get
         // the format class name, so we may still throw ClassNotFound in this case.
         inputFormat = Option(h.getTTable.getSd.getInputFormat).orElse {
           Option(h.getStorageHandler).map(_.getInputFormatClass.getName)
         },
         outputFormat = Option(h.getTTable.getSd.getOutputFormat).orElse {
           Option(h.getStorageHandler).map(_.getOutputFormatClass.getName)
         },
         serde = Option(h.getSerializationLib),
         compressed = h.getTTable.getSd.isCompressed,
         properties = Option(h.getTTable.getSd.getSerdeInfo.getParameters)
           .map(_.asScala.toMap).orNull
       ),
       // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
       // in the function toHiveTable.
       properties = filteredProperties,
       stats = readHiveStats(properties),
       comment = comment,
       // In older versions of Spark(before 2.2.0), we expand the view original text and
       // store that into `viewExpandedText`, that should be used in view resolution.
       // We get `viewExpandedText` as viewText, and also get `viewOriginalText` in order to
       // display the original view text in `DESC [EXTENDED|FORMATTED] table` command for views
       // that created by older versions of Spark.
       viewOriginalText = Option(h.getViewOriginalText),
       viewText = Option(h.getViewExpandedText),
       unsupportedFeatures = unsupportedFeatures,
       ignoredProperties = ignoredProperties.toMap)
-    }
   }
 
   override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
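The net effect of this change is that getTableOption becomes a thin wrapper and the Hive-to-CatalogTable conversion lives in a private helper that other lookups can share. As a minimal, hypothetical sketch (not part of this diff) of how such reuse could look inside HiveClientImpl, assuming only the members already shown above (withHiveState, getRawTableOption, convertHiveTableToCatalogTable):

  // Hypothetical illustration only -- not code from this PR.
  // Looks up several tables in one call and reuses the extracted conversion,
  // so the Hive -> CatalogTable mapping stays in exactly one place.
  def getTablesByName(dbName: String, tableNames: Seq[String]): Seq[CatalogTable] = withHiveState {
    tableNames.flatMap { tableName =>
      // Missing tables are silently skipped; callers that need strict behavior
      // can compare the result size against tableNames.size.
      getRawTableOption(dbName, tableName).map(convertHiveTableToCatalogTable)
    }
  }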