diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 2f8ac825fc7a..b8d5f2148df1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -392,119 +392,121 @@ private[hive] class HiveClientImpl(
       dbName: String,
       tableName: String): Option[CatalogTable] = withHiveState {
     logDebug(s"Looking up $dbName.$tableName")
-    getRawTableOption(dbName, tableName).map { h =>
-      // Note: Hive separates partition columns and the schema, but for us the
-      // partition columns are part of the schema
-      val cols = h.getCols.asScala.map(fromHiveColumn)
-      val partCols = h.getPartCols.asScala.map(fromHiveColumn)
-      val schema = StructType(cols ++ partCols)
-
-      val bucketSpec = if (h.getNumBuckets > 0) {
-        val sortColumnOrders = h.getSortCols.asScala
-        // Currently Spark only supports columns to be sorted in ascending order
-        // but Hive can support both ascending and descending order. If all the columns
-        // are sorted in ascending order, only then propagate the sortedness information
-        // to downstream processing / optimizations in Spark
-        // TODO: In future we can have Spark support columns sorted in descending order
-        val allAscendingSorted = sortColumnOrders.forall(_.getOrder == HIVE_COLUMN_ORDER_ASC)
-
-        val sortColumnNames = if (allAscendingSorted) {
-          sortColumnOrders.map(_.getCol)
-        } else {
-          Seq.empty
-        }
-        Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala, sortColumnNames))
+    getRawTableOption(dbName, tableName).map(convertHiveTableToCatalogTable)
+  }
+
+  private def convertHiveTableToCatalogTable(h: HiveTable): CatalogTable = {
+    // Note: Hive separates partition columns and the schema, but for us the
+    // partition columns are part of the schema
+    val cols = h.getCols.asScala.map(fromHiveColumn)
+    val partCols = h.getPartCols.asScala.map(fromHiveColumn)
+    val schema = StructType(cols ++ partCols)
+
+    val bucketSpec = if (h.getNumBuckets > 0) {
+      val sortColumnOrders = h.getSortCols.asScala
+      // Currently Spark only supports columns to be sorted in ascending order
+      // but Hive can support both ascending and descending order. If all the columns
+      // are sorted in ascending order, only then propagate the sortedness information
+      // to downstream processing / optimizations in Spark
+      // TODO: In future we can have Spark support columns sorted in descending order
+      val allAscendingSorted = sortColumnOrders.forall(_.getOrder == HIVE_COLUMN_ORDER_ASC)
+
+      val sortColumnNames = if (allAscendingSorted) {
+        sortColumnOrders.map(_.getCol)
       } else {
-        None
+        Seq.empty
       }
+      Option(BucketSpec(h.getNumBuckets, h.getBucketCols.asScala, sortColumnNames))
+    } else {
+      None
+    }
 
-      // Skew spec and storage handler can't be mapped to CatalogTable (yet)
-      val unsupportedFeatures = ArrayBuffer.empty[String]
+    // Skew spec and storage handler can't be mapped to CatalogTable (yet)
+    val unsupportedFeatures = ArrayBuffer.empty[String]
 
-      if (!h.getSkewedColNames.isEmpty) {
-        unsupportedFeatures += "skewed columns"
-      }
+    if (!h.getSkewedColNames.isEmpty) {
+      unsupportedFeatures += "skewed columns"
+    }
 
-      if (h.getStorageHandler != null) {
-        unsupportedFeatures += "storage handler"
-      }
+    if (h.getStorageHandler != null) {
+      unsupportedFeatures += "storage handler"
+    }
 
-      if (h.getTableType == HiveTableType.VIRTUAL_VIEW && partCols.nonEmpty) {
-        unsupportedFeatures += "partitioned view"
-      }
+    if (h.getTableType == HiveTableType.VIRTUAL_VIEW && partCols.nonEmpty) {
+      unsupportedFeatures += "partitioned view"
+    }
 
-      val properties = Option(h.getParameters).map(_.asScala.toMap).orNull
+    val properties = Option(h.getParameters).map(_.asScala.toMap).orNull
 
-      // Hive-generated Statistics are also recorded in ignoredProperties
-      val ignoredProperties = scala.collection.mutable.Map.empty[String, String]
-      for (key <- HiveStatisticsProperties; value <- properties.get(key)) {
-        ignoredProperties += key -> value
-      }
+    // Hive-generated Statistics are also recorded in ignoredProperties
+    val ignoredProperties = scala.collection.mutable.Map.empty[String, String]
+    for (key <- HiveStatisticsProperties; value <- properties.get(key)) {
+      ignoredProperties += key -> value
+    }
 
-      val excludedTableProperties = HiveStatisticsProperties ++ Set(
-        // The property value of "comment" is moved to the dedicated field "comment"
-        "comment",
-        // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
-        // in the function toHiveTable.
-        "EXTERNAL"
-      )
+    val excludedTableProperties = HiveStatisticsProperties ++ Set(
+      // The property value of "comment" is moved to the dedicated field "comment"
+      "comment",
+      // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
+      // in the function toHiveTable.
+      "EXTERNAL"
+    )
 
-      val filteredProperties = properties.filterNot {
-        case (key, _) => excludedTableProperties.contains(key)
-      }
-      val comment = properties.get("comment")
-
-      CatalogTable(
-        identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
-        tableType = h.getTableType match {
-          case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL
-          case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED
-          case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIEW
-          case unsupportedType =>
-            val tableTypeStr = unsupportedType.toString.toLowerCase(Locale.ROOT).replace("_", " ")
-            throw new AnalysisException(s"Hive $tableTypeStr is not supported.")
-        },
-        schema = schema,
-        partitionColumnNames = partCols.map(_.name),
-        // If the table is written by Spark, we will put bucketing information in table properties,
-        // and will always overwrite the bucket spec in hive metastore by the bucketing information
-        // in table properties. This means, if we have bucket spec in both hive metastore and
-        // table properties, we will trust the one in table properties.
-        bucketSpec = bucketSpec,
-        owner = Option(h.getOwner).getOrElse(""),
-        createTime = h.getTTable.getCreateTime.toLong * 1000,
-        lastAccessTime = h.getLastAccessTime.toLong * 1000,
-        storage = CatalogStorageFormat(
-          locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI),
-          // To avoid ClassNotFound exception, we try our best to not get the format class, but get
-          // the class name directly. However, for non-native tables, there is no interface to get
-          // the format class name, so we may still throw ClassNotFound in this case.
-          inputFormat = Option(h.getTTable.getSd.getInputFormat).orElse {
-            Option(h.getStorageHandler).map(_.getInputFormatClass.getName)
-          },
-          outputFormat = Option(h.getTTable.getSd.getOutputFormat).orElse {
-            Option(h.getStorageHandler).map(_.getOutputFormatClass.getName)
-          },
-          serde = Option(h.getSerializationLib),
-          compressed = h.getTTable.getSd.isCompressed,
-          properties = Option(h.getTTable.getSd.getSerdeInfo.getParameters)
-            .map(_.asScala.toMap).orNull
-        ),
-        // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
-        // in the function toHiveTable.
-        properties = filteredProperties,
-        stats = readHiveStats(properties),
-        comment = comment,
-        // In older versions of Spark(before 2.2.0), we expand the view original text and
-        // store that into `viewExpandedText`, that should be used in view resolution.
-        // We get `viewExpandedText` as viewText, and also get `viewOriginalText` in order to
-        // display the original view text in `DESC [EXTENDED|FORMATTED] table` command for views
-        // that created by older versions of Spark.
-        viewOriginalText = Option(h.getViewOriginalText),
-        viewText = Option(h.getViewExpandedText),
-        unsupportedFeatures = unsupportedFeatures,
-        ignoredProperties = ignoredProperties.toMap)
+    val filteredProperties = properties.filterNot {
+      case (key, _) => excludedTableProperties.contains(key)
     }
+    val comment = properties.get("comment")
+
+    CatalogTable(
+      identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
+      tableType = h.getTableType match {
+        case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL
+        case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED
+        case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIEW
+        case unsupportedType =>
+          val tableTypeStr = unsupportedType.toString.toLowerCase(Locale.ROOT).replace("_", " ")
+          throw new AnalysisException(s"Hive $tableTypeStr is not supported.")
+      },
+      schema = schema,
+      partitionColumnNames = partCols.map(_.name),
+      // If the table is written by Spark, we will put bucketing information in table properties,
+      // and will always overwrite the bucket spec in hive metastore by the bucketing information
+      // in table properties. This means, if we have bucket spec in both hive metastore and
+      // table properties, we will trust the one in table properties.
+      bucketSpec = bucketSpec,
+      owner = Option(h.getOwner).getOrElse(""),
+      createTime = h.getTTable.getCreateTime.toLong * 1000,
+      lastAccessTime = h.getLastAccessTime.toLong * 1000,
+      storage = CatalogStorageFormat(
+        locationUri = shim.getDataLocation(h).map(CatalogUtils.stringToURI),
+        // To avoid ClassNotFound exception, we try our best to not get the format class, but get
+        // the class name directly. However, for non-native tables, there is no interface to get
+        // the format class name, so we may still throw ClassNotFound in this case.
+        inputFormat = Option(h.getTTable.getSd.getInputFormat).orElse {
+          Option(h.getStorageHandler).map(_.getInputFormatClass.getName)
+        },
+        outputFormat = Option(h.getTTable.getSd.getOutputFormat).orElse {
+          Option(h.getStorageHandler).map(_.getOutputFormatClass.getName)
+        },
+        serde = Option(h.getSerializationLib),
+        compressed = h.getTTable.getSd.isCompressed,
+        properties = Option(h.getTTable.getSd.getSerdeInfo.getParameters)
+          .map(_.asScala.toMap).orNull
+      ),
+      // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
+      // in the function toHiveTable.
+      properties = filteredProperties,
+      stats = readHiveStats(properties),
+      comment = comment,
+      // In older versions of Spark(before 2.2.0), we expand the view original text and
+      // store that into `viewExpandedText`, that should be used in view resolution.
+      // We get `viewExpandedText` as viewText, and also get `viewOriginalText` in order to
+      // display the original view text in `DESC [EXTENDED|FORMATTED] table` command for views
+      // that created by older versions of Spark.
+      viewOriginalText = Option(h.getViewOriginalText),
+      viewText = Option(h.getViewExpandedText),
+      unsupportedFeatures = unsupportedFeatures,
+      ignoredProperties = ignoredProperties.toMap)
   }
 
   override def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = withHiveState {
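The hunk above is a pure extract-method refactor: the body of the .map { h => ... } closure moves into the new private helper convertHiveTableToCatalogTable, so getTableOption keeps only the metastore lookup while the Hive-to-Catalog mapping becomes callable on its own. For readers outside the Spark codebase, a minimal standalone sketch of the same shape follows; it uses toy stand-in types rather than Spark's HiveTable/CatalogTable, and every name in it is hypothetical.

// Illustrative sketch only (toy types, not Spark's API). It mirrors the shape of the
// refactor in the hunk: the lookup stays a thin wrapper, and the conversion logic is
// a separately callable function, analogous to convertHiveTableToCatalogTable.
object ExtractConversionSketch {
  // Hypothetical stand-ins for HiveTable and CatalogTable.
  final case class RawTable(db: String, name: String, props: Map[String, String])
  final case class CatalogEntry(qualifiedName: String, comment: Option[String])

  // Pure conversion: no metastore access, so it can be reused and tested in isolation.
  def convertRawTable(raw: RawTable): CatalogEntry =
    CatalogEntry(s"${raw.db}.${raw.name}", raw.props.get("comment"))

  // The lookup delegates to the conversion, as getTableOption now does via .map(...).
  def getTableOption(
      db: String,
      table: String,
      fetch: (String, String) => Option[RawTable]): Option[CatalogEntry] =
    fetch(db, table).map(convertRawTable)

  def main(args: Array[String]): Unit = {
    // A fake lookup standing in for getRawTableOption.
    val fakeMetastore = (db: String, t: String) => Some(RawTable(db, t, Map("comment" -> "demo")))
    println(getTableOption("default", "events", fakeMetastore))
    // prints: Some(CatalogEntry(default.events,Some(demo)))
  }
}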