
Commit 0b14b3f

liancheng authored and yhuai committed
[SPARK-14346] SHOW CREATE TABLE for data source tables
## What changes were proposed in this pull request?

This PR adds a native `SHOW CREATE TABLE` DDL command for data source tables. Support for Hive tables will be added in follow-up PR(s).

To show table creation DDL for data source tables created by CTAS statements, this PR also adds partitioning and bucketing support to the normal `CREATE TABLE ... USING ...` syntax.

## How was this patch tested?

A new test suite, `ShowCreateTableSuite`, is added in the sql/hive package to test the new feature.

Author: Cheng Lian <[email protected]>

Closes #12781 from liancheng/spark-14346-show-create-table.

(cherry picked from commit f036dd7)
Signed-off-by: Yin Huai <[email protected]>
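To make the new command concrete, here is a hypothetical round trip in a Spark shell (where `spark` is the predefined session). The table name, schema, and path are invented for illustration and are not taken from the patch:

```scala
// Invented example: create a simple data source table...
spark.sql(
  """CREATE TABLE users (id INT, country STRING)
    |USING parquet
    |OPTIONS (path '/tmp/users')""".stripMargin)

// ...then ask Spark for DDL that would recreate it.
spark.sql("SHOW CREATE TABLE users").show(truncate = false)
```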
1 parent b2b04c6 · commit 0b14b3f


14 files changed: +458 −127 lines


sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 4 additions & 1 deletion
@@ -45,7 +45,9 @@ statement
     | ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList     #setDatabaseProperties
     | DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)?      #dropDatabase
     | createTableHeader ('(' colTypeList ')')? tableProvider
-        (OPTIONS tablePropertyList)?                                   #createTableUsing
+        (OPTIONS tablePropertyList)?
+        (PARTITIONED BY partitionColumnNames=identifierList)?
+        bucketSpec?                                                    #createTableUsing
     | createTableHeader tableProvider
         (OPTIONS tablePropertyList)?
         (PARTITIONED BY partitionColumnNames=identifierList)?
@@ -102,6 +104,7 @@ statement
         ((FROM | IN) db=identifier)?                                   #showColumns
     | SHOW PARTITIONS tableIdentifier partitionSpec?                   #showPartitions
     | SHOW FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))?         #showFunctions
+    | SHOW CREATE TABLE tableIdentifier                                #showCreateTable
     | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName            #describeFunction
     | (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
        tableIdentifier partitionSpec? describeColName?                 #describeTable
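With the extended rule, a plain `CREATE TABLE ... USING ...` now accepts the same partitioning and bucketing clauses as the CTAS variant below it. A hedged sketch of the newly accepted syntax, assuming `bucketSpec` is the grammar's usual `CLUSTERED BY ... [SORTED BY ...] INTO n BUCKETS` clause (all identifiers invented):

```scala
// Invented example of syntax the extended #createTableUsing rule accepts.
spark.sql(
  """CREATE TABLE events (id INT, ts STRING, country STRING)
    |USING parquet
    |OPTIONS (path '/tmp/events')
    |PARTITIONED BY (country)
    |CLUSTERED BY (id) SORTED BY (ts) INTO 8 BUCKETS""".stripMargin)
```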

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ sealed trait IdentifierWithDatabase {
 /**
  * Identifies a table in a database.
  * If `database` is not defined, the current database is used.
- * When we register a permenent function in the FunctionRegistry, we use
+ * When we register a permanent function in the FunctionRegistry, we use
  * unquotedString as the function name.
  */
 case class TableIdentifier(table: String, database: Option[String])

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 22 additions & 6 deletions
@@ -181,6 +181,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     ShowPartitionsCommand(table, partitionKeys)
   }

+  /**
+   * Creates a [[ShowCreateTableCommand]]
+   */
+  override def visitShowCreateTable(ctx: ShowCreateTableContext): LogicalPlan = withOrigin(ctx) {
+    val table = visitTableIdentifier(ctx.tableIdentifier())
+    ShowCreateTableCommand(table)
+  }
+
   /**
    * Create a [[RefreshTable]] logical plan.
    */
@@ -287,6 +295,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     val options = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
+    val partitionColumnNames =
+      Option(ctx.partitionColumnNames)
+        .map(visitIdentifierList(_).toArray)
+        .getOrElse(Array.empty[String])
     val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

     if (ctx.query != null) {
@@ -302,16 +314,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       SaveMode.ErrorIfExists
     }

-    val partitionColumnNames =
-      Option(ctx.partitionColumnNames)
-        .map(visitIdentifierList(_).toArray)
-        .getOrElse(Array.empty[String])
-
     CreateTableUsingAsSelect(
       table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
   } else {
     val struct = Option(ctx.colTypeList()).map(createStructType)
-    CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = true)
+    CreateTableUsing(
+      table,
+      struct,
+      provider,
+      temp,
+      options,
+      partitionColumnNames,
+      bucketSpec,
+      ifNotExists,
+      managedIfNoPath = true)
   }
 }
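`ShowCreateTableCommand` itself is defined in another file of this commit, so only its call site appears here. As a minimal sketch of the `RunnableCommand` shape such a command plugs into (the output column name and the body are assumptions for illustration, not the patch's actual code):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType

// Assumes this lives in org.apache.spark.sql.execution.command, where
// RunnableCommand is defined.
case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand {
  // SHOW CREATE TABLE yields one string column holding the DDL text
  // (the column name here is assumed).
  override val output: Seq[Attribute] =
    AttributeReference("createtab_stmt", StringType, nullable = false)() :: Nil

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val metadata = sparkSession.sessionState.catalog.getTableMetadata(table)
    // The real command renders full DDL (columns, USING clause, options,
    // partitioning, bucketing) from this metadata; the stub only hints at it.
    Seq(Row(s"CREATE TABLE ${metadata.qualifiedName} ..."))
  }
}
```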
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 4 additions & 2 deletions
@@ -372,10 +372,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTableUsing(tableIdent, userSpecifiedSchema, provider, true, opts, false, _) =>
+      case c: CreateTableUsing if c.temporary && !c.allowExisting =>
         ExecutedCommandExec(
           CreateTempTableUsing(
-            tableIdent, userSpecifiedSchema, provider, opts)) :: Nil
+            c.tableIdent, c.userSpecifiedSchema, c.provider, c.options)) :: Nil

       case c: CreateTableUsing if !c.temporary =>
         val cmd =
@@ -384,6 +384,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             c.userSpecifiedSchema,
             c.provider,
             c.options,
+            c.partitionColumns,
+            c.bucketSpec,
             c.allowExisting,
             c.managedIfNoPath)
         ExecutedCommandExec(cmd) :: Nil
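Note the first case switched from a positional pattern to a typed match with guards. Since this commit adds two fields to `CreateTableUsing`, a positional pattern would have to change every time the class grows. A toy illustration of the difference (class and values invented):

```scala
// Positional patterns must name every field, so adding a field to the case
// class breaks them at compile time; matching on the type and reading the
// fields you need does not.
case class CreateTableLike(name: String, temporary: Boolean, allowExisting: Boolean)

def plan(p: CreateTableLike): String = p match {
  // Positional equivalent would be: case CreateTableLike(n, true, false) => ...
  case c if c.temporary && !c.allowExisting => s"create temp table ${c.name}"
  case c => s"create table ${c.name}"
}
```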

sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala

Lines changed: 2 additions & 105 deletions
@@ -17,19 +17,14 @@

 package org.apache.spark.sql.execution.command

-import java.io.File
-
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.execution.debug._
 import org.apache.spark.sql.types._

@@ -117,101 +112,3 @@ case class ExplainCommand(
       ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
   }
 }
-
-/**
- * A command to list the column names for a table. This function creates a
- * [[ShowColumnsCommand]] logical plan.
- *
- * The syntax of using this command in SQL is:
- * {{{
- *   SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
- * }}}
- */
-case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
-  // The result of SHOW COLUMNS has one column called 'result'
-  override val output: Seq[Attribute] = {
-    AttributeReference("result", StringType, nullable = false)() :: Nil
-  }
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
-      Row(c.name)
-    }
-  }
-}
-
-/**
- * A command to list the partition names of a table. If the partition spec is specified,
- * partitions that match the spec are returned. [[AnalysisException]] exception is thrown under
- * the following conditions:
- *
- * 1. If the command is called for a non partitioned table.
- * 2. If the partition spec refers to the columns that are not defined as partitioning columns.
- *
- * This function creates a [[ShowPartitionsCommand]] logical plan
- *
- * The syntax of using this command in SQL is:
- * {{{
- *   SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
- * }}}
- */
-case class ShowPartitionsCommand(
-    table: TableIdentifier,
-    spec: Option[TablePartitionSpec]) extends RunnableCommand {
-  // The result of SHOW PARTITIONS has one column called 'result'
-  override val output: Seq[Attribute] = {
-    AttributeReference("result", StringType, nullable = false)() :: Nil
-  }
-
-  private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = {
-    partColNames.map { name =>
-      PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name))
-    }.mkString(File.separator)
-  }
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalog = sparkSession.sessionState.catalog
-    val db = table.database.getOrElse(catalog.getCurrentDatabase)
-    if (catalog.isTemporaryTable(table)) {
-      throw new AnalysisException("SHOW PARTITIONS is not allowed on a temporary table: " +
-        s"${table.unquotedString}")
-    } else {
-      val tab = catalog.getTableMetadata(table)
-      /**
-       * Validate and throws an [[AnalysisException]] exception under the following conditions:
-       * 1. If the table is not partitioned.
-       * 2. If it is a datasource table.
-       * 3. If it is a view or index table.
-       */
-      if (tab.tableType == CatalogTableType.VIEW ||
-        tab.tableType == CatalogTableType.INDEX) {
-        throw new AnalysisException("SHOW PARTITIONS is not allowed on a view or index table: " +
-          s"${tab.qualifiedName}")
-      }
-      if (!DDLUtils.isTablePartitioned(tab)) {
-        throw new AnalysisException("SHOW PARTITIONS is not allowed on a table that is not " +
-          s"partitioned: ${tab.qualifiedName}")
-      }
-      if (DDLUtils.isDatasourceTable(tab)) {
-        throw new AnalysisException("SHOW PARTITIONS is not allowed on a datasource table: " +
-          s"${tab.qualifiedName}")
-      }
-      /**
-       * Validate the partitioning spec by making sure all the referenced columns are
-       * defined as partitioning columns in table definition. An AnalysisException exception is
-       * thrown if the partitioning spec is invalid.
-       */
-      if (spec.isDefined) {
-        val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
-        if (badColumns.nonEmpty) {
-          throw new AnalysisException(
-            s"Non-partitioning column(s) [${badColumns.mkString(", ")}] are " +
-              s"specified for SHOW PARTITIONS")
-        }
-      }
-      val partNames =
-        catalog.listPartitions(table, spec).map(p => getPartName(p.spec, tab.partitionColumnNames))
-      partNames.map { p => Row(p) }
-    }
-  }
-}
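The two commands removed above are presumably relocated to another of the 14 changed files rather than deleted; this excerpt only shows their removal from commands.scala. For intuition about the departing `getPartName` helper, it renders a partition spec as a Hive-style path. A dependency-free sketch of that behavior, with the `PartitioningUtils.escapePathName` escaping omitted:

```scala
import java.io.File

// Simplified restatement of getPartName for illustration only; the real code
// escapes both names and values via PartitioningUtils.escapePathName.
def partName(spec: Map[String, String], partColNames: Seq[String]): String =
  partColNames.map(name => name + "=" + spec(name)).mkString(File.separator)

// partName(Map("country" -> "US", "state" -> "CA"), Seq("country", "state"))
// returns "country=US/state=CA" on Unix-like systems.
```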

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 4 additions & 2 deletions
@@ -51,6 +51,8 @@ case class CreateDataSourceTableCommand(
     userSpecifiedSchema: Option[StructType],
     provider: String,
     options: Map[String, String],
+    partitionColumns: Array[String],
+    bucketSpec: Option[BucketSpec],
     ignoreIfExists: Boolean,
     managedIfNoPath: Boolean)
   extends RunnableCommand {
@@ -103,8 +105,8 @@ case class CreateDataSourceTableCommand(
       sparkSession = sparkSession,
       tableIdent = tableIdent,
       userSpecifiedSchema = userSpecifiedSchema,
-      partitionColumns = Array.empty[String],
-      bucketSpec = None,
+      partitionColumns = partitionColumns,
+      bucketSpec = bucketSpec,
       provider = provider,
       options = optionsWithPath,
       isExternal = isExternal)
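The two new parameters let the parsed `PARTITIONED BY` columns and bucketing spec flow through into table creation instead of being hard-coded to empty. A hedged sketch of the argument shapes, assuming `BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)` matches its constructor around this point in Spark's history (all values invented):

```scala
// Assumption: around this commit, BucketSpec lived under
// org.apache.spark.sql.execution.datasources (it has since moved).
import org.apache.spark.sql.execution.datasources.BucketSpec

// Invented values illustrating the types the command now threads through:
val partitionColumns: Array[String] = Array("country")
val bucketSpec: Option[BucketSpec] =
  Some(BucketSpec(numBuckets = 8, bucketColumnNames = Seq("id"), sortColumnNames = Seq("ts")))
```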

0 commit comments

Comments
 (0)