From 89e0aea04bdeffaa69be0c19f10860d12af90953 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 3 May 2016 01:07:32 +0800 Subject: [PATCH 1/6] Implements native 'DESC TABLE' DDL command --- .../sql/catalyst/catalog/interface.scala | 1 + .../spark/sql/execution/SparkSqlParser.scala | 7 +- .../spark/sql/execution/command/tables.scala | 96 +++++++++++++++---- .../sql/hive/client/HiveClientImpl.scala | 1 + 4 files changed, 83 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 3851e4c70674..ff8a5df6a867 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -89,6 +89,7 @@ case class CatalogTable( sortColumnNames: Seq[String] = Seq.empty, bucketColumnNames: Seq[String] = Seq.empty, numBuckets: Int = -1, + owner: String = "", createTime: Long = System.currentTimeMillis, lastAccessTime: Long = -1, properties: Map[String, String] = Map.empty, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 8128a6efe3cc..0ce8c1dcbfed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -240,10 +240,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) { // FORMATTED and columns are not supported. Return null and let the parser decide what to do // with this (create an exception or pass it on to a different system). 
- if (ctx.describeColName != null || ctx.FORMATTED != null || ctx.partitionSpec != null) { + if (ctx.describeColName != null || ctx.partitionSpec != null) { null } else { - DescribeTableCommand(visitTableIdentifier(ctx.tableIdentifier), ctx.EXTENDED != null) + DescribeTableCommand( + visitTableIdentifier(ctx.tableIdentifier), + ctx.EXTENDED != null, + ctx.FORMATTED() != null) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 6078918316d9..aa5d7ed6c606 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -19,16 +19,17 @@ package org.apache.spark.sql.execution.command import java.io.File import java.net.URI +import java.util.Date import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogRelation, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode} -import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType} +import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType, StructType} import org.apache.spark.util.Utils case class CreateTableAsSelectLogicalPlan( @@ -270,10 +271,10 @@ case class LoadData( /** * Command that looks like * {{{ - * DESCRIBE (EXTENDED) table_name; + * DESCRIBE [EXTENDED|FORMATTED] table_name; * }}} */ -case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean) +case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isFormatted: Boolean) extends RunnableCommand { override val output: Seq[Attribute] = Seq( @@ -290,29 +291,84 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean) val result = new ArrayBuffer[Row] sparkSession.sessionState.catalog.lookupRelation(table) match { case catalogRelation: CatalogRelation => - catalogRelation.catalogTable.schema.foreach { column => - result += Row(column.name, column.dataType, column.comment.orNull) - } - - if (catalogRelation.catalogTable.partitionColumns.nonEmpty) { - result += Row("# Partition Information", "", "") - result += Row(s"# ${output(0).name}", output(1).name, output(2).name) - - catalogRelation.catalogTable.partitionColumns.foreach { col => - result += Row(col.name, col.dataType, col.comment.orNull) - } + if (isExtended) { + describeExtended(catalogRelation, result) + } else if (isFormatted) { + describeFormatted(catalogRelation, result) + } else { + describe(catalogRelation, result) } case relation => - relation.schema.fields.foreach { field => - val comment = - if (field.metadata.contains("comment")) field.metadata.getString("comment") else "" - result += Row(field.name, field.dataType.simpleString, comment) - } + describeSchema(relation.schema, result) } result } + + private def describe(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = { + describeSchema(relation.catalogTable.schema, buffer) + + if (relation.catalogTable.partitionColumns.nonEmpty) { + buffer += Row("# Partition 
Information", "", "") + buffer += Row(s"# ${output(0).name}", output(1).name, output(2).name) + + describeSchema(relation.catalogTable.partitionColumns, buffer) + } + } + + private def describeExtended(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = { + describeSchema(relation.catalogTable.schema, buffer) + buffer += Row("", "", "") + buffer += Row("# Detailed Table Information", relation.catalogTable.toString, "") + } + + private def describeFormatted(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = { + val table = relation.catalogTable + describeSchema(table.schema, buffer) + + buffer += Row("", "", "") + buffer += Row("# Detailed Table Information", "", "") + buffer += Row("Database:", table.database, "") + buffer += Row("Owner:", table.owner, "") + buffer += Row("Create Time:", new Date(table.createTime).toString, "") + buffer += Row("Last Access Time:", new Date(table.lastAccessTime).toString, "") + buffer += Row("Location:", table.storage.locationUri.getOrElse(""), "") + buffer += Row("Table Type:", table.tableType.name, "") + + buffer += Row("Table Parameters:", "", "") + table.properties.foreach { case (key, value) => + buffer += Row(s" $key", value, "") + } + + buffer += Row("", "", "") + buffer += Row("# Storage Information", "", "") + table.storage.serde.foreach(serdeLib => buffer += Row("SerDe Library:", serdeLib, "")) + table.storage.inputFormat.foreach(format => buffer += Row("InputFormat:", format, "")) + table.storage.outputFormat.foreach(format => buffer += Row("OutputFormat:", format, "")) + buffer += Row("Num Buckets:", table.numBuckets.toString, "") + buffer += Row("Bucket Columns:", table.bucketColumnNames.mkString("[", ", ", "]"), "") + buffer += Row("Sort Columns:", table.sortColumnNames.mkString("[", ", ", "]"), "") + + buffer += Row("Storage Desc Parameters:", "", "") + table.storage.serdeProperties.foreach { case (key, value) => + buffer += Row(s" $key", value, "") + } + } + + private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = { + schema.foreach { column => + val comment = + if (column.metadata.contains("comment")) column.metadata.getString("comment") else "" + buffer += Row(column.name, column.dataType.sql, comment) + } + } + + private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = { + schema.foreach { column => + buffer += Row(column.name, column.dataType, column.comment.getOrElse("")) + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index cdfadfaaeac7..4cbcf0d3c08e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -348,6 +348,7 @@ private[hive] class HiveClientImpl( sortColumnNames = Seq(), // TODO: populate this bucketColumnNames = h.getBucketCols.asScala, numBuckets = h.getNumBuckets, + owner = h.getOwner, createTime = h.getTTable.getCreateTime.toLong * 1000, lastAccessTime = h.getLastAccessTime.toLong * 1000, storage = CatalogStorageFormat( From 718da25b489ef56e2b1092484bd6fbcdf2a547ed Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 3 May 2016 01:39:24 +0800 Subject: [PATCH 2/6] Test case --- .../spark/sql/hive/execution/HiveDDLSuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 687a4a7e512a..373d1a1e0ebc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -348,4 +348,21 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } } + + test("desc table") { + withTable("tab1") { + val tabName = "tab1" + sql(s"CREATE TABLE $tabName(c1 int)") + + assert(sql(s"DESC $tabName").collect().length == 1) + + assert( + sql(s"DESC FORMATTED $tabName").collect() + .exists(_.getString(0) == "# Storage Information")) + + assert( + sql(s"DESC EXTENDED $tabName").collect() + .exists(_.getString(0) == "# Detailed Table Information")) + } + } } From 9194fe1112ed247ff10d37ee6864ef88a9bd07fb Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 3 May 2016 13:25:12 +0800 Subject: [PATCH 3/6] Fixes test failures - Shows partition columns for EXTENDED and FORMATTED - Shows "Compressed:" field - Shows data types in lower case --- .../spark/sql/catalyst/catalog/interface.scala | 4 +++- .../sql/catalyst/catalog/ExternalCatalogSuite.scala | 1 + .../apache/spark/sql/execution/SparkSqlParser.scala | 8 +++++--- .../execution/command/createDataSourceTables.scala | 2 ++ .../apache/spark/sql/execution/command/tables.scala | 12 ++++++++---- .../spark/sql/execution/command/DDLSuite.scala | 6 +++++- .../spark/sql/hive/client/HiveClientImpl.scala | 3 ++- .../sql/hive/execution/CreateTableAsSelect.scala | 3 ++- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 1 + .../apache/spark/sql/hive/client/VersionsSuite.scala | 1 + 10 files changed, 30 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index ff8a5df6a867..2c6e9f53b27b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -48,6 +48,7 @@ case class CatalogStorageFormat( inputFormat: Option[String], outputFormat: Option[String], serde: Option[String], + compressed: Boolean, serdeProperties: Map[String, String]) @@ -124,10 +125,11 @@ case class CatalogTable( locationUri: Option[String] = storage.locationUri, inputFormat: Option[String] = storage.inputFormat, outputFormat: Option[String] = storage.outputFormat, + compressed: Boolean = false, serde: Option[String] = storage.serde, serdeProperties: Map[String, String] = storage.serdeProperties): CatalogTable = { copy(storage = CatalogStorageFormat( - locationUri, inputFormat, outputFormat, serde, serdeProperties)) + locationUri, inputFormat, outputFormat, serde, compressed, serdeProperties)) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index d739b177430c..ae7c503e65cc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -507,6 +507,7 @@ abstract class CatalogTestUtils { inputFormat = Some(tableInputFormat), outputFormat = Some(tableOutputFormat), serde = None, + compressed = false, serdeProperties = Map.empty) lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" 
-> "2"), storageFormat) lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 0ce8c1dcbfed..ec5febce6360 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -870,6 +870,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { // Note: Keep this unspecified because we use the presence of the serde to decide // whether to convert a table created by CTAS to a datasource table. serde = None, + compressed = false, serdeProperties = Map()) } val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) @@ -881,6 +882,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat), outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat), serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde), + compressed = false, serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties) // TODO support the sql text - have a proper location for this! @@ -934,7 +936,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** Empty storage format for default values and copies. */ - private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty) + private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty) /** * Create a [[CatalogStorageFormat]]. @@ -1015,7 +1017,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { entry("field.delim", ctx.fieldsTerminatedBy) ++ entry("serialization.format", ctx.fieldsTerminatedBy) ++ entry("escape.delim", ctx.escapedBy) ++ - entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++ + entry("collection.delim", ctx.collectionItemsTerminatedBy) ++ entry("mapkey.delim", ctx.keysTerminatedBy) ++ Option(ctx.linesSeparatedBy).toSeq.map { token => val value = string(token) @@ -1154,7 +1156,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { case c: RowFormatSerdeContext => // Use a serde format. 
- val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c) + val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c) // SPARK-10310: Special cases LazySimpleSerDe val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index f670f63472bf..e07ab99ef3e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -349,6 +349,7 @@ object CreateDataSourceTableUtils extends Logging { inputFormat = None, outputFormat = None, serde = None, + compressed = false, serdeProperties = options ), properties = tableProperties.toMap) @@ -368,6 +369,7 @@ object CreateDataSourceTableUtils extends Logging { inputFormat = serde.inputFormat, outputFormat = serde.outputFormat, serde = serde.serde, + compressed = false, serdeProperties = options ), schema = relation.schema.map { f => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index aa5d7ed6c606..59e281a0950d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -306,6 +306,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF result } + // Shows data columns and partitioned columns (if any) private def describe(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = { describeSchema(relation.catalogTable.schema, buffer) @@ -318,14 +319,16 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF } private def describeExtended(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = { - describeSchema(relation.catalogTable.schema, buffer) + describe(relation, buffer) + buffer += Row("", "", "") buffer += Row("# Detailed Table Information", relation.catalogTable.toString, "") } private def describeFormatted(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = { + describe(relation, buffer) + val table = relation.catalogTable - describeSchema(table.schema, buffer) buffer += Row("", "", "") buffer += Row("# Detailed Table Information", "", "") @@ -346,6 +349,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF table.storage.serde.foreach(serdeLib => buffer += Row("SerDe Library:", serdeLib, "")) table.storage.inputFormat.foreach(format => buffer += Row("InputFormat:", format, "")) table.storage.outputFormat.foreach(format => buffer += Row("OutputFormat:", format, "")) + buffer += Row("Compressed:", if (table.storage.compressed) "Yes" else "No", "") buffer += Row("Num Buckets:", table.numBuckets.toString, "") buffer += Row("Bucket Columns:", table.bucketColumnNames.mkString("[", ", ", "]"), "") buffer += Row("Sort Columns:", table.sortColumnNames.mkString("[", ", ", "]"), "") @@ -360,13 +364,13 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF schema.foreach { column => val comment = if (column.metadata.contains("comment")) column.metadata.getString("comment") else "" - buffer += Row(column.name, column.dataType.sql, comment) + buffer += Row(column.name, 
column.dataType.sql.toLowerCase, comment) } } private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = { schema.foreach { column => - buffer += Row(column.name, column.dataType, column.comment.getOrElse("")) + buffer += Row(column.name, column.dataType.toLowerCase, column.comment.orNull) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 12acb9f2761d..0ae099ecc2bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -76,6 +76,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { inputFormat = None, outputFormat = None, serde = None, + compressed = false, serdeProperties = Map()) catalog.createTable(CatalogTable( identifier = name, @@ -89,7 +90,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { catalog: SessionCatalog, spec: TablePartitionSpec, tableName: TableIdentifier): Unit = { - val part = CatalogTablePartition(spec, CatalogStorageFormat(None, None, None, None, Map())) + val part = CatalogTablePartition( + spec, CatalogStorageFormat(None, None, None, None, false, Map())) catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false) } @@ -264,6 +266,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { inputFormat = None, outputFormat = None, serde = None, + compressed = false, serdeProperties = Map()) val expectedTable = CatalogTable( @@ -288,6 +291,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { inputFormat = None, outputFormat = None, serde = None, + compressed = false, serdeProperties = Map()) val expectedTable = CatalogTable( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 4cbcf0d3c08e..dd8ac0d185ad 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -356,6 +356,7 @@ private[hive] class HiveClientImpl( inputFormat = Option(h.getInputFormatClass).map(_.getName), outputFormat = Option(h.getOutputFormatClass).map(_.getName), serde = Option(h.getSerializationLib), + compressed = h.getTTable.getSd.isCompressed, serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap ), properties = h.getParameters.asScala.toMap, @@ -785,7 +786,7 @@ private[hive] class HiveClientImpl( inputFormat = Option(apiPartition.getSd.getInputFormat), outputFormat = Option(apiPartition.getSd.getOutputFormat), serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), + compressed = apiPartition.getSd.isCompressed, serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap)) } - } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 08d4b99d30ea..9dfbafae872f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -56,7 +56,8 @@ case class CreateTableAsSelect( outputFormat = tableDesc.storage.outputFormat 
.orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)), - serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName))) + serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)), + compressed = tableDesc.storage.compressed) val withSchema = if (withFormat.schema.isEmpty) { // Hive doesn't support specifying the column list for target table in CTAS diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index c3a9f2479ce7..4bdcb96feb0a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -732,6 +732,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv inputFormat = None, outputFormat = None, serde = None, + compressed = false, serdeProperties = Map( "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName))) ), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 9341b3816fea..a6a5ab3988fc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -157,6 +157,7 @@ class VersionsSuite extends SparkFunSuite with Logging { outputFormat = Some( classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName), serde = Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()), + compressed = false, serdeProperties = Map.empty )) From a66885a545d670aa5ddc94126284446c246ca81c Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 3 May 2016 17:33:35 +0800 Subject: [PATCH 4/6] Reverts the "typo" fix --- .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 3 ++- .../scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index ec5febce6360..220e1ee3e991 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -1017,7 +1017,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { entry("field.delim", ctx.fieldsTerminatedBy) ++ entry("serialization.format", ctx.fieldsTerminatedBy) ++ entry("escape.delim", ctx.escapedBy) ++ - entry("collection.delim", ctx.collectionItemsTerminatedBy) ++ + // The following typo is inherited from Hive... 
+ entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++ entry("mapkey.delim", ctx.keysTerminatedBy) ++ Option(ctx.linesSeparatedBy).toSeq.map { token => val value = string(token) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index c97c28c40c96..d7a83c16b6c7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -578,7 +578,7 @@ class HiveDDLCommandSuite extends PlanTest { assert(source2.table == "table2") } - test("load data") { + test("load data") { val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1" val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect { case LoadData(t, path, l, o, partition) => (t, path, l, o, partition) From 18b9bb50039d75150dca06b4f97bcffbe41343e7 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 3 May 2016 19:26:25 +0800 Subject: [PATCH 5/6] Uses .simpleString instead of .sql when describing column data types --- .../scala/org/apache/spark/sql/execution/command/tables.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 59e281a0950d..5fec6d2408b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -364,7 +364,7 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF schema.foreach { column => val comment = if (column.metadata.contains("comment")) column.metadata.getString("comment") else "" - buffer += Row(column.name, column.dataType.sql.toLowerCase, comment) + buffer += Row(column.name, column.dataType.simpleString, comment) } } From b5dbc155afc447a0f700919f7645fc093ec26925 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 4 May 2016 14:43:00 +0800 Subject: [PATCH 6/6] Addresses PR comment --- .../spark/sql/execution/command/tables.scala | 62 ++++++++++--------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 5fec6d2408b8..3c0e859cfaca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -311,9 +311,8 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF describeSchema(relation.catalogTable.schema, buffer) if (relation.catalogTable.partitionColumns.nonEmpty) { - buffer += Row("# Partition Information", "", "") - buffer += Row(s"# ${output(0).name}", output(1).name, output(2).name) - + append(buffer, "# Partition Information", "", "") + append(buffer, s"# ${output(0).name}", output(1).name, output(2).name) describeSchema(relation.catalogTable.partitionColumns, buffer) } } @@ -321,8 +320,8 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF private def describeExtended(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = { describe(relation, buffer) - buffer += Row("", "", "") - buffer += Row("# Detailed Table Information", relation.catalogTable.toString, "") + append(buffer, "", "", "") + append(buffer, "# 
Detailed Table Information", relation.catalogTable.toString, "") } private def describeFormatted(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = { @@ -330,33 +329,33 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF val table = relation.catalogTable - buffer += Row("", "", "") - buffer += Row("# Detailed Table Information", "", "") - buffer += Row("Database:", table.database, "") - buffer += Row("Owner:", table.owner, "") - buffer += Row("Create Time:", new Date(table.createTime).toString, "") - buffer += Row("Last Access Time:", new Date(table.lastAccessTime).toString, "") - buffer += Row("Location:", table.storage.locationUri.getOrElse(""), "") - buffer += Row("Table Type:", table.tableType.name, "") + append(buffer, "", "", "") + append(buffer, "# Detailed Table Information", "", "") + append(buffer, "Database:", table.database, "") + append(buffer, "Owner:", table.owner, "") + append(buffer, "Create Time:", new Date(table.createTime).toString, "") + append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "") + append(buffer, "Location:", table.storage.locationUri.getOrElse(""), "") + append(buffer, "Table Type:", table.tableType.name, "") - buffer += Row("Table Parameters:", "", "") + append(buffer, "Table Parameters:", "", "") table.properties.foreach { case (key, value) => - buffer += Row(s" $key", value, "") + append(buffer, s" $key", value, "") } - buffer += Row("", "", "") - buffer += Row("# Storage Information", "", "") - table.storage.serde.foreach(serdeLib => buffer += Row("SerDe Library:", serdeLib, "")) - table.storage.inputFormat.foreach(format => buffer += Row("InputFormat:", format, "")) - table.storage.outputFormat.foreach(format => buffer += Row("OutputFormat:", format, "")) - buffer += Row("Compressed:", if (table.storage.compressed) "Yes" else "No", "") - buffer += Row("Num Buckets:", table.numBuckets.toString, "") - buffer += Row("Bucket Columns:", table.bucketColumnNames.mkString("[", ", ", "]"), "") - buffer += Row("Sort Columns:", table.sortColumnNames.mkString("[", ", ", "]"), "") - - buffer += Row("Storage Desc Parameters:", "", "") + append(buffer, "", "", "") + append(buffer, "# Storage Information", "", "") + table.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, "")) + table.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, "")) + table.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, "")) + append(buffer, "Compressed:", if (table.storage.compressed) "Yes" else "No", "") + append(buffer, "Num Buckets:", table.numBuckets.toString, "") + append(buffer, "Bucket Columns:", table.bucketColumnNames.mkString("[", ", ", "]"), "") + append(buffer, "Sort Columns:", table.sortColumnNames.mkString("[", ", ", "]"), "") + + append(buffer, "Storage Desc Parameters:", "", "") table.storage.serdeProperties.foreach { case (key, value) => - buffer += Row(s" $key", value, "") + append(buffer, s" $key", value, "") } } @@ -364,15 +363,20 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF schema.foreach { column => val comment = if (column.metadata.contains("comment")) column.metadata.getString("comment") else "" - buffer += Row(column.name, column.dataType.simpleString, comment) + append(buffer, column.name, column.dataType.simpleString, comment) } } private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = { schema.foreach { column => - buffer += Row(column.name, 
column.dataType.toLowerCase, column.comment.orNull) + append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull) } } + + private def append( + buffer: ArrayBuffer[Row], column: String, dataType: String, comment: String): Unit = { + buffer += Row(column, dataType, comment) + } }
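
A minimal usage sketch, separate from the patches above, of how the reworked command could be exercised end to end. The session setup, table name, and column names here are illustrative assumptions; the section headers mentioned in the comments ("# Partition Information", "# Detailed Table Information", "# Storage Information") are the ones emitted by DescribeTableCommand in the diffs above.

    import org.apache.spark.sql.SparkSession

    object DescTableExample {
      def main(args: Array[String]): Unit = {
        // Hive support so that lookupRelation resolves to a CatalogRelation and the
        // metastore-backed branch of DescribeTableCommand is exercised.
        val spark = SparkSession.builder()
          .appName("desc-table-example")
          .enableHiveSupport()
          .getOrCreate()

        // Hypothetical partitioned table used only for illustration.
        spark.sql("CREATE TABLE IF NOT EXISTS pages (url STRING, hits INT) PARTITIONED BY (dt STRING)")

        // Plain DESC: data columns followed by the "# Partition Information" block.
        spark.sql("DESC pages").show(truncate = false)

        // EXTENDED: same output plus a "# Detailed Table Information" row carrying
        // the CatalogTable.toString rendering.
        spark.sql("DESC EXTENDED pages").show(truncate = false)

        // FORMATTED: adds the "# Detailed Table Information" section (Database:, Owner:,
        // Create Time:, ...) and the "# Storage Information" section (SerDe Library:,
        // Compressed:, Num Buckets:, Storage Desc Parameters:, ...).
        spark.sql("DESC FORMATTED pages").show(truncate = false)

        spark.stop()
      }
    }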