
Commit 7795355

Add ShowCreateTableAsSparkCommand.

1 parent: ed280c2

6 files changed: +398, -75 lines

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 4 additions & 1 deletion

@@ -183,7 +183,7 @@ statement
     | SHOW PARTITIONS tableIdentifier partitionSpec? #showPartitions
     | SHOW identifier? FUNCTIONS
         (LIKE? (qualifiedName | pattern=STRING))? #showFunctions
-    | SHOW CREATE TABLE tableIdentifier #showCreateTable
+    | SHOW CREATE TABLE tableIdentifier (AS SPARK)? #showCreateTable
     | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
     | (DESC | DESCRIBE) database EXTENDED? identifier #describeDatabase
     | (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?

@@ -1027,6 +1027,7 @@ ansiNonReserved
     | SKEWED
     | SORT
     | SORTED
+    | SPARK
     | START
     | STATISTICS
     | STORED

@@ -1283,6 +1284,7 @@ nonReserved
     | SOME
     | SORT
     | SORTED
+    | SPARK
     | START
     | STATISTICS
     | STORED

@@ -1541,6 +1543,7 @@ SKEWED: 'SKEWED';
 SOME: 'SOME';
 SORT: 'SORT';
 SORTED: 'SORTED';
+SPARK: 'SPARK';
 START: 'START';
 STATISTICS: 'STATISTICS';
 STORED: 'STORED';
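With this grammar change, SHOW CREATE TABLE accepts an optional AS SPARK suffix, and SPARK is registered as a non-reserved keyword so it remains usable as an ordinary identifier. A minimal usage sketch, assuming a Hive-enabled spark-shell (where `spark` is the provided session) and an existing Hive table; the table name `hive_tbl` is illustrative, not part of this commit:

    // New optional suffix matched by the #showCreateTable rule:
    spark.sql("SHOW CREATE TABLE hive_tbl AS SPARK").show(truncate = false)

    // The original form still parses exactly as before:
    spark.sql("SHOW CREATE TABLE hive_tbl").show(truncate = false)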

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 6 additions & 2 deletions

@@ -235,11 +235,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
   }

   /**
-   * Creates a [[ShowCreateTableCommand]]
+   * Creates a [[ShowCreateTableCommand]] or [[ShowCreateTableAsSparkCommand]]
    */
   override def visitShowCreateTable(ctx: ShowCreateTableContext): LogicalPlan = withOrigin(ctx) {
     val table = visitTableIdentifier(ctx.tableIdentifier())
-    ShowCreateTableCommand(table)
+    if (ctx.SPARK != null) {
+      ShowCreateTableAsSparkCommand(table)
+    } else {
+      ShowCreateTableCommand(table)
+    }
   }

   /**
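Dispatch happens on the parse tree: when the optional AS SPARK tokens matched, the ANTLR-generated `ctx.SPARK` accessor is non-null and the builder emits the new command. A parser-level check, sketched after the style of Spark's DDL parser suites (the `SparkSqlParser` constructor shown here is an assumption and may differ by branch):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.execution.SparkSqlParser
    import org.apache.spark.sql.execution.command.{ShowCreateTableAsSparkCommand, ShowCreateTableCommand}
    import org.apache.spark.sql.internal.SQLConf

    object ParserDispatchCheck extends App {
      val parser = new SparkSqlParser(new SQLConf)

      // AS SPARK present -> the new command.
      assert(parser.parsePlan("SHOW CREATE TABLE t AS SPARK") ==
        ShowCreateTableAsSparkCommand(TableIdentifier("t")))

      // No suffix -> the original command, unchanged.
      assert(parser.parsePlan("SHOW CREATE TABLE t") ==
        ShowCreateTableCommand(TableIdentifier("t")))
    }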

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 153 additions & 71 deletions

@@ -17,9 +17,7 @@

 package org.apache.spark.sql.execution.command

-import java.io.File
 import java.net.{URI, URISyntaxException}
-import java.nio.file.FileSystems

 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try

@@ -29,7 +27,7 @@ import org.apache.hadoop.fs.{FileContext, FsConstants, Path}

 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

@@ -44,7 +42,7 @@ import org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2
 import org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils

@@ -942,7 +940,95 @@ case class ShowPartitionsCommand(
   }
 }

-case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand {
+/**
+ * Provides common utilities between `ShowCreateTableCommand` and `ShowCreateTableAsSparkCommand`.
+ */
+trait ShowCreateTableCommandBase {
+
+  protected val table: TableIdentifier
+
+  protected def showTableLocation(metadata: CatalogTable, builder: StringBuilder): Unit = {
+    if (metadata.tableType == EXTERNAL) {
+      metadata.storage.locationUri.foreach { location =>
+        builder ++= s"LOCATION '${escapeSingleQuotedString(CatalogUtils.URIToString(location))}'\n"
+      }
+    }
+  }
+
+  protected def showTableComment(metadata: CatalogTable, builder: StringBuilder): Unit = {
+    metadata
+      .comment
+      .map("COMMENT '" + escapeSingleQuotedString(_) + "'\n")
+      .foreach(builder.append)
+  }
+
+  protected def showTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
+    if (metadata.properties.nonEmpty) {
+      val props = metadata.properties.map { case (key, value) =>
+        s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
+      }
+
+      builder ++= props.mkString("TBLPROPERTIES (\n  ", ",\n  ", "\n)\n")
+    }
+  }
+
+  protected def showDataSourceTableDataColumns(
+      metadata: CatalogTable, builder: StringBuilder): Unit = {
+    val columns = metadata.schema.fields.map(_.toDDL)
+    builder ++= columns.mkString("(", ", ", ")\n")
+  }
+
+  protected def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
+    builder ++= s"USING ${metadata.provider.get}\n"
+
+    val dataSourceOptions = metadata.storage.properties.map {
+      case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
+    }
+
+    if (dataSourceOptions.nonEmpty) {
+      builder ++= "OPTIONS (\n"
+      builder ++= dataSourceOptions.mkString("  ", ",\n  ", "\n")
+      builder ++= ")\n"
+    }
+  }
+
+  protected def showDataSourceTableNonDataColumns(
+      metadata: CatalogTable, builder: StringBuilder): Unit = {
+    val partCols = metadata.partitionColumnNames
+    if (partCols.nonEmpty) {
+      builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
+    }
+
+    metadata.bucketSpec.foreach { spec =>
+      if (spec.bucketColumnNames.nonEmpty) {
+        builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n"
+
+        if (spec.sortColumnNames.nonEmpty) {
+          builder ++= s"SORTED BY ${spec.sortColumnNames.mkString("(", ", ", ")")}\n"
+        }
+
+        builder ++= s"INTO ${spec.numBuckets} BUCKETS\n"
+      }
+    }
+  }
+
+  protected def showCreateDataSourceTable(metadata: CatalogTable): String = {
+    val builder = StringBuilder.newBuilder
+
+    builder ++= s"CREATE TABLE ${table.quotedString} "
+    showDataSourceTableDataColumns(metadata, builder)
+    showDataSourceTableOptions(metadata, builder)
+    showDataSourceTableNonDataColumns(metadata, builder)
+    showTableComment(metadata, builder)
+    showTableLocation(metadata, builder)
+    showTableProperties(metadata, builder)
+
+    builder.toString()
+  }
+}
+
+case class ShowCreateTableCommand(table: TableIdentifier)
+  extends RunnableCommand with ShowCreateTableCommandBase {
   override val output: Seq[Attribute] = Seq(
     AttributeReference("createtab_stmt", StringType, nullable = false)()
   )

@@ -1057,83 +1143,79 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand {
       }
     }
   }
+}

-  private def showTableLocation(metadata: CatalogTable, builder: StringBuilder): Unit = {
-    if (metadata.tableType == EXTERNAL) {
-      metadata.storage.locationUri.foreach { location =>
-        builder ++= s"LOCATION '${escapeSingleQuotedString(CatalogUtils.URIToString(location))}'\n"
-      }
-    }
-  }
+/**
+ * This command generates Spark DDL for a Hive table.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   SHOW CREATE TABLE table_identifier AS SPARK;
+ * }}}
+ */
+case class ShowCreateTableAsSparkCommand(table: TableIdentifier)
+  extends RunnableCommand with ShowCreateTableCommandBase {
+  override val output: Seq[Attribute] = Seq(
+    AttributeReference("sparktab_stmt", StringType, nullable = false)()
+  )

-  private def showTableComment(metadata: CatalogTable, builder: StringBuilder): Unit = {
-    metadata
-      .comment
-      .map("COMMENT '" + escapeSingleQuotedString(_) + "'\n")
-      .foreach(builder.append)
-  }
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val tableMetadata = catalog.getTableMetadata(table)

-  private def showTableProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
-    if (metadata.properties.nonEmpty) {
-      val props = metadata.properties.map { case (key, value) =>
-        s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
+    val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
+      throw new AnalysisException(
+        s"$table is already a Spark data source table. Use `SHOW CREATE TABLE` instead.")
+    } else {
+      if (tableMetadata.unsupportedFeatures.nonEmpty) {
+        throw new AnalysisException(
+          "Failed to execute SHOW CREATE TABLE AS SPARK against table " +
+            s"${tableMetadata.identifier}, which is created by Hive and uses the " +
+            "following unsupported feature(s)\n" +
+            tableMetadata.unsupportedFeatures.map(" - " + _).mkString("\n")
+        )
       }

-      builder ++= props.mkString("TBLPROPERTIES (\n  ", ",\n  ", "\n)\n")
-    }
-  }
-
-  private def showCreateDataSourceTable(metadata: CatalogTable): String = {
-    val builder = StringBuilder.newBuilder
-
-    builder ++= s"CREATE TABLE ${table.quotedString} "
-    showDataSourceTableDataColumns(metadata, builder)
-    showDataSourceTableOptions(metadata, builder)
-    showDataSourceTableNonDataColumns(metadata, builder)
-    showTableComment(metadata, builder)
-    showTableLocation(metadata, builder)
-    showTableProperties(metadata, builder)
-
-    builder.toString()
-  }
-
-  private def showDataSourceTableDataColumns(
-      metadata: CatalogTable, builder: StringBuilder): Unit = {
-    val columns = metadata.schema.fields.map(_.toDDL)
-    builder ++= columns.mkString("(", ", ", ")\n")
-  }
-
-  private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
-    builder ++= s"USING ${metadata.provider.get}\n"
+      if (tableMetadata.tableType == VIEW) {
+        throw new AnalysisException("Hive view isn't supported by SHOW CREATE TABLE AS SPARK")
+      }

-    val dataSourceOptions = metadata.storage.properties.map {
-      case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
+      showCreateDataSourceTable(convertTableMetadata(tableMetadata))
     }

-    if (dataSourceOptions.nonEmpty) {
-      builder ++= "OPTIONS (\n"
-      builder ++= dataSourceOptions.mkString("  ", ",\n  ", "\n")
-      builder ++= ")\n"
-    }
+    Seq(Row(stmt))
   }

-  private def showDataSourceTableNonDataColumns(
-      metadata: CatalogTable, builder: StringBuilder): Unit = {
-    val partCols = metadata.partitionColumnNames
-    if (partCols.nonEmpty) {
-      builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
-    }
-
-    metadata.bucketSpec.foreach { spec =>
-      if (spec.bucketColumnNames.nonEmpty) {
-        builder ++= s"CLUSTERED BY ${spec.bucketColumnNames.mkString("(", ", ", ")")}\n"
-
-        if (spec.sortColumnNames.nonEmpty) {
-          builder ++= s"SORTED BY ${spec.sortColumnNames.mkString("(", ", ", ")")}\n"
-        }
-
-        builder ++= s"INTO ${spec.numBuckets} BUCKETS\n"
+  private def convertTableMetadata(tableMetadata: CatalogTable): CatalogTable = {
+    val hiveSerde = HiveSerDe(
+      serde = tableMetadata.storage.serde,
+      inputFormat = tableMetadata.storage.inputFormat,
+      outputFormat = tableMetadata.storage.outputFormat)
+
+    // Look for the Spark data source that maps to the Hive serde.
+    // TODO: some Hive fileformat + row serde might be mapped to Spark data source, e.g. CSV.
+    val source = HiveSerDe.serdeToSource(hiveSerde)
+    if (source.isEmpty) {
+      val builder = StringBuilder.newBuilder
+      hiveSerde.serde.foreach { serde =>
+        builder ++= s" SERDE: $serde"
+      }
+      hiveSerde.inputFormat.foreach { format =>
+        builder ++= s" INPUTFORMAT: $format"
       }
+      hiveSerde.outputFormat.foreach { format =>
+        builder ++= s" OUTPUTFORMAT: $format"
+      }
+      throw new AnalysisException(
+        "Failed to execute SHOW CREATE TABLE AS SPARK against table " +
+          s"${tableMetadata.identifier}, which is created by Hive and uses the " +
+          "following unsupported serde configuration\n" +
+          builder.toString()
+      )
+    } else {
+      // TODO: should we keep Hive serde properties?
+      val newStorage = tableMetadata.storage.copy(properties = Map.empty)
+      tableMetadata.copy(provider = source, storage = newStorage)
     }
   }
 }
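Taken together, the new command rejects tables that are already data source tables, Hive views, and Hive tables with unsupported features or serdes; everything else is converted to data source form and rendered through the shared builder. An end-to-end sketch for a Hive-enabled spark-shell (the printed DDL shape is inferred from showCreateDataSourceTable above, not captured from an actual run, and the table name is illustrative):

    // A Hive table whose serde maps cleanly to a Spark data source.
    spark.sql("CREATE TABLE hive_parquet_tbl (a INT, b STRING) STORED AS PARQUET")

    // Expected shape, per the builder logic:
    //   CREATE TABLE `default`.`hive_parquet_tbl` (`a` INT, `b` STRING)
    //   USING parquet
    println(spark.sql("SHOW CREATE TABLE hive_parquet_tbl AS SPARK").head().getString(0))

    // Against a table that is already a data source table, the command throws
    // an AnalysisException, per the DDLUtils.isDatasourceTable check.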

sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala

Lines changed: 16 additions & 0 deletions

@@ -65,6 +65,14 @@ object HiveSerDe {
       outputFormat = Option("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"),
       serde = Option("org.apache.hadoop.hive.serde2.avro.AvroSerDe")))

+  // The `HiveSerDe` values kept from `serdeMap` should be distinct.
+  val serdeInverseMap: Map[HiveSerDe, String] = serdeMap.flatMap {
+    case ("sequencefile", _) => None
+    case ("rcfile", _) => None
+    case ("textfile", serde) => Some((serde, "text"))
+    case pair => Some(pair.swap)
+  }
+
   /**
    * Get the Hive SerDe information from the data source abbreviation string or classname.
    *

@@ -88,6 +96,14 @@
     serdeMap.get(key)
   }

+  /**
+   * Get the Spark data source name from the Hive SerDe information.
+   *
+   * @param serde Hive SerDe information.
+   * @return Spark data source name associated with the specified Hive SerDe.
+   */
+  def serdeToSource(serde: HiveSerDe): Option[String] = serdeInverseMap.get(serde)
+
   def getDefaultStorage(conf: SQLConf): CatalogStorageFormat = {
     // To respect hive-site.xml, it peeks Hadoop configuration from existing Spark session,
     // as an easy workaround. See SPARK-27555.
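The inverse map deliberately drops sequencefile and rcfile, which have no native Spark data source counterpart, and maps textfile back to the "text" source name. A round-trip sketch, assuming the standard entries in serdeMap:

    import org.apache.spark.sql.internal.HiveSerDe

    // Abbreviation -> HiveSerDe -> data source name.
    val parquetSerde = HiveSerDe.sourceToSerDe("parquet").get
    assert(HiveSerDe.serdeToSource(parquetSerde).contains("parquet"))

    // "textfile" intentionally inverts to "text".
    val textSerde = HiveSerDe.sourceToSerDe("textfile").get
    assert(HiveSerDe.serdeToSource(textSerde).contains("text"))

    // Excluded entries have no inverse mapping.
    val rcSerde = HiveSerDe.sourceToSerDe("rcfile").get
    assert(HiveSerDe.serdeToSource(rcSerde).isEmpty)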

sql/core/src/test/scala/org/apache/spark/sql/ShowCreateTableSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -200,7 +200,7 @@ abstract class ShowCreateTableSuite extends QueryTest with SQLTestUtils {
     }
   }

-  private def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
+  protected def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
     def normalize(table: CatalogTable): CatalogTable = {
       val nondeterministicProps = Set(
         "CreateTime",
