@@ -79,6 +79,12 @@ case class CatalogTablePartition(
*
* Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
* future once we have a better understanding of how we want to handle skewed columns.
*
* @param hasUnsupportedFeatures indicates whether the table metadata retrieved from the
* concrete underlying external catalog (e.g. Hive metastore) contains entries that are not
* supported by Spark SQL. For example, if the underlying Hive table has skewed columns, this
* information can't be mapped to [[CatalogTable]] since Spark SQL doesn't handle skewed
* columns yet. In that case `hasUnsupportedFeatures` is set to true. By default, it is false.
*/
case class CatalogTable(
identifier: TableIdentifier,
@@ -95,7 +101,8 @@ case class CatalogTable(
properties: Map[String, String] = Map.empty,
viewOriginalText: Option[String] = None,
viewText: Option[String] = None,
comment: Option[String] = None) {
comment: Option[String] = None,
hasUnsupportedFeatures: Boolean = false) {

// Verify that the provided columns are part of the schema
private val colNames = schema.map(_.name).toSet
@@ -200,6 +207,7 @@ case class SimpleCatalogRelation(
}
}

require(metadata.identifier.database == Some(databaseName),
require(
metadata.identifier.database.contains(databaseName),

[Contributor] contains is not in Scala 2.10. Let me fix the build.

"provided database does not match the one specified in the table definition")
}
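
A quick illustration of the build issue flagged above: Option.contains was only added in Scala 2.11, so the equality form is the Scala 2.10-compatible spelling (a minimal sketch, not project code):

```scala
val database: Option[String] = Some("default")

// Scala 2.11+ only: Option.contains was introduced in 2.11.
assert(database.contains("default"))

// Scala 2.10-compatible equivalent, matching the original line above:
assert(database == Some("default"))
```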
@@ -626,40 +626,149 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand
val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
showCreateDataSourceTable(tableMetadata)
} else {
throw new UnsupportedOperationException(
"SHOW CREATE TABLE only supports Spark SQL data source tables.")
showCreateHiveTable(tableMetadata)
}

Seq(Row(stmt))
}

private def showCreateHiveTable(metadata: CatalogTable): String = {
def reportUnsupportedError(): Unit = {
throw new UnsupportedOperationException(

[Contributor @andrewor14, May 12, 2016] Elsewhere we normally throw AnalysisException("Operation not allowed: ...")

[Contributor] Also, can you add tests for this case?

[Contributor Author] But this isn't an analysis failure, is it?

[Contributor Author] Test case added.

[Contributor] Let's be consistent with other places and change this to AnalysisException.

s"Failed to execute SHOW CREATE TABLE against table ${metadata.identifier.quotedString}, " +
"because it contains table structure(s) (e.g. skewed columns) that Spark SQL doesn't " +
"support yet."

[Contributor] I think we need to explicitly say that the table was created by Hive with keywords that we do not support.

[Contributor Author] Do you mean we should mention the exact keywords, or just say that the table was created by Hive?

[Contributor Author] Replaced hasUnsupportedFeatures: Boolean with unsupportedFeatures: Seq[String], which holds string descriptions of unsupported features, so that we can list them in the exception message.

)
}

if (metadata.hasUnsupportedFeatures) {
reportUnsupportedError()
}

val builder = StringBuilder.newBuilder

val tableTypeString = metadata.tableType match {
case EXTERNAL => " EXTERNAL TABLE"
case VIEW => " VIEW"
case MANAGED => " TABLE"
case INDEX => reportUnsupportedError()
}

builder ++= s"CREATE$tableTypeString ${table.quotedString}"

if (metadata.tableType == VIEW) {
if (metadata.schema.nonEmpty) {
builder ++= metadata.schema.map(_.name).mkString("(", ", ", ")")
}
builder ++= metadata.viewText.mkString(" AS\n", "", "\n")
} else {
showHiveTableHeader(metadata, builder)
showHiveTableNonDataColumns(metadata, builder)
showHiveTableStorageInfo(metadata, builder)
showHiveTableProperties(metadata, builder)
}

builder.toString()
}
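
A hedged sketch of the follow-up the author describes above, where the Boolean becomes unsupportedFeatures: Seq[String]; the trimmed-down TableMeta case class is hypothetical, not the real CatalogTable:

```scala
// Hypothetical stand-in for CatalogTable: carrying string descriptions
// instead of a bare Boolean lets the error message enumerate exactly
// which features blocked SHOW CREATE TABLE.
case class TableMeta(qualifiedName: String, unsupportedFeatures: Seq[String] = Nil)

def checkShowCreateTableSupported(meta: TableMeta): Unit = {
  if (meta.unsupportedFeatures.nonEmpty) {
    throw new UnsupportedOperationException(
      s"Failed to execute SHOW CREATE TABLE against table ${meta.qualifiedName}, " +
        "which is created by Hive and uses the following unsupported feature(s):\n" +
        meta.unsupportedFeatures.map(" - " + _).mkString("\n"))
  }
}
```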

private def showHiveTableHeader(metadata: CatalogTable, builder: StringBuilder): Unit = {
val columns = metadata.schema.filterNot { column =>
metadata.partitionColumnNames.contains(column.name)
}.map(columnToDDLFragment)

if (columns.nonEmpty) {
builder ++= columns.mkString("(", ", ", ")\n")
}

metadata
.comment
.map("COMMENT '" + escapeSingleQuotedString(_) + "'\n")
.foreach(builder.append)
}

private def columnToDDLFragment(column: CatalogColumn): String = {
val comment = column.comment.map(escapeSingleQuotedString).map(" COMMENT '" + _ + "'")
s"${quoteIdentifier(column.name)} ${column.dataType}${comment.getOrElse("")}"
}

private def showHiveTableNonDataColumns(metadata: CatalogTable, builder: StringBuilder): Unit = {
if (metadata.partitionColumns.nonEmpty) {
val partCols = metadata.partitionColumns.map(columnToDDLFragment)
builder ++= partCols.mkString("PARTITIONED BY (", ", ", ")\n")
}

if (metadata.bucketColumnNames.nonEmpty) {

[Contributor] I don't think we support this. You can't actually create a table with bucketed columns.

[Contributor] If you remove this, then you probably don't need this helper method.

[Contributor] Yea, let's throw an exception since we do not allow users to create bucketed Hive tables.

throw new UnsupportedOperationException(
"Creating Hive table with bucket spec is not supported yet.")
}
}

private def showHiveTableStorageInfo(metadata: CatalogTable, builder: StringBuilder): Unit = {
val storage = metadata.storage

storage.serde.foreach { serde =>
builder ++= s"ROW FORMAT SERDE '$serde'\n"

val serdeProps = metadata.storage.serdeProperties.map {
case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
}

builder ++= serdeProps.mkString("WITH SERDEPROPERTIES (", ",\n ", "\n)\n")
}

if (storage.inputFormat.isDefined || storage.outputFormat.isDefined) {
builder ++= "STORED AS\n"

storage.inputFormat.foreach { format =>
builder ++= s" INPUTFORMAT '${escapeSingleQuotedString(format)}'\n"
}

storage.outputFormat.foreach { format =>
builder ++= s" OUTPUTFORMAT '${escapeSingleQuotedString(format)}'\n"
}
}

if (metadata.tableType == EXTERNAL) {
storage.locationUri.foreach { uri =>
builder ++= s"LOCATION '$uri'\n"
}
}
}

private def showHiveTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
if (metadata.properties.nonEmpty) {
val filteredProps = metadata.properties.filterNot {
// Skips "EXTERNAL" property for external tables
case (key, _) => key == "EXTERNAL" && metadata.tableType == EXTERNAL
}

val props = filteredProps.map { case (key, value) =>
s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
}

if (props.nonEmpty) {
builder ++= props.mkString("TBLPROPERTIES (", ",\n ", ")\n")
}
}
}
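
escapeSingleQuotedString is a helper not shown in this diff; a plausible minimal implementation of its contract (an assumption, not the actual Spark code) could be:

```scala
// Assumed contract: make a value safe to embed between single quotes in
// generated DDL by backslash-escaping any embedded single quote.
def escapeSingleQuotedString(str: String): String =
  str.flatMap {
    case '\'' => "\\'"    // ' becomes \'
    case ch   => ch.toString
  }
```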

private def showCreateDataSourceTable(metadata: CatalogTable): String = {
val builder = StringBuilder.newBuilder

builder ++= s"CREATE TABLE ${table.quotedString} "
showDataSourceTableDataCols(metadata, builder)
showDataSourceTableDataColumns(metadata, builder)
showDataSourceTableOptions(metadata, builder)
showDataSourceTableNonDataColumns(metadata, builder)

builder.toString()
}

private def showDataSourceTableDataCols(metadata: CatalogTable, builder: StringBuilder): Unit = {
val props = metadata.properties
val schemaParts = for {
numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
index <- 0 until numParts.toInt
} yield props.getOrElse(
s"spark.sql.sources.schema.part.$index",
throw new AnalysisException(
s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing."
)
)

if (schemaParts.nonEmpty) {
val fields = DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType].fields
val colTypeList = fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
builder ++= colTypeList.mkString("(", ", ", ")")
private def showDataSourceTableDataColumns(
metadata: CatalogTable, builder: StringBuilder): Unit = {
DDLUtils.getSchemaFromTableProperties(metadata).foreach { schema =>
val columns = schema.fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
builder ++= columns.mkString("(", ", ", ")")
}

builder ++= "\n"
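
For reference, the inline logic replaced here (now delegated to DDLUtils.getSchemaFromTableProperties) reassembled a schema JSON string that was split across numbered table properties. A sketch of that storage scheme, assuming the property keys shown in the removed code:

```scala
// A long schema JSON is stored as numbered chunks under
// spark.sql.sources.schema.part.<i> and concatenated before parsing.
def readSchemaJson(props: Map[String, String]): Option[String] =
  props.get("spark.sql.sources.schema.numParts").map { numParts =>
    (0 until numParts.toInt).map { index =>
      props.getOrElse(
        s"spark.sql.sources.schema.part.$index",
        sys.error(s"Corrupted schema in catalog: part $index is missing."))
    }.mkString
  }
```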
@@ -688,40 +797,21 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand

private def showDataSourceTableNonDataColumns(
metadata: CatalogTable, builder: StringBuilder): Unit = {
val props = metadata.properties

def getColumnNamesByType(colType: String, typeName: String): Seq[String] = {
(for {
numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
index <- 0 until numCols.toInt
} yield props.getOrElse(
s"spark.sql.sources.schema.${colType}Col.$index",
throw new AnalysisException(
s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
)
)).map(quoteIdentifier)
}

val partCols = getColumnNamesByType("part", "partitioning columns")
val partCols = DDLUtils.getPartitionColumnsFromTableProperties(metadata)
if (partCols.nonEmpty) {
builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
}

val bucketCols = getColumnNamesByType("bucket", "bucketing columns")
if (bucketCols.nonEmpty) {
builder ++= s"CLUSTERED BY ${bucketCols.mkString("(", ", ", ")")}\n"

val sortCols = getColumnNamesByType("sort", "sorting columns")
if (sortCols.nonEmpty) {
builder ++= s"SORTED BY ${sortCols.mkString("(", ", ", ")")}\n"
}
DDLUtils.getBucketSpecFromTableProperties(metadata).foreach { spec =>
if (spec.bucketColumnNames.nonEmpty) {
builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n"

val numBuckets = props.getOrElse(
"spark.sql.sources.schema.numBuckets",
throw new AnalysisException("Corrupted bucket spec in catalog: missing bucket number")
)
if (spec.sortColumnNames.nonEmpty) {
builder ++= s"SORTED BY ${spec.sortColumnNames.mkString("(", ", ", ")")}\n"
}

builder ++= s"INTO $numBuckets BUCKETS\n"
builder ++= s"INTO ${spec.numBuckets} BUCKETS\n"
}
}
}
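
As a usage illustration of the bucketing branch above (hedged: the BucketSpec field names follow the diff, the helper itself is illustrative):

```scala
case class BucketSpec(
    numBuckets: Int,
    bucketColumnNames: Seq[String],
    sortColumnNames: Seq[String])

// Mirrors the branch above: CLUSTERED BY, optional SORTED BY, INTO n BUCKETS.
def bucketDDL(spec: BucketSpec): String = {
  val builder = new StringBuilder
  builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n"
  if (spec.sortColumnNames.nonEmpty) {
    builder ++= s"SORTED BY ${spec.sortColumnNames.mkString("(", ", ", ")")}\n"
  }
  builder ++= s"INTO ${spec.numBuckets} BUCKETS\n"
  builder.toString
}

// bucketDDL(BucketSpec(4, Seq("id"), Seq("ts"))) produces:
//   CLUSTERED BY (id)
//   SORTED BY (ts)
//   INTO 4 BUCKETS
```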

@@ -338,6 +338,13 @@ private[hive] class HiveClientImpl(
// partition columns are part of the schema
val partCols = h.getPartCols.asScala.map(fromHiveColumn)
val schema = h.getCols.asScala.map(fromHiveColumn) ++ partCols

// Skew spec, storage handler, and bucketing info can't be mapped to CatalogTable (yet)
val hasUnsupportedFeatures =
!h.getSkewedColNames.isEmpty ||
h.getStorageHandler != null ||
!h.getBucketCols.isEmpty

CatalogTable(
identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
tableType = h.getTableType match {
@@ -364,7 +371,8 @@
),
properties = h.getParameters.asScala.toMap,
viewOriginalText = Option(h.getViewOriginalText),
viewText = Option(h.getViewExpandedText))
viewText = Option(h.getViewExpandedText),
hasUnsupportedFeatures = hasUnsupportedFeatures)
}
}

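
Following the review thread above, a hedged sketch of how this Boolean detection could generalize to the unsupportedFeatures: Seq[String] form the author settled on (the Hive Table accessors match those used in the diff; the helper itself is illustrative):

```scala
import org.apache.hadoop.hive.ql.metadata.Table

// Illustrative only: collect human-readable descriptions of Hive table
// features that CatalogTable can't represent yet, instead of a Boolean.
def unsupportedFeatures(h: Table): Seq[String] = {
  val features = Seq.newBuilder[String]
  if (!h.getSkewedColNames.isEmpty) features += "skewed columns"
  if (h.getStorageHandler != null) features += "storage handler"
  if (!h.getBucketCols.isEmpty) features += "bucketing"
  features.result()
}
```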