-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-20067] [SQL] Unify and Clean Up Desc Commands Using Catalog Interface #17394
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
2ebeac8
8720919
d999147
3859097
1d72079
68bb05c
ac3f351
bef1134
29817ea
6c56041
36b501e
e116018
a6db8a3
43668be
862a4d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.catalog | |
| import java.net.URI | ||
| import java.util.Date | ||
|
|
||
| import scala.collection.mutable | ||
|
|
||
| import com.google.common.base.Objects | ||
|
|
||
| import org.apache.spark.sql.AnalysisException | ||
|
|
@@ -57,20 +59,25 @@ case class CatalogStorageFormat( | |
| properties: Map[String, String]) { | ||
|
|
||
| override def toString: String = { | ||
| val serdePropsToString = CatalogUtils.maskCredentials(properties) match { | ||
| case props if props.isEmpty => "" | ||
| case props => "Properties: " + props.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") | ||
| } | ||
| val output = | ||
| Seq(locationUri.map("Location: " + _).getOrElse(""), | ||
| inputFormat.map("InputFormat: " + _).getOrElse(""), | ||
| outputFormat.map("OutputFormat: " + _).getOrElse(""), | ||
| if (compressed) "Compressed" else "", | ||
| serde.map("Serde: " + _).getOrElse(""), | ||
| serdePropsToString) | ||
| output.filter(_.nonEmpty).mkString("Storage(", ", ", ")") | ||
| toLinkedHashMap.map { case ((key, value)) => | ||
| if (value.isEmpty) key else s"$key: $value" | ||
| }.mkString("Storage(", ", ", ")") | ||
| } | ||
|
|
||
| def toLinkedHashMap: mutable.LinkedHashMap[String, String] = { | ||
| val map = new mutable.LinkedHashMap[String, String]() | ||
| locationUri.foreach(l => map.put("Location", l.toString)) | ||
| serde.foreach(map.put("Serde Library", _)) | ||
| inputFormat.foreach(map.put("InputFormat", _)) | ||
| outputFormat.foreach(map.put("OutputFormat", _)) | ||
| if (compressed) map.put("Compressed", "") | ||
| CatalogUtils.maskCredentials(properties) match { | ||
| case props if props.isEmpty => // No-op | ||
| case props => | ||
| map.put("Properties", props.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]")) | ||
| } | ||
| map | ||
| } | ||
| } | ||
|
|
||
| object CatalogStorageFormat { | ||
|
|
@@ -91,15 +98,28 @@ case class CatalogTablePartition( | |
| storage: CatalogStorageFormat, | ||
| parameters: Map[String, String] = Map.empty) { | ||
|
|
||
| override def toString: String = { | ||
| def toLinkedHashMap: mutable.LinkedHashMap[String, String] = { | ||
| val map = new mutable.LinkedHashMap[String, String]() | ||
| val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ") | ||
| val output = | ||
| Seq( | ||
| s"Partition Values: [$specString]", | ||
| s"$storage", | ||
| s"Partition Parameters:{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}") | ||
| map.put("Partition Values", s"[$specString]") | ||
| map ++= storage.toLinkedHashMap | ||
| if (parameters.nonEmpty) { | ||
| map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}") | ||
| } | ||
| map | ||
| } | ||
|
|
||
| output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")") | ||
| override def toString: String = { | ||
| toLinkedHashMap.map { case ((key, value)) => | ||
| if (value.isEmpty) key else s"$key: $value" | ||
| }.mkString("CatalogPartition(\n\t", "\n\t", ")") | ||
| } | ||
|
|
||
| /** Readable string representation for the CatalogTablePartition. */ | ||
| def simpleString: String = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when shall we call
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| toLinkedHashMap.map { case ((key, value)) => | ||
| if (value.isEmpty) key else s"$key: $value" | ||
| }.mkString("", "\n", "") | ||
| } | ||
|
|
||
| /** Return the partition location, assuming it is specified. */ | ||
|
|
@@ -154,6 +174,14 @@ case class BucketSpec( | |
| } | ||
| s"$numBuckets buckets, $bucketString$sortString" | ||
| } | ||
|
|
||
| def toLinkedHashMap: mutable.LinkedHashMap[String, String] = { | ||
| mutable.LinkedHashMap[String, String]( | ||
| "Num Buckets" -> numBuckets.toString, | ||
| "Bucket Columns" -> bucketColumnNames.map(quoteIdentifier).mkString("[", ", ", "]"), | ||
| "Sort Columns" -> sortColumnNames.map(quoteIdentifier).mkString("[", ", ", "]") | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -261,40 +289,49 @@ case class CatalogTable( | |
| locationUri, inputFormat, outputFormat, serde, compressed, properties)) | ||
| } | ||
|
|
||
| override def toString: String = { | ||
|
|
||
| def toLinkedHashMap: mutable.LinkedHashMap[String, String] = { | ||
| val map = new mutable.LinkedHashMap[String, String]() | ||
| val tableProperties = properties.map(p => p._1 + "=" + p._2).mkString("[", ", ", "]") | ||
| val partitionColumns = partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]") | ||
| val bucketStrings = bucketSpec match { | ||
| case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) => | ||
| val bucketColumnsString = bucketColumnNames.map(quoteIdentifier).mkString("[", ", ", "]") | ||
| val sortColumnsString = sortColumnNames.map(quoteIdentifier).mkString("[", ", ", "]") | ||
| Seq( | ||
| s"Num Buckets: $numBuckets", | ||
| if (bucketColumnNames.nonEmpty) s"Bucket Columns: $bucketColumnsString" else "", | ||
| if (sortColumnNames.nonEmpty) s"Sort Columns: $sortColumnsString" else "" | ||
| ) | ||
|
|
||
| case _ => Nil | ||
|
|
||
| map.put("Table", identifier.quotedString) | ||
| if (owner.nonEmpty) map.put("Owner", owner) | ||
| map.put("Created", new Date(createTime).toString) | ||
| map.put("Last Access", new Date(lastAccessTime).toString) | ||
| map.put("Type", tableType.name) | ||
| provider.foreach(map.put("Provider", _)) | ||
| bucketSpec.foreach(map ++= _.toLinkedHashMap) | ||
| comment.foreach(map.put("Comment", _)) | ||
| if (tableType == CatalogTableType.VIEW) { | ||
| viewText.foreach(map.put("View Text", _)) | ||
| viewDefaultDatabase.foreach(map.put("View Default Database", _)) | ||
| if (viewQueryColumnNames.nonEmpty) { | ||
| map.put("View Query Output Columns", viewQueryColumnNames.mkString("[", ", ", "]")) | ||
| } | ||
| } | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should de-duplicate the codes. |
||
|
|
||
| val output = | ||
| Seq(s"Table: ${identifier.quotedString}", | ||
| if (owner.nonEmpty) s"Owner: $owner" else "", | ||
| s"Created: ${new Date(createTime).toString}", | ||
| s"Last Access: ${new Date(lastAccessTime).toString}", | ||
| s"Type: ${tableType.name}", | ||
| if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "", | ||
| if (provider.isDefined) s"Provider: ${provider.get}" else "", | ||
| if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else "" | ||
| ) ++ bucketStrings ++ Seq( | ||
| viewText.map("View: " + _).getOrElse(""), | ||
| comment.map("Comment: " + _).getOrElse(""), | ||
| if (properties.nonEmpty) s"Properties: $tableProperties" else "", | ||
| if (stats.isDefined) s"Statistics: ${stats.get.simpleString}" else "", | ||
| s"$storage", | ||
| if (tracksPartitionsInCatalog) "Partition Provider: Catalog" else "") | ||
|
|
||
| output.filter(_.nonEmpty).mkString("CatalogTable(\n\t", "\n\t", ")") | ||
| if (properties.nonEmpty) map.put("Properties", tableProperties) | ||
| stats.foreach(s => map.put("Statistics", s.simpleString)) | ||
| map ++= storage.toLinkedHashMap | ||
| if (tracksPartitionsInCatalog) map.put("Partition Provider", "Catalog") | ||
| if (partitionColumnNames.nonEmpty) map.put("Partition Columns", partitionColumns) | ||
| if (schema.nonEmpty) map.put("Schema", schema.treeString) | ||
|
|
||
| map | ||
| } | ||
|
|
||
| override def toString: String = { | ||
| toLinkedHashMap.map { case ((key, value)) => | ||
| if (value.isEmpty) key else s"$key: $value" | ||
| }.mkString("CatalogTable(\n", "\n", ")") | ||
| } | ||
|
|
||
| /** Readable string representation for the CatalogTable. */ | ||
| def simpleString: String = { | ||
| toLinkedHashMap.map { case ((key, value)) => | ||
| if (value.isEmpty) key else s"$key: $value" | ||
| }.mkString("", "\n", "") | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -500,8 +500,7 @@ case class TruncateTableCommand( | |
| case class DescribeTableCommand( | ||
| table: TableIdentifier, | ||
| partitionSpec: TablePartitionSpec, | ||
| isExtended: Boolean, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, let me make the change. |
||
| isFormatted: Boolean) | ||
| isExtended: Boolean) | ||
| extends RunnableCommand { | ||
|
|
||
| override val output: Seq[Attribute] = Seq( | ||
|
|
@@ -536,14 +535,10 @@ case class DescribeTableCommand( | |
|
|
||
| describePartitionInfo(metadata, result) | ||
|
|
||
| if (partitionSpec.isEmpty) { | ||
| if (isExtended) { | ||
| describeExtendedTableInfo(metadata, result) | ||
| } else if (isFormatted) { | ||
| describeFormattedTableInfo(metadata, result) | ||
| } | ||
| } else { | ||
| if (partitionSpec.nonEmpty) { | ||
| describeDetailedPartitionInfo(sparkSession, catalog, metadata, result) | ||
| } else if (isExtended) { | ||
| describeFormattedTableInfo(metadata, result) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -558,72 +553,10 @@ case class DescribeTableCommand( | |
| } | ||
| } | ||
|
|
||
| private def describeExtendedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { | ||
| append(buffer, "", "", "") | ||
| append(buffer, "# Detailed Table Information", table.toString, "") | ||
| } | ||
|
|
||
| private def describeFormattedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { | ||
| append(buffer, "", "", "") | ||
| append(buffer, "# Detailed Table Information", "", "") | ||
| append(buffer, "Database:", table.database, "") | ||
| append(buffer, "Owner:", table.owner, "") | ||
| append(buffer, "Created:", new Date(table.createTime).toString, "") | ||
| append(buffer, "Last Access:", new Date(table.lastAccessTime).toString, "") | ||
| append(buffer, "Location:", table.storage.locationUri.map(CatalogUtils.URIToString(_)) | ||
| .getOrElse(""), "") | ||
| append(buffer, "Table Type:", table.tableType.name, "") | ||
| append(buffer, "Comment:", table.comment.getOrElse(""), "") | ||
| table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, "")) | ||
|
|
||
| append(buffer, "Table Parameters:", "", "") | ||
| table.properties.foreach { case (key, value) => | ||
| append(buffer, s" $key", value, "") | ||
| } | ||
|
|
||
| describeStorageInfo(table, buffer) | ||
|
|
||
| if (table.tableType == CatalogTableType.VIEW) describeViewInfo(table, buffer) | ||
|
|
||
| if (DDLUtils.isDatasourceTable(table) && table.tracksPartitionsInCatalog) { | ||
| append(buffer, "Partition Provider:", "Catalog", "") | ||
| } | ||
| } | ||
|
|
||
| private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { | ||
| append(buffer, "", "", "") | ||
| append(buffer, "# Storage Information", "", "") | ||
| metadata.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, "")) | ||
| metadata.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, "")) | ||
| metadata.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, "")) | ||
| append(buffer, "Compressed:", if (metadata.storage.compressed) "Yes" else "No", "") | ||
| describeBucketingInfo(metadata, buffer) | ||
|
|
||
| append(buffer, "Storage Desc Parameters:", "", "") | ||
| val maskedProperties = CatalogUtils.maskCredentials(metadata.storage.properties) | ||
| maskedProperties.foreach { case (key, value) => | ||
| append(buffer, s" $key", value, "") | ||
| } | ||
| } | ||
|
|
||
| private def describeViewInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { | ||
| append(buffer, "", "", "") | ||
| append(buffer, "# View Information", "", "") | ||
| append(buffer, "View Text:", metadata.viewText.getOrElse(""), "") | ||
| append(buffer, "View Default Database:", metadata.viewDefaultDatabase.getOrElse(""), "") | ||
| append(buffer, "View Query Output Columns:", | ||
| metadata.viewQueryColumnNames.mkString("[", ", ", "]"), "") | ||
| } | ||
|
|
||
| private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { | ||
| metadata.bucketSpec match { | ||
| case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) => | ||
| append(buffer, "Num Buckets:", numBuckets.toString, "") | ||
| append(buffer, "Bucket Columns:", bucketColumnNames.mkString("[", ", ", "]"), "") | ||
| append(buffer, "Sort Columns:", sortColumnNames.mkString("[", ", ", "]"), "") | ||
|
|
||
| case _ => | ||
| } | ||
| table.toLinkedHashMap.foreach(s => append(buffer, s._1, s._2, "")) | ||
| } | ||
|
|
||
| private def describeDetailedPartitionInfo( | ||
|
|
@@ -637,21 +570,7 @@ case class DescribeTableCommand( | |
| } | ||
| DDLUtils.verifyPartitionProviderIsHive(spark, metadata, "DESC PARTITION") | ||
| val partition = catalog.getPartition(table, partitionSpec) | ||
| if (isExtended) { | ||
| describeExtendedDetailedPartitionInfo(table, metadata, partition, result) | ||
| } else if (isFormatted) { | ||
| describeFormattedDetailedPartitionInfo(table, metadata, partition, result) | ||
| describeStorageInfo(metadata, result) | ||
| } | ||
| } | ||
|
|
||
| private def describeExtendedDetailedPartitionInfo( | ||
| tableIdentifier: TableIdentifier, | ||
| table: CatalogTable, | ||
| partition: CatalogTablePartition, | ||
| buffer: ArrayBuffer[Row]): Unit = { | ||
| append(buffer, "", "", "") | ||
| append(buffer, "Detailed Partition Information " + partition.toString, "", "") | ||
| if (isExtended) describeFormattedDetailedPartitionInfo(table, metadata, partition, result) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not related to this PR, but it looks weird that
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function handles `DESCRIBE [EXTENDED|FORMATTED] table_name PARTITION (partitionVal*)` |
||
| } | ||
|
|
||
| private def describeFormattedDetailedPartitionInfo( | ||
|
|
@@ -661,15 +580,17 @@ case class DescribeTableCommand( | |
| buffer: ArrayBuffer[Row]): Unit = { | ||
| append(buffer, "", "", "") | ||
| append(buffer, "# Detailed Partition Information", "", "") | ||
| append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "") | ||
| append(buffer, "Database:", table.database, "") | ||
| append(buffer, "Table:", tableIdentifier.table, "") | ||
| append(buffer, "Location:", partition.storage.locationUri.map(CatalogUtils.URIToString(_)) | ||
| .getOrElse(""), "") | ||
| append(buffer, "Partition Parameters:", "", "") | ||
| partition.parameters.foreach { case (key, value) => | ||
| append(buffer, s" $key", value, "") | ||
| append(buffer, "Database", table.database, "") | ||
| append(buffer, "Table", tableIdentifier.table, "") | ||
| partition.toLinkedHashMap.foreach(s => append(buffer, s._1, s._2, "")) | ||
| append(buffer, "", "", "") | ||
| append(buffer, "# Table Storage Information", "", "") | ||
| table.bucketSpec match { | ||
| case Some(spec) => | ||
| spec.toLinkedHashMap.foreach(s => append(buffer, s._1, s._2, "")) | ||
| case _ => | ||
| } | ||
| table.storage.toLinkedHashMap.foreach(s => append(buffer, s._1, s._2, "")) | ||
| } | ||
|
|
||
| private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = { | ||
|
|
@@ -728,7 +649,7 @@ case class ShowTablesCommand( | |
| val tableName = tableIdent.table | ||
| val isTemp = catalog.isTemporaryTable(tableIdent) | ||
| if (isExtended) { | ||
| val information = catalog.getTempViewOrPermanentTableMetadata(tableIdent).toString | ||
| val information = catalog.getTempViewOrPermanentTableMetadata(tableIdent).simpleString | ||
| Row(database, tableName, isTemp, s"$information\n") | ||
| } else { | ||
| Row(database, tableName, isTemp) | ||
|
|
@@ -745,7 +666,7 @@ case class ShowTablesCommand( | |
| val database = table.database.getOrElse("") | ||
| val tableName = table.table | ||
| val isTemp = catalog.isTemporaryTable(table) | ||
| val information = partition.toString | ||
| val information = partition.simpleString | ||
| Seq(Row(database, tableName, isTemp, s"$information\n")) | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Usually we put the imports of the Scala library above third-party imports.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we change the order, can scalastyle pass?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Usually, the import order is:
`Dataset.scala` is an example.