Commit 54e2618

Extracts schema information from table properties for data source tables
1 parent f453791 commit 54e2618

File tree

3 files changed, +183 -20 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 76 additions & 5 deletions
@@ -19,14 +19,12 @@ package org.apache.spark.sql.execution.command

 import scala.util.control.NonFatal

-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.types._


@@ -457,7 +455,6 @@ case class AlterTableSetLocation(
     }
     Seq.empty[Row]
   }
-
 }


@@ -489,9 +486,83 @@ private[sql] object DDLUtils {
       case _ =>
     })
   }
+
   def isTablePartitioned(table: CatalogTable): Boolean = {
-    table.partitionColumns.size > 0 ||
+    table.partitionColumns.nonEmpty ||
       table.properties.contains("spark.sql.sources.schema.numPartCols")
   }
-}

+  def getSchemaFromTableProperties(metadata: CatalogTable): Option[StructType] = {
+    getSchemaFromTableProperties(metadata.properties)
+  }
+
+  // A persisted data source table may not store its schema in the catalog. In this case, its schema
+  // will be inferred at runtime when the table is referenced.
+  def getSchemaFromTableProperties(props: Map[String, String]): Option[StructType] = {
+    require(isDatasourceTable(props))
+
+    val schemaParts = for {
+      numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
+      index <- 0 until numParts.toInt
+    } yield props.getOrElse(
+      s"spark.sql.sources.schema.part.$index",
+      throw new AnalysisException(
+        s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing."
+      )
+    )
+
+    if (schemaParts.isEmpty) {
+      None
+    } else {
+      Some(DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType])
+    }
+  }
+
+  private def getColumnNamesByTypeFromTableProperties(
+      props: Map[String, String], colType: String, typeName: String): Seq[String] = {
+    require(isDatasourceTable(props))
+
+    for {
+      numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
+      index <- 0 until numCols.toInt
+    } yield props.getOrElse(
+      s"spark.sql.sources.schema.${colType}Col.$index",
+      throw new AnalysisException(
+        s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
+      )
+    )
+  }
+
+  def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
+    getPartitionColumnsFromTableProperties(metadata.properties)
+  }
+
+  def getPartitionColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
+    getColumnNamesByTypeFromTableProperties(props, "part", "partitioning columns")
+  }
+
+  def getNumBucketFromTableProperties(metadata: CatalogTable): Option[Int] = {
+    getNumBucketFromTableProperties(metadata.properties)
+  }
+
+  def getNumBucketFromTableProperties(props: Map[String, String]): Option[Int] = {
+    require(isDatasourceTable(props))
+    props.get("spark.sql.sources.schema.numBuckets").map(_.toInt)
+  }
+
+  def getBucketingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
+    getBucketingColumnsFromTableProperties(metadata.properties)
+  }
+
+  def getBucketingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
+    getColumnNamesByTypeFromTableProperties(props, "bucket", "bucketing columns")
+  }
+
+  def getSortingColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
+    getSortingColumnsFromTableProperties(metadata.properties)
+  }
+
+  def getSortingColumnsFromTableProperties(props: Map[String, String]): Seq[String] = {
+    getColumnNamesByTypeFromTableProperties(props, "sort", "sorting columns")
+  }
+}
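
For reference, the layout these helpers read back is: the table schema serialized as JSON and split across numbered spark.sql.sources.schema.part.N properties (with the part count in spark.sql.sources.schema.numParts, presumably because a wide schema may not fit into a single metastore property value), plus one property per partitioning, bucketing, and sorting column. Below is a minimal sketch that builds such a property map and reassembles the schema the same way getSchemaFromTableProperties does; the table and all property values are hypothetical, only the key names come from the code above.

import org.apache.spark.sql.types.{DataType, LongType, StructType}

// Hypothetical property map for a data source table partitioned by "d", bucketed by "b",
// sorted by "c". Only the key names are taken from the helpers above; values are illustrative.
val schemaJson = new StructType()
  .add("a", LongType).add("b", LongType).add("c", LongType).add("d", LongType)
  .json

val props = Map(
  "spark.sql.sources.provider" -> "parquet",   // marks the table as a data source table
  "spark.sql.sources.schema.numParts" -> "1",
  "spark.sql.sources.schema.part.0" -> schemaJson,
  "spark.sql.sources.schema.numPartCols" -> "1",
  "spark.sql.sources.schema.partCol.0" -> "d",
  "spark.sql.sources.schema.numBuckets" -> "2",
  "spark.sql.sources.schema.numBucketCols" -> "1",
  "spark.sql.sources.schema.bucketCol.0" -> "b",
  "spark.sql.sources.schema.numSortCols" -> "1",
  "spark.sql.sources.schema.sortCol.0" -> "c")

// Reassembly as in getSchemaFromTableProperties: concatenate the numbered parts in order,
// then parse the JSON back into a StructType.
val parts = (0 until props("spark.sql.sources.schema.numParts").toInt)
  .map(i => props(s"spark.sql.sources.schema.part.$i"))
val schema = DataType.fromJson(parts.mkString).asInstanceOf[StructType]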

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 52 additions & 14 deletions
@@ -309,12 +309,29 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF

   // Shows data columns and partitioned columns (if any)
   private def describe(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
-    describeSchema(table.schema, buffer)
+    if (DDLUtils.isDatasourceTable(table)) {
+      val schema = DDLUtils.getSchemaFromTableProperties(table)

-    if (table.partitionColumns.nonEmpty) {
-      append(buffer, "# Partition Information", "", "")
-      append(buffer, s"# ${output(0).name}", output(1).name, output(2).name)
-      describeSchema(table.partitionColumns, buffer)
+      if (schema.isEmpty) {
+        append(buffer, "# Schema of this table is inferred at runtime", "", "")
+      } else {
+        schema.foreach(describeSchema(_, buffer))
+      }
+
+      val partCols = DDLUtils.getPartitionColumnsFromTableProperties(table)
+      if (partCols.nonEmpty) {
+        append(buffer, "# Partition Information", "", "")
+        append(buffer, s"# ${output.head.name}", "", "")
+        partCols.foreach(col => append(buffer, col, "", ""))
+      }
+    } else {
+      describeSchema(table.schema, buffer)
+
+      if (table.partitionColumns.nonEmpty) {
+        append(buffer, "# Partition Information", "", "")
+        append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
+        describeSchema(table.partitionColumns, buffer)
+      }
     }
   }

@@ -338,26 +355,47 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
     append(buffer, "Table Type:", table.tableType.name, "")

     append(buffer, "Table Parameters:", "", "")
-    table.properties.foreach { case (key, value) =>
+    table.properties.filterNot {
+      // Hides schema properties that hold user-defined schema, partition columns, and bucketing
+      // information since they are already extracted and shown in other parts.
+      case (key, _) => key.startsWith("spark.sql.sources.schema")
+    }.foreach { case (key, value) =>
       append(buffer, s" $key", value, "")
     }

+    describeStorageInfo(table, buffer)
+  }
+
+  private def describeStorageInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
     append(buffer, "", "", "")
     append(buffer, "# Storage Information", "", "")
-    table.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, ""))
-    table.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, ""))
-    table.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, ""))
-    append(buffer, "Compressed:", if (table.storage.compressed) "Yes" else "No", "")
-    append(buffer, "Num Buckets:", table.numBuckets.toString, "")
-    append(buffer, "Bucket Columns:", table.bucketColumnNames.mkString("[", ", ", "]"), "")
-    append(buffer, "Sort Columns:", table.sortColumnNames.mkString("[", ", ", "]"), "")
+    metadata.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, ""))
+    metadata.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, ""))
+    metadata.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, ""))
+    append(buffer, "Compressed:", if (metadata.storage.compressed) "Yes" else "No", "")
+    describeBucketingInfo(metadata, buffer)

     append(buffer, "Storage Desc Parameters:", "", "")
-    table.storage.serdeProperties.foreach { case (key, value) =>
+    metadata.storage.serdeProperties.foreach { case (key, value) =>
       append(buffer, s" $key", value, "")
     }
   }

+  private def describeBucketingInfo(metadata: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
+    if (DDLUtils.isDatasourceTable(metadata)) {
+      val numBuckets = DDLUtils.getNumBucketFromTableProperties(metadata)
+      val bucketCols = DDLUtils.getBucketingColumnsFromTableProperties(metadata)
+      val sortCols = DDLUtils.getSortingColumnsFromTableProperties(metadata)
+      append(buffer, "Num Buckets:", numBuckets.map(_.toString).getOrElse(""), "")
+      append(buffer, "Bucket Columns:", bucketCols.mkString("[", ", ", "]"), "")
+      append(buffer, "Sort Columns:", sortCols.mkString("[", ", ", "]"), "")
+    } else {
+      append(buffer, "Num Buckets:", metadata.numBuckets.toString, "")
+      append(buffer, "Bucket Columns:", metadata.bucketColumnNames.mkString("[", ", ", "]"), "")
+      append(buffer, "Sort Columns:", metadata.sortColumnNames.mkString("[", ", ", "]"), "")
+    }
+  }
+
   private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
     schema.foreach { column =>
       append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
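
Taken together, DESC FORMATTED on a data source table now derives its schema, partition, and bucketing sections from the table properties via DDLUtils instead of from the catalog columns. A rough sketch of driving the command and the sections to expect (the session and table name are hypothetical):

// Hypothetical session and table; sketches the sections the updated DescribeTableCommand emits.
val rows = sqlContext.sql("DESC FORMATTED some_datasource_table").collect()

// For a data source table, per the code above:
//  - data columns come from getSchemaFromTableProperties, or the single row
//    "# Schema of this table is inferred at runtime" when no schema was persisted;
//  - "# Partition Information" is followed by "# col_name" and one row per partition column;
//  - "Table Parameters:" hides every spark.sql.sources.schema.* property;
//  - "# Storage Information" reports Num Buckets / Bucket Columns / Sort Columns read back
//    from the table properties.
rows.map(_.getString(0)).filter(_.startsWith("#")).foreach(println)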

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 55 additions & 1 deletion
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach

-import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -531,4 +531,58 @@ class HiveDDLSuite
        .exists(_.getString(0) == "# Detailed Table Information"))
     }
   }
+
+  test("desc table for data source table - no user-defined schema") {
+    withTable("t1") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        sqlContext.range(1).write.parquet(path)
+        sql(s"CREATE TABLE t1 USING parquet OPTIONS (PATH '$path')")
+
+        val desc = sql("DESC FORMATTED t1").collect().toSeq
+
+        assert(desc.contains(Row("# Schema of this table is inferred at runtime", "", "")))
+      }
+    }
+  }
+
+  test("desc table for data source table - partitioned bucketed table") {
+    withTable("t1") {
+      sqlContext
+        .range(1).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write
+        .bucketBy(2, "b").sortBy("c").partitionBy("d")
+        .saveAsTable("t1")
+
+      val formattedDesc = sql("DESC FORMATTED t1").collect()
+
+      assert(formattedDesc.containsSlice(
+        Seq(
+          Row("a", "bigint", ""),
+          Row("b", "bigint", ""),
+          Row("c", "bigint", ""),
+          Row("d", "bigint", ""),
+          Row("# Partition Information", "", ""),
+          Row("# col_name", "", ""),
+          Row("d", "", ""),
+          Row("", "", ""),
+          Row("# Detailed Table Information", "", ""),
+          Row("Database:", "default", "")
+        )
+      ))
+
+      assert(formattedDesc.containsSlice(
+        Seq(
+          Row("Table Type:", "MANAGED", "")
+        )
+      ))
+
+      assert(formattedDesc.containsSlice(
+        Seq(
+          Row("Num Buckets:", "2", ""),
+          Row("Bucket Columns:", "[b]", ""),
+          Row("Sort Columns:", "[c]", "")
+        )
+      ))
+    }
+  }
 }
