@@ -19,14 +19,12 @@ package org.apache.spark.sql.execution.command

import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.types._


@@ -457,7 +455,6 @@ case class AlterTableSetLocation(
}
Seq.empty[Row]
}

}


@@ -489,9 +486,83 @@ private[sql] object DDLUtils {
case _ =>
})
}

def isTablePartitioned(table: CatalogTable): Boolean = {
- table.partitionColumns.size > 0 ||
+ table.partitionColumns.nonEmpty ||
table.properties.contains("spark.sql.sources.schema.numPartCols")
}
- }

def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
getSchemaFromTableProperties(metadata.properties)
}

// A persisted data source table may not store its schema in the catalog. In this case, its schema
// will be inferred at runtime when the table is referenced.
def getSchemaFromTableProperties(props: Map[String, String]): Option[StructType] = {
require(isDatasourceTable(props))

val schemaParts = for {
numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
index <- 0 until numParts.toInt
} yield props.getOrElse(
s"spark.sql.sources.schema.part.$index",
throw new AnalysisException(
s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing."
)
)

if (schemaParts.isEmpty) {
None
} else {
Some(DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType])
}
}
Review comment (Contributor):
There was a time when we only used spark.sql.sources.schema. Let's check this first and then check spark.sql.sources.schema.numParts. You can take a look at HiveMetastoreCatalog's cachedDataSourceTables.

@yhuai (Contributor) replied on May 10, 2016:

I will make a PR to handle spark.sql.sources.schema.
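
To make the property layout concrete, here is a minimal standalone sketch (not the PR's code) of how a data source table's schema could be round-tripped through catalog table properties, covering both the split numParts/part.N layout used above and the legacy single-key spark.sql.sources.schema layout mentioned in the review. The helper names and the 4000-character chunk size are illustrative assumptions.

import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical helpers for illustration only -- not part of this PR.
object SchemaPropsSketch {
  // Assumed chunk size; Hive metastore limits the length of a single property value.
  private val partLength = 4000

  // Serialize a schema into numParts/part.N properties.
  def schemaToProps(schema: StructType): Map[String, String] = {
    val parts = schema.json.grouped(partLength).toSeq
    val indexed = parts.zipWithIndex.map { case (part, i) =>
      s"spark.sql.sources.schema.part.$i" -> part
    }
    (indexed :+ ("spark.sql.sources.schema.numParts" -> parts.size.toString)).toMap
  }

  // Read a schema back, preferring the legacy single-key layout when present.
  def schemaFromProps(props: Map[String, String]): Option[StructType] = {
    val legacy = props.get("spark.sql.sources.schema")
    val split = props.get("spark.sql.sources.schema.numParts").map { numParts =>
      (0 until numParts.toInt).map { i =>
        props.getOrElse(
          s"spark.sql.sources.schema.part.$i",
          sys.error(s"Corrupted schema: part $i is missing"))
      }.mkString
    }
    legacy.orElse(split).map(DataType.fromJson(_).asInstanceOf[StructType])
  }
}

Round-tripping schemaFromProps(schemaToProps(schema)) should return Some(schema) even when the schema's JSON spans several part.N entries.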


private def getColumnNamesByTypeFromTableProperties(
props: Map[String, String], colType: String, typeName: String): Seq[String] = {
require(isDatasourceTable(props))

for {
numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
index <- 0 until numCols.toInt
} yield props.getOrElse(
s"spark.sql.sources.schema.${colType}Col.$index",
throw new AnalysisException(
s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
)
)
}

def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
getPartitionColumnsFromTableProperties(metadata.properties)
}

def getPartitionColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
getColumnNamesByTypeFromTableProperties(props, "part", "partitioning columns")
}

def getNumBucketFromTableProperties(metadata: CatalogTable): Option[Int] = {
getNumBucketFromTableProperties(metadata.properties)
}

def getNumBucketFromTableProperties(props: Map[String, String]): Option[Int] = {
require(isDatasourceTable(props))
props.get("spark.sql.sources.schema.numBuckets").map(_.toInt)
}

def getBucketingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
getBucketingColumnsFromTableProperties(metadata.properties)
}

def getBucketingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
getColumnNamesByTypeFromTableProperties(props, "bucket", "bucketing columns")
}

def getSortingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
getSortingColumnsFromTableProperties(metadata.properties)
}

def getSortingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
getColumnNamesByTypeFromTableProperties(props, "sort", "sorting columns")
}
}
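
For reference, a sketch of the table-property layout the getters above expect, for a table partitioned by d, bucketed by b into 2 buckets, and sorted by c (mirroring the HiveDDLSuite test below). The spark.sql.sources.provider entry is an assumption about what isTablePartitioned's companion check isDatasourceTable looks for; all values are illustrative, not captured from a real metastore.

// Illustrative property map matching the key patterns used by the getters in DDLUtils.
object TablePropsExample {
  val props: Map[String, String] = Map(
    // Assumed: isDatasourceTable looks for a provider entry along these lines.
    "spark.sql.sources.provider"             -> "parquet",
    "spark.sql.sources.schema.numPartCols"   -> "1",
    "spark.sql.sources.schema.partCol.0"     -> "d",
    "spark.sql.sources.schema.numBuckets"    -> "2",
    "spark.sql.sources.schema.numBucketCols" -> "1",
    "spark.sql.sources.schema.bucketCol.0"   -> "b",
    "spark.sql.sources.schema.numSortCols"   -> "1",
    "spark.sql.sources.schema.sortCol.0"     -> "c"
  )
  // With these properties, the getters would yield:
  //   getPartitionColumnsFromTableProperties(props) -> Seq("d")
  //   getNumBucketFromTableProperties(props)        -> Some(2)
  //   getBucketingColumnsFromTableProperties(props) -> Seq("b")
  //   getSortingColumnsFromTableProperties(props)   -> Seq("c")
}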
@@ -309,12 +309,29 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF

// Shows data columns and partitioned columns (if any)
private def describe(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
- describeSchema(table.schema, buffer)
if (DDLUtils.isDatasourceTable(table)) {
val schema = DDLUtils.getSchemaFromTableProperties(table)

- if (table.partitionColumns.nonEmpty) {
- append(buffer, "# Partition Information", "", "")
- append(buffer, s"# ${output(0).name}", output(1).name, output(2).name)
- describeSchema(table.partitionColumns, buffer)
if (schema.isEmpty) {
append(buffer, "# Schema of this table is inferred at runtime", "", "")
} else {
schema.foreach(describeSchema(_, buffer))
}

val partCols = DDLUtils.getPartitionColumnsFromTableProperties(table)
if (partCols.nonEmpty) {
append(buffer, "# Partition Information", "", "")
append(buffer, s"# ${output.head.name}", "", "")
partCols.foreach(col => append(buffer, col, "", ""))
}
} else {
describeSchema(table.schema, buffer)

if (table.partitionColumns.nonEmpty) {
append(buffer, "# Partition Information", "", "")
append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
describeSchema(table.partitionColumns, buffer)
}
}
}

@@ -338,26 +355,47 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
append(buffer, "Table Type:", table.tableType.name, "")

append(buffer, "Table Parameters:", "", "")
- table.properties.foreach { case (key, value) =>
table.properties.filterNot {
// Hides schema properties that hold user-defined schema, partition columns, and bucketing
// information since they are already extracted and shown in other parts.
case (key, _) => key.startsWith("spark.sql.sources.schema")
}.foreach { case (key, value) =>
append(buffer, s" $key", value, "")
}

describeStorageInfo(table, buffer)
}

private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
append(buffer, "", "", "")
append(buffer, "# Storage Information", "", "")
- table.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, ""))
- table.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, ""))
- table.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, ""))
- append(buffer, "Compressed:", if (table.storage.compressed) "Yes" else "No", "")
- append(buffer, "Num Buckets:", table.numBuckets.toString, "")
- append(buffer, "Bucket Columns:", table.bucketColumnNames.mkString("[", ", ", "]"), "")
- append(buffer, "Sort Columns:", table.sortColumnNames.mkString("[", ", ", "]"), "")
+ metadata.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, ""))
+ metadata.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, ""))
+ metadata.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, ""))
+ append(buffer, "Compressed:", if (metadata.storage.compressed) "Yes" else "No", "")
+ describeBucketingInfo(metadata, buffer)

append(buffer, "Storage Desc Parameters:", "", "")
- table.storage.serdeProperties.foreach { case (key, value) =>
+ metadata.storage.serdeProperties.foreach { case (key, value) =>
append(buffer, s" $key", value, "")
}
}

private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
if (DDLUtils.isDatasourceTable(metadata)) {
val numBuckets = DDLUtils.getNumBucketFromTableProperties(metadata)
val bucketCols = DDLUtils.getBucketingColumnsFromTableProperties(metadata)
val sortCols = DDLUtils.getSortingColumnsFromTableProperties(metadata)
append(buffer, "Num Buckets:", numBuckets.map(_.toString).getOrElse(""), "")
append(buffer, "Bucket Columns:", bucketCols.mkString("[", ", ", "]"), "")
append(buffer, "Sort Columns:", sortCols.mkString("[", ", ", "]"), "")
} else {
append(buffer, "Num Buckets:", metadata.numBuckets.toString, "")
append(buffer, "Bucket Columns:", metadata.bucketColumnNames.mkString("[", ", ", "]"), "")
append(buffer, "Sort Columns:", metadata.sortColumnNames.mkString("[", ", ", "]"), "")
}
}

private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
@@ -22,7 +22,7 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach

- import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -531,4 +531,58 @@ class HiveDDLSuite
.exists(_.getString(0) == "# Detailed Table Information"))
}
}

test("desc table for data source table - no user-defined schema") {
withTable("t1") {
withTempPath { dir =>
val path = dir.getCanonicalPath
sqlContext.range(1).write.parquet(path)
sql(s"CREATE TABLE t1 USING parquet OPTIONS (PATH '$path')")

val desc = sql("DESC FORMATTED t1").collect().toSeq

assert(desc.contains(Row("# Schema of this table is inferred at runtime", "", "")))
}
}
}

test("desc table for data source table - partitioned bucketed table") {
withTable("t1") {
sqlContext
.range(1).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write
.bucketBy(2, "b").sortBy("c").partitionBy("d")
.saveAsTable("t1")

val formattedDesc = sql("DESC FORMATTED t1").collect()

assert(formattedDesc.containsSlice(
Seq(
Row("a", "bigint", ""),
Row("b", "bigint", ""),
Row("c", "bigint", ""),
Row("d", "bigint", ""),
Row("# Partition Information", "", ""),
Row("# col_name", "", ""),
Row("d", "", ""),
Row("", "", ""),
Row("# Detailed Table Information", "", ""),
Row("Database:", "default", "")
)
))

assert(formattedDesc.containsSlice(
Seq(
Row("Table Type:", "MANAGED", "")
)
))

assert(formattedDesc.containsSlice(
Seq(
Row("Num Buckets:", "2", ""),
Row("Bucket Columns:", "[b]", ""),
Row("Sort Columns:", "[c]", "")
)
))
}
}
}