@@ -48,6 +48,7 @@ case class CatalogStorageFormat(
inputFormat: Option[String],
outputFormat: Option[String],
serde: Option[String],
compressed: Boolean,
Contributor:

Is this ever true? If it isn't, we could leave it out.

Contributor:

Nvm. Hive can pass compressed tables.

serdeProperties: Map[String, String])


@@ -89,6 +90,7 @@ case class CatalogTable(
sortColumnNames: Seq[String] = Seq.empty,
bucketColumnNames: Seq[String] = Seq.empty,
numBuckets: Int = -1,
owner: String = "",
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
properties: Map[String, String] = Map.empty,
@@ -123,10 +125,11 @@ case class CatalogTable(
locationUri: Option[String] = storage.locationUri,
inputFormat: Option[String] = storage.inputFormat,
outputFormat: Option[String] = storage.outputFormat,
compressed: Boolean = false,
serde: Option[String] = storage.serde,
serdeProperties: Map[String, String] = storage.serdeProperties): CatalogTable = {
copy(storage = CatalogStorageFormat(
locationUri, inputFormat, outputFormat, serde, serdeProperties))
locationUri, inputFormat, outputFormat, serde, compressed, serdeProperties))
}

}
@@ -507,6 +507,7 @@ abstract class CatalogTestUtils {
inputFormat = Some(tableInputFormat),
outputFormat = Some(tableOutputFormat),
serde = None,
compressed = false,
serdeProperties = Map.empty)
lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat)
lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat)
@@ -240,10 +240,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
// Describing a specific column or a partition is not supported. Return null and let the parser
// decide what to do with this (create an exception or pass it on to a different system).
if (ctx.describeColName != null || ctx.FORMATTED != null || ctx.partitionSpec != null) {
if (ctx.describeColName != null || ctx.partitionSpec != null) {
null
} else {
DescribeTableCommand(visitTableIdentifier(ctx.tableIdentifier), ctx.EXTENDED != null)
DescribeTableCommand(
visitTableIdentifier(ctx.tableIdentifier),
ctx.EXTENDED != null,
ctx.FORMATTED() != null)
}
}

@@ -867,6 +870,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
// Note: Keep this unspecified because we use the presence of the serde to decide
// whether to convert a table created by CTAS to a datasource table.
serde = None,
compressed = false,
serdeProperties = Map())
}
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
@@ -878,6 +882,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
inputFormat = fileStorage.inputFormat.orElse(defaultStorage.inputFormat),
outputFormat = fileStorage.outputFormat.orElse(defaultStorage.outputFormat),
serde = rowStorage.serde.orElse(fileStorage.serde).orElse(defaultStorage.serde),
compressed = false,
serdeProperties = rowStorage.serdeProperties ++ fileStorage.serdeProperties)

// TODO support the sql text - have a proper location for this!
@@ -931,7 +936,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}

/** Empty storage format for default values and copies. */
private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, Map.empty)
private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty)

/**
* Create a [[CatalogStorageFormat]].
Expand Down Expand Up @@ -1012,6 +1017,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
entry("field.delim", ctx.fieldsTerminatedBy) ++
entry("serialization.format", ctx.fieldsTerminatedBy) ++
entry("escape.delim", ctx.escapedBy) ++
// The following typo is inherited from Hive...
Contributor:

+1 for this comment. Totally got me the last time.

entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++
entry("mapkey.delim", ctx.keysTerminatedBy) ++
Option(ctx.linesSeparatedBy).toSeq.map { token =>
Expand Down Expand Up @@ -1151,7 +1157,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {

case c: RowFormatSerdeContext =>
// Use a serde format.
val CatalogStorageFormat(None, None, None, Some(name), props) = visitRowFormatSerde(c)
val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c)

// SPARK-10310: Special cases LazySimpleSerDe
val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") {
@@ -349,6 +349,7 @@ object CreateDataSourceTableUtils extends Logging {
inputFormat = None,
outputFormat = None,
serde = None,
compressed = false,
serdeProperties = options
),
properties = tableProperties.toMap)
@@ -368,6 +369,7 @@
inputFormat = serde.inputFormat,
outputFormat = serde.outputFormat,
serde = serde.serde,
compressed = false,
serdeProperties = options
),
schema = relation.schema.map { f =>
@@ -19,16 +19,17 @@ package org.apache.spark.sql.execution.command

import java.io.File
import java.net.URI
import java.util.Date

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType}
import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType, StructType}
import org.apache.spark.util.Utils

case class CreateTableAsSelectLogicalPlan(
@@ -270,10 +271,10 @@ case class LoadData(
/**
* Command that looks like
* {{{
* DESCRIBE (EXTENDED) table_name;
* DESCRIBE [EXTENDED|FORMATTED] table_name;
* }}}
*/
case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isFormatted: Boolean)
extends RunnableCommand {

override val output: Seq[Attribute] = Seq(
@@ -290,29 +291,92 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
val result = new ArrayBuffer[Row]
sparkSession.sessionState.catalog.lookupRelation(table) match {
case catalogRelation: CatalogRelation =>
catalogRelation.catalogTable.schema.foreach { column =>
result += Row(column.name, column.dataType, column.comment.orNull)
}

if (catalogRelation.catalogTable.partitionColumns.nonEmpty) {
result += Row("# Partition Information", "", "")
result += Row(s"# ${output(0).name}", output(1).name, output(2).name)

catalogRelation.catalogTable.partitionColumns.foreach { col =>
result += Row(col.name, col.dataType, col.comment.orNull)
}
if (isExtended) {
describeExtended(catalogRelation, result)
} else if (isFormatted) {
describeFormatted(catalogRelation, result)
} else {
describe(catalogRelation, result)
}

case relation =>
relation.schema.fields.foreach { field =>
val comment =
if (field.metadata.contains("comment")) field.metadata.getString("comment") else ""
result += Row(field.name, field.dataType.simpleString, comment)
}
describeSchema(relation.schema, result)
}

result
}

// Shows data columns and partitioned columns (if any)
private def describe(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = {
describeSchema(relation.catalogTable.schema, buffer)

if (relation.catalogTable.partitionColumns.nonEmpty) {
append(buffer, "# Partition Information", "", "")
append(buffer, s"# ${output(0).name}", output(1).name, output(2).name)
describeSchema(relation.catalogTable.partitionColumns, buffer)
}
}

private def describeExtended(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = {
describe(relation, buffer)

append(buffer, "", "", "")
append(buffer, "# Detailed Table Information", relation.catalogTable.toString, "")
Member:
@liancheng To improve the output of Explain, I plan to change the default implementation of toString of case class CatalogTable. That will also affect the output of Describe Extended.

I checked what Hive produces for DESCRIBE EXTENDED; the output looks like this:

Detailed Table Information Table(tableName:t1, dbName:default, owner:root, createTime:1462627092, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:col, type:int, comment:null)], location:hdfs://6b68a24121f4:9000/user/hive/warehouse/t1, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{transient_lastDdlTime=1462627092}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)

Basically, in the implementation of toString, I will try to follow what you did in describeFormatted but the contents will be in a single line. Feel free to let me know if you have any concern or suggestion. Thanks!

}
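
A rough, hypothetical sketch of the single-line toString idea from the thread above (not part of this PR): it reuses the key:value layout of describeFormatted but keeps everything on one line, and is shown on a simplified stand-in case class rather than the real CatalogTable.

// Sketch only: a simplified stand-in for CatalogTable, used to illustrate a
// single-line key:value toString similar to Hive's Detailed Table Information.
case class SimpleTableInfo(
    database: String,
    table: String,
    owner: String,
    createTime: Long,
    locationUri: Option[String],
    properties: Map[String, String]) {

  override def toString: String = {
    val fields = Seq(
      s"tableName:$table",
      s"dbName:$database",
      s"owner:$owner",
      s"createTime:$createTime",
      s"location:${locationUri.getOrElse("")}",
      "parameters:" + properties.map { case (k, v) => s"$k=$v" }.mkString("{", ", ", "}"))
    fields.mkString("CatalogTable(", ", ", ")")
  }
}

// Prints: CatalogTable(tableName:t1, dbName:default, owner:root, createTime:1462627092, location:, parameters:{})
object SimpleTableInfoDemo extends App {
  println(SimpleTableInfo("default", "t1", "root", 1462627092L, None, Map.empty))
}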

private def describeFormatted(relation: CatalogRelation, buffer: ArrayBuffer[Row]): Unit = {
Contributor:

We could also use some sort of an append(string, string, string) function instead of calling buffer += Row(..., ..., ...) a lot. But I guess that is just a matter of preference.

Contributor Author:

Makes sense; actually, there were several times I forgot to add the trailing empty string(s) while working on this PR. Thanks!

describe(relation, buffer)

val table = relation.catalogTable

append(buffer, "", "", "")
append(buffer, "# Detailed Table Information", "", "")
append(buffer, "Database:", table.database, "")
append(buffer, "Owner:", table.owner, "")
append(buffer, "Create Time:", new Date(table.createTime).toString, "")
append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "")
append(buffer, "Location:", table.storage.locationUri.getOrElse(""), "")
append(buffer, "Table Type:", table.tableType.name, "")

append(buffer, "Table Parameters:", "", "")
table.properties.foreach { case (key, value) =>
append(buffer, s" $key", value, "")
}

append(buffer, "", "", "")
append(buffer, "# Storage Information", "", "")
table.storage.serde.foreach(serdeLib => append(buffer, "SerDe Library:", serdeLib, ""))
table.storage.inputFormat.foreach(format => append(buffer, "InputFormat:", format, ""))
table.storage.outputFormat.foreach(format => append(buffer, "OutputFormat:", format, ""))
append(buffer, "Compressed:", if (table.storage.compressed) "Yes" else "No", "")
append(buffer, "Num Buckets:", table.numBuckets.toString, "")
append(buffer, "Bucket Columns:", table.bucketColumnNames.mkString("[", ", ", "]"), "")
append(buffer, "Sort Columns:", table.sortColumnNames.mkString("[", ", ", "]"), "")

append(buffer, "Storage Desc Parameters:", "", "")
table.storage.serdeProperties.foreach { case (key, value) =>
append(buffer, s" $key", value, "")
}
}

private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
val comment =
if (column.metadata.contains("comment")) column.metadata.getString("comment") else ""
Contributor:

Just an idea: 9 times out of 10, when I use column.metadata it is to get the comment. Shouldn't we just add this to StructField...?

Contributor Author:

I think the metadata is heavily used in ML code. Also, the data type API has been public for a long time; we probably don't want to change it unless there are particularly good reasons.

Contributor:

I am not saying that we should (re)move it. I am only suggesting that it might be easier if we had such an accessor in StructField.

append(buffer, column.name, column.dataType.simpleString, comment)
}
}
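
For reference, a rough sketch of the accessor idea from the thread above (hypothetical, not part of this PR): an enrichment class on StructField rather than a change to the public type API.

import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField}

// Hypothetical helper: expose the "comment" metadata key as an Option without
// touching the public StructField API.
object StructFieldCommentOps {
  implicit class RichStructField(field: StructField) {
    def commentOption: Option[String] =
      if (field.metadata.contains("comment")) Some(field.metadata.getString("comment"))
      else None
  }
}

object StructFieldCommentDemo extends App {
  import StructFieldCommentOps._
  val withComment = StructField(
    "c1", IntegerType,
    metadata = new MetadataBuilder().putString("comment", "first column").build())
  println(withComment.commentOption)                     // Some(first column)
  println(StructField("c2", IntegerType).commentOption)  // None
}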

private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
}
}

private def append(
buffer: ArrayBuffer[Row], column: String, dataType: String, comment: String): Unit = {
buffer += Row(column, dataType, comment)
}
}


@@ -76,6 +76,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
inputFormat = None,
outputFormat = None,
serde = None,
compressed = false,
serdeProperties = Map())
catalog.createTable(CatalogTable(
identifier = name,
@@ -89,7 +90,8 @@
catalog: SessionCatalog,
spec: TablePartitionSpec,
tableName: TableIdentifier): Unit = {
val part = CatalogTablePartition(spec, CatalogStorageFormat(None, None, None, None, Map()))
val part = CatalogTablePartition(
spec, CatalogStorageFormat(None, None, None, None, false, Map()))
catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
}

@@ -264,6 +266,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
inputFormat = None,
outputFormat = None,
serde = None,
compressed = false,
serdeProperties = Map())
val expectedTable =
CatalogTable(
@@ -288,6 +291,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
inputFormat = None,
outputFormat = None,
serde = None,
compressed = false,
serdeProperties = Map())
val expectedTable =
CatalogTable(
@@ -348,13 +348,15 @@ private[hive] class HiveClientImpl(
sortColumnNames = Seq(), // TODO: populate this
bucketColumnNames = h.getBucketCols.asScala,
numBuckets = h.getNumBuckets,
owner = h.getOwner,
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
storage = CatalogStorageFormat(
locationUri = shim.getDataLocation(h),
inputFormat = Option(h.getInputFormatClass).map(_.getName),
outputFormat = Option(h.getOutputFormatClass).map(_.getName),
serde = Option(h.getSerializationLib),
compressed = h.getTTable.getSd.isCompressed,
serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap
),
properties = h.getParameters.asScala.toMap,
@@ -784,7 +786,7 @@ private[hive] class HiveClientImpl(
inputFormat = Option(apiPartition.getSd.getInputFormat),
outputFormat = Option(apiPartition.getSd.getOutputFormat),
serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib),
compressed = apiPartition.getSd.isCompressed,
serdeProperties = apiPartition.getSd.getSerdeInfo.getParameters.asScala.toMap))
}

}
@@ -56,7 +56,8 @@ case class CreateTableAsSelect(
outputFormat =
tableDesc.storage.outputFormat
.orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)))
serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)),
compressed = tableDesc.storage.compressed)

val withSchema = if (withFormat.schema.isEmpty) {
// Hive doesn't support specifying the column list for target table in CTAS
@@ -578,7 +578,7 @@ class HiveDDLCommandSuite extends PlanTest {
assert(source2.table == "table2")
}

test("load data") {
test("load data") {
val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1"
val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect {
case LoadData(t, path, l, o, partition) => (t, path, l, o, partition)
@@ -732,6 +732,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
inputFormat = None,
outputFormat = None,
serde = None,
compressed = false,
serdeProperties = Map(
"path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
),
@@ -157,6 +157,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
outputFormat = Some(
classOf[org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat[_, _]].getName),
serde = Some(classOf[org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe].getName()),
compressed = false,
serdeProperties = Map.empty
))

@@ -348,4 +348,21 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
}

test("desc table") {
withTable("tab1") {
val tabName = "tab1"
sql(s"CREATE TABLE $tabName(c1 int)")

assert(sql(s"DESC $tabName").collect().length == 1)

assert(
sql(s"DESC FORMATTED $tabName").collect()
.exists(_.getString(0) == "# Storage Information"))

assert(
sql(s"DESC EXTENDED $tabName").collect()
.exists(_.getString(0) == "# Detailed Table Information"))
}
}
}
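
As a quick way to eyeball the new output locally, a small illustrative driver (not part of this PR) that mirrors the table from the test above; it assumes a Spark build with Hive support on the classpath.

import org.apache.spark.sql.SparkSession

// Illustrative only: run the three DESCRIBE variants against a simple Hive
// table and print the result rows.
object DescFormattedDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("desc-formatted-demo")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS tab1(c1 INT)")
    spark.sql("DESC tab1").show(truncate = false)           // column rows only
    spark.sql("DESC EXTENDED tab1").show(truncate = false)  // adds "# Detailed Table Information"
    spark.sql("DESC FORMATTED tab1").show(truncate = false) // adds "# Storage Information" as well
    spark.stop()
  }
}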