@@ -45,7 +45,9 @@ statement
| ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties
| DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)? #dropDatabase
| createTableHeader ('(' colTypeList ')')? tableProvider
(OPTIONS tablePropertyList)? #createTableUsing
(OPTIONS tablePropertyList)?
(PARTITIONED BY partitionColumnNames=identifierList)?
bucketSpec? #createTableUsing
Contributor

@cloud-fan Do we allow users to specify bucketing without providing partitioning columns? It seems only DynamicPartitionWriterContainer supports bucketSpec?

Contributor

Yes, we do. If partition columns are empty but bucket columns are not, we will also use DynamicPartitionWriterContainer, see: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala#L124-L136
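
For reference, a minimal sketch of the container choice being discussed, not the actual Spark implementation (BucketSpec is redefined locally as a stand-in, and the return values are illustrative): a dynamic container is used whenever there are partition columns or a bucket spec, so bucketing without partitioning still takes that path.

```scala
// Simplified, hypothetical sketch of the writer-container selection in
// InsertIntoHadoopFsRelation; types and return values are illustrative only.
case class BucketSpec(
    numBuckets: Int,
    bucketColumnNames: Seq[String],
    sortColumnNames: Seq[String])

def chooseWriterContainer(
    partitionColumns: Seq[String],
    bucketSpec: Option[BucketSpec]): String = {
  if (partitionColumns.isEmpty && bucketSpec.isEmpty) {
    "DefaultWriterContainer" // plain write: no partitioning, no bucketing
  } else {
    "DynamicPartitionWriterContainer" // partitioned and/or bucketed write
  }
}
```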

Contributor

Thanks!

Contributor @yhuai, May 11, 2016

I just realized that OPTIONS goes first. I am wondering if it makes sense to put OPTIONS after PARTITIONED BY and bucketSpec.

(Update: let me ask around)

Contributor

Let's add tests for this syntax (right now we only have tests for show create table).
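
For illustration, a minimal sketch of what such a test might exercise, assuming a SparkSession named `spark`; the table name, path, and columns are made up, and the clause order follows the grammar above (OPTIONS, then PARTITIONED BY, then the bucket spec).

```scala
// Hypothetical input for a parser/DDL test of the new syntax;
// identifiers and the path are illustrative.
spark.sql(
  """
    |CREATE TABLE t(a INT, b STRING, c STRING)
    |USING parquet
    |OPTIONS (path '/tmp/t')
    |PARTITIONED BY (c)
    |CLUSTERED BY (a) SORTED BY (b) INTO 4 BUCKETS
  """.stripMargin)
```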

| createTableHeader tableProvider
Contributor

Not related to this PR, but should we combine these two rules? They are both called createTableUsing, but the second one is actually createTableUsingAsSelect.

Contributor

The reason for not combining them is that the first is a CREATE TABLE statement (which has columns and no query) and the second is a CREATE TABLE ... AS statement (which has no columns and a query). This way the parser will catch input that defines both columns and a query.

We could also check this in the SparkSqlAstBuilder...
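
As a hypothetical example, keeping the two rules separate means an input that supplies both a column list and a query matches neither rule and is rejected by the parser directly, without an extra check in the builder:

```scala
// Hypothetical statement that defines both columns and a query;
// neither createTableUsing rule matches it, so parsing fails.
spark.sql("CREATE TABLE t(a INT) USING parquet AS SELECT 1 AS a")
```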

(OPTIONS tablePropertyList)?
(PARTITIONED BY partitionColumnNames=identifierList)?
@@ -102,6 +104,7 @@ statement
((FROM | IN) db=identifier)? #showColumns
| SHOW PARTITIONS tableIdentifier partitionSpec? #showPartitions
| SHOW FUNCTIONS (LIKE? (qualifiedName | pattern=STRING))? #showFunctions
| SHOW CREATE TABLE tableIdentifier #showCreateTable
| (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
| (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
tableIdentifier partitionSpec? describeColName? #describeTable
@@ -44,7 +44,7 @@ sealed trait IdentifierWithDatabase {
/**
* Identifies a table in a database.
* If `database` is not defined, the current database is used.
* When we register a permenent function in the FunctionRegistry, we use
* When we register a permanent function in the FunctionRegistry, we use
* unquotedString as the function name.
*/
case class TableIdentifier(table: String, database: Option[String])
@@ -181,6 +181,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ShowPartitionsCommand(table, partitionKeys)
}

/**
* Creates a [[ShowCreateTableCommand]]
*/
override def visitShowCreateTable(ctx: ShowCreateTableContext): LogicalPlan = withOrigin(ctx) {
val table = visitTableIdentifier(ctx.tableIdentifier())
ShowCreateTableCommand(table)
}

/**
* Create a [[RefreshTable]] logical plan.
*/
@@ -287,6 +295,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
val options = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
val provider = ctx.tableProvider.qualifiedName.getText
val partitionColumnNames =
Option(ctx.partitionColumnNames)
.map(visitIdentifierList(_).toArray)
.getOrElse(Array.empty[String])
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

if (ctx.query != null) {
@@ -302,16 +314,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
SaveMode.ErrorIfExists
}

val partitionColumnNames =
Option(ctx.partitionColumnNames)
.map(visitIdentifierList(_).toArray)
.getOrElse(Array.empty[String])

CreateTableUsingAsSelect(
table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
} else {
val struct = Option(ctx.colTypeList()).map(createStructType)
CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = true)
CreateTableUsing(
table,
struct,
provider,
temp,
options,
partitionColumnNames,
bucketSpec,
ifNotExists,
managedIfNoPath = true)
}
}

@@ -372,10 +372,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

object DDLStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case CreateTableUsing(tableIdent, userSpecifiedSchema, provider, true, opts, false, _) =>
case c: CreateTableUsing if c.temporary && !c.allowExisting =>
ExecutedCommandExec(
CreateTempTableUsing(
tableIdent, userSpecifiedSchema, provider, opts)) :: Nil
c.tableIdent, c.userSpecifiedSchema, c.provider, c.options)) :: Nil

case c: CreateTableUsing if !c.temporary =>
val cmd =
@@ -384,6 +384,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
c.userSpecifiedSchema,
c.provider,
c.options,
c.partitionColumns,
c.bucketSpec,
c.allowExisting,
c.managedIfNoPath)
ExecutedCommandExec(cmd) :: Nil
@@ -17,19 +17,14 @@

package org.apache.spark.sql.execution.command

import java.io.File

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.debug._
import org.apache.spark.sql.types._

@@ -117,101 +112,3 @@ case class ExplainCommand(
("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
}
}

/**
* A command to list the column names for a table. This function creates a
* [[ShowColumnsCommand]] logical plan.
*
* The syntax of using this command in SQL is:
* {{{
* SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
* }}}
*/
case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
// The result of SHOW COLUMNS has one column called 'result'
override val output: Seq[Attribute] = {
AttributeReference("result", StringType, nullable = false)() :: Nil
}

override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
Row(c.name)
}
}
}

/**
* A command to list the partition names of a table. If the partition spec is specified,
* partitions that match the spec are returned. [[AnalysisException]] exception is thrown under
* the following conditions:
*
* 1. If the command is called for a non partitioned table.
* 2. If the partition spec refers to the columns that are not defined as partitioning columns.
*
* This function creates a [[ShowPartitionsCommand]] logical plan
*
* The syntax of using this command in SQL is:
* {{{
* SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
* }}}
*/
case class ShowPartitionsCommand(
table: TableIdentifier,
spec: Option[TablePartitionSpec]) extends RunnableCommand {
// The result of SHOW PARTITIONS has one column called 'result'
override val output: Seq[Attribute] = {
AttributeReference("result", StringType, nullable = false)() :: Nil
}

private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = {
partColNames.map { name =>
PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name))
}.mkString(File.separator)
}

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val db = table.database.getOrElse(catalog.getCurrentDatabase)
if (catalog.isTemporaryTable(table)) {
throw new AnalysisException("SHOW PARTITIONS is not allowed on a temporary table: " +
s"${table.unquotedString}")
} else {
val tab = catalog.getTableMetadata(table)
/**
* Validate and throws an [[AnalysisException]] exception under the following conditions:
* 1. If the table is not partitioned.
* 2. If it is a datasource table.
* 3. If it is a view or index table.
*/
if (tab.tableType == CatalogTableType.VIEW ||
tab.tableType == CatalogTableType.INDEX) {
throw new AnalysisException("SHOW PARTITIONS is not allowed on a view or index table: " +
s"${tab.qualifiedName}")
}
if (!DDLUtils.isTablePartitioned(tab)) {
throw new AnalysisException("SHOW PARTITIONS is not allowed on a table that is not " +
s"partitioned: ${tab.qualifiedName}")
}
if (DDLUtils.isDatasourceTable(tab)) {
throw new AnalysisException("SHOW PARTITIONS is not allowed on a datasource table: " +
s"${tab.qualifiedName}")
}
/**
* Validate the partitioning spec by making sure all the referenced columns are
* defined as partitioning columns in table definition. An AnalysisException exception is
* thrown if the partitioning spec is invalid.
*/
if (spec.isDefined) {
val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
if (badColumns.nonEmpty) {
throw new AnalysisException(
s"Non-partitioning column(s) [${badColumns.mkString(", ")}] are " +
s"specified for SHOW PARTITIONS")
}
}
val partNames =
catalog.listPartitions(table, spec).map(p => getPartName(p.spec, tab.partitionColumnNames))
partNames.map { p => Row(p) }
}
}
}
@@ -51,6 +51,8 @@ case class CreateDataSourceTableCommand(
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String],
partitionColumns: Array[String],
bucketSpec: Option[BucketSpec],
ignoreIfExists: Boolean,
managedIfNoPath: Boolean)
extends RunnableCommand {
@@ -103,8 +105,8 @@ case class CreateDataSourceTableCommand(
sparkSession = sparkSession,
tableIdent = tableIdent,
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = Array.empty[String],
bucketSpec = None,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
provider = provider,
options = optionsWithPath,
isExternal = isExternal)