@@ -2446,7 +2446,18 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging

/**
* Type to keep track of table clauses:
* (partTransforms, partCols, bucketSpec, properties, options, location, comment, serde).
* - partition transforms
* - partition columns
* - bucketSpec
* - properties
* - options
* - location
* - comment
* - serde
*
* Note: Partition transforms are based on the existing table schema definition. They can be
* simple column names, or functions like `year(date_col)`. Partition columns are column names
* with data types, like `i INT`, which should be appended to the existing table schema.
*/
type TableClauses = (
Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
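
To make the note above concrete, here is a minimal sketch of the two PARTITIONED BY forms (table `t` and the column names are hypothetical; assumes this PR's merged CREATE TABLE grammar, so plain `PARTITIONED BY (part STRING)` parses without USING):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

object PartitionedByForms {
  def main(args: Array[String]): Unit = {
    // Transform-based partitioning: year(date_col) references an existing
    // schema column, so nothing new is appended to the table schema.
    println(CatalystSqlParser.parsePlan(
      "CREATE TABLE t (id BIGINT, date_col DATE) USING parquet " +
        "PARTITIONED BY (year(date_col))"))

    // Column-definition partitioning: `part STRING` carries a data type and
    // is appended to the table schema (see the "empty columns list" test
    // added in this PR).
    println(CatalystSqlParser.parsePlan(
      "CREATE TABLE t (id BIGINT) PARTITIONED BY (part STRING)"))
  }
}
```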
@@ -2802,8 +2813,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* [NULL DEFINED AS char]
* }}}
*/
def visitRowFormat(
ctx: RowFormatContext): SerdeInfo = withOrigin(ctx) {
def visitRowFormat(ctx: RowFormatContext): SerdeInfo = withOrigin(ctx) {
ctx match {
case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
@@ -2923,16 +2933,19 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
val location = visitLocationSpecList(ctx.locationSpec())
val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location)
val comment = visitCommentSpecList(ctx.commentSpec())

validateRowFormatFileFormat(
ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
val fileFormatSerdeInfo = ctx.createFileFormat.asScala.map(visitCreateFileFormat)
val rowFormatSerdeInfo = ctx.rowFormat.asScala.map(visitRowFormat)
val serdeInfo =
(fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((x, y) => x.merge(y))

val serdeInfo = getSerdeInfo(ctx.rowFormat.asScala, ctx.createFileFormat.asScala, ctx)
Author: make a method for it, so that we can reuse it in SparkSqlAstBuilder

(partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions, newLocation, comment,
serdeInfo)
serdeInfo)
}

protected def getSerdeInfo(
rowFormatCtx: Seq[RowFormatContext],
createFileFormatCtx: Seq[CreateFileFormatContext],
ctx: ParserRuleContext): Option[SerdeInfo] = {
validateRowFormatFileFormat(rowFormatCtx, createFileFormatCtx, ctx)
val rowFormatSerdeInfo = rowFormatCtx.map(visitRowFormat)
val fileFormatSerdeInfo = createFileFormatCtx.map(visitCreateFileFormat)
(fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption(_ merge _)
}

private def partitionExpressions(
@@ -2943,8 +2956,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
if (partCols.nonEmpty) {
val references = partTransforms.map(_.describe()).mkString(", ")
val columns = partCols
.map(field => s"${field.name} ${field.dataType.simpleString}")
.mkString(", ")
.map(field => s"${field.name} ${field.dataType.simpleString}")
.mkString(", ")
operationNotAllowed(
s"""PARTITION BY: Cannot mix partition expressions and partition columns:
|Expressions: $references
@@ -2966,12 +2979,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* Expected format:
* {{{
* CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
* USING table_provider
* [USING table_provider]
* create_table_clauses
* [[AS] select_statement];
*
* create_table_clauses (order insensitive):
* partition_clauses
* [PARTITIONED BY (partition_fields)]
* [OPTIONS table_property_list]
* [ROW FORMAT row_format]
* [STORED AS file_format]
@@ -2982,15 +2995,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* [LOCATION path]
* [COMMENT table_comment]
* [TBLPROPERTIES (property_name=property_value, ...)]
* partition_clauses:
* [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)] |
* [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
*
* partition_fields:
* col_name, transform(col_name), transform(constant, col_name), ... |
* col_name data_type [NOT NULL] [COMMENT col_comment], ...
* }}}
*/
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)

val columns = Option(ctx.colTypeList()).map(visitColTypeList)
val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
visitCreateTableClauses(ctx.createTableClauses())
@@ -2999,37 +3013,34 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
}

val schema = columns
.map(dataCols => StructType(dataCols ++ partCols))
.getOrElse(StructType(partCols))
if (temp) {
operationNotAllowed("CREATE TEMPORARY TABLE is not supported yet. " +
"Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
Owner: This should use the other error message, "CREATE TEMPORARY TABLE ... AS ..., use CREATE TEMPORARY VIEW instead".

As I noted on the similar case, "is not supported yet" is both redundant and misleading. I don't think that Spark intends to implement CREATE TEMPORARY TABLE. Even if it may be implemented, it has not been supported for years, so there is no value in implying that it will be supported.

Please update to use the simpler and clearer error message.
}

val partitioning = partitionExpressions(partTransforms, partCols, ctx)

Option(ctx.query).map(plan) match {
case Some(_) if temp =>
operationNotAllowed(
"CREATE TEMPORARY TABLE ... AS ..., use CREATE TEMPORARY VIEW instead",
ctx)

case Some(_) if columns.isDefined =>
case Some(_) if columns.nonEmpty =>
operationNotAllowed(
"Schema may not be specified in a Create Table As Select (CTAS) statement",
ctx)

case Some(_) if partCols.nonEmpty =>
// non-reference partition columns are not allowed because schema can't be specified
operationNotAllowed(
"Partition column types may not be specified in Create Table As Select (CTAS)",
ctx)
val errorMessage = "Create Partitioned Table As Select cannot specify data type for " +
"the partition columns of the target table."
operationNotAllowed(errorMessage, ctx)

case Some(query) =>
CreateTableAsSelectStatement(
table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists)

case None if temp =>
operationNotAllowed("CREATE TEMPORARY TABLE", ctx)

case _ =>
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val schema = StructType(columns ++ partCols)
CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists)
}
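
As a hedged illustration of the CTAS rules enforced above (assumes this PR's parser; `operationNotAllowed` surfaces as a `ParseException`; table and column names are hypothetical):

```scala
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

object CtasRuleChecks {
  private def expectError(sql: String): Unit =
    try {
      CatalystSqlParser.parsePlan(sql)
      println(s"unexpectedly parsed: $sql")
    } catch {
      case e: ParseException => println(s"rejected as expected: ${e.getMessage}")
    }

  def main(args: Array[String]): Unit = {
    // A schema may not be specified in CTAS.
    expectError("CREATE TABLE t (id BIGINT) USING parquet AS SELECT 1 AS id")
    // Typed partition columns may not be specified in CTAS either.
    expectError("CREATE TABLE t PARTITIONED BY (part STRING) AS SELECT 1 AS id")
  }
}
```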
@@ -3041,71 +3052,64 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
* Expected format:
* {{{
* [CREATE OR] REPLACE TABLE [db_name.]table_name
* USING table_provider
* [USING table_provider]
* replace_table_clauses
* [[AS] select_statement];
*
* replace_table_clauses (order insensitive):
* [OPTIONS table_property_list]
* [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)]
* [PARTITIONED BY (partition_fields)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
* ]
* [LOCATION path]
* [COMMENT table_comment]
* [TBLPROPERTIES (property_name=property_value, ...)]
*
* partition_fields:
* col_name, transform(col_name), transform(constant, col_name), ... |
* col_name data_type [NOT NULL] [COMMENT col_comment], ...
* }}}
*/
override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitReplaceTableHeader(ctx.replaceTableHeader)
if (temp) {
operationNotAllowed(
"CREATE OR REPLACE TEMPORARY TABLE ..., use CREATE TEMPORARY VIEW instead",
ctx)
}

if (external) {
operationNotAllowed("REPLACE EXTERNAL TABLE ...", ctx)
}

if (ifNotExists) {
operationNotAllowed("REPLACE ... IF NOT EXISTS, use CREATE IF NOT EXISTS instead", ctx)
}

assert(!temp && !ifNotExists && !external)
Author: visitReplaceTableHeader simply returns false for these 3 properties, so it's simpler to use an assert here.

Owner: I think it is bad practice for a method to assume it will get certain results from other methods. While using an assert handles correctness, if there is a change that violates the assertion, a user would get a nearly unusable error.

I think this change should be reverted so that the error messages are meaningful.
val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
visitCreateTableClauses(ctx.createTableClauses())
val columns = Option(ctx.colTypeList()).map(visitColTypeList)
val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)

if (provider.isDefined && serdeInfo.isDefined) {
operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
}

val schema = columns.map(dataCols => StructType(dataCols ++ partCols))
val partitioning = partitionExpressions(partTransforms, partCols, ctx)
val orCreate = ctx.replaceTableHeader().CREATE() != null

Option(ctx.query).map(plan) match {
case Some(_) if schema.isDefined =>
case Some(_) if columns.nonEmpty =>
operationNotAllowed(
"Schema may not be specified in a Replace Table As Select (RTAS) statement",
ctx)

case Some(_) if partCols.nonEmpty =>
operationNotAllowed(
"Partition column types may not be specified in Replace Table As Select (RTAS)",
ctx)
// non-reference partition columns are not allowed because schema can't be specified
val errorMessage = "Replace Partitioned Table As Select cannot specify data type for " +
"the partition columns of the target table."
operationNotAllowed(errorMessage, ctx)

case Some(query) =>
ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties,
provider, options, location, comment, writeOptions = Map.empty, serdeInfo,
orCreate = orCreate)

case _ =>
ReplaceTableStatement(table, schema.getOrElse(new StructType), partitioning,
Author (@cloud-fan, Nov 16, 2020): Previously, when the table columns list was not specified, we ignored the partition columns with data types. That was fine before the syntax merging, as there were no partition columns with data types in REPLACE TABLE. But now it's better to make it consistent with CREATE TABLE. I also added a test to check it: https://github.com/rdblue/spark/pull/8/files#diff-b9e91f767e5562861565b0ce78759af3bcb7fff405a81e928894641147db2ae4R293
bucketSpec, properties, provider, options, location, comment, serdeInfo,
orCreate = orCreate)
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val schema = StructType(columns ++ partCols)
ReplaceTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
options, location, comment, serdeInfo, orCreate = orCreate)
}
}

@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.ViewType
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, FunctionResource}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -63,11 +64,11 @@ case class SerdeInfo(
serdeProperties: Map[String, String] = Map.empty) {
// this uses assertions because validation is done in validateRowFormatFileFormat etc.
assert(storedAs.isEmpty || formatClasses.isEmpty,
s"Conflicting STORED AS $storedAs and INPUTFORMAT/OUTPUTFORMAT $formatClasses values")
Author: it's a bit weird to print a Scala Option directly.
"Cannot specify both STORED AS and INPUTFORMAT/OUTPUTFORMAT")

def describe: String = {
val serdeString = if (serde.isDefined || serdeProperties.nonEmpty) {
"ROW FORMAT" + serde.map(sd => s" SERDE $sd").getOrElse(" DELIMITED")
"ROW FORMAT " + serde.map(sd => s"SERDE $sd").getOrElse("DELIMITED")
} else {
""
}
@@ -76,7 +77,7 @@ case class SerdeInfo(
case SerdeInfo(Some(format), _, _, _) =>
s"STORED AS $format $serdeString"
case SerdeInfo(_, Some((inFormat, outFormat)), _, _) =>
s"INPUTFORMAT $inFormat OUTPUTFORMAT $outFormat $serdeString"
s"STORED AS INPUTFORMAT $inFormat OUTPUTFORMAT $outFormat $serdeString"
case _ =>
serdeString
}
@@ -85,7 +86,7 @@ case class SerdeInfo(
def merge(other: SerdeInfo): SerdeInfo = {
def getOnly[T](desc: String, left: Option[T], right: Option[T]): Option[T] = {
(left, right) match {
case (Some(l), Some(r)) if l != r =>
Author: otherwise the assert below is useless.
case (Some(l), Some(r)) =>
assert(l == r, s"Conflicting $desc values: $l != $r")
left
case (Some(_), _) =>
Expand All @@ -97,6 +98,7 @@ case class SerdeInfo(
}
}

SerdeInfo.checkSerdePropMerging(serdeProperties, other.serdeProperties)
SerdeInfo(
getOnly("STORED AS", storedAs, other.storedAs),
getOnly("INPUTFORMAT/OUTPUTFORMAT", formatClasses, other.formatClasses),
@@ -106,8 +108,18 @@ case class SerdeInfo(
}

object SerdeInfo {
val empty: SerdeInfo = {
SerdeInfo(None, None, None, Map.empty)
val empty: SerdeInfo = SerdeInfo(None, None, None, Map.empty)

def checkSerdePropMerging(
props1: Map[String, String], props2: Map[String, String]): Unit = {
if (props1.keySet.intersect(props2.keySet).nonEmpty) {
throw new UnsupportedOperationException(
s"""
|Cannot safely merge SERDEPROPERTIES:
|${props1.map { case (k, v) => s"$k=$v" }.mkString("{", ",", "}")}
|${props2.map { case (k, v) => s"$k=$v" }.mkString("{", ",", "}")}
|""".stripMargin)
}
}
}
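
A minimal sketch of the merge semantics above (assumes the four-field constructor `SerdeInfo(storedAs, formatClasses, serde, serdeProperties)` shown in this diff; `MySerde` is a hypothetical class name):

```scala
import org.apache.spark.sql.catalyst.plans.logical.SerdeInfo

object SerdeInfoMergeDemo {
  def main(args: Array[String]): Unit = {
    // STORED AS and ROW FORMAT SERDE fill different slots, so they merge cleanly.
    val stored = SerdeInfo(Some("parquet"), None, None, Map.empty)
    val rowFormat = SerdeInfo(None, None, Some("MySerde"), Map("k" -> "v"))
    println(stored.merge(rowFormat).describe)
    // => STORED AS parquet ROW FORMAT SERDE MySerde

    // Conflicting values for the same slot trip the assert in getOnly, and
    // overlapping SERDEPROPERTIES keys throw UnsupportedOperationException:
    // SerdeInfo(None, None, None, Map("k" -> "1"))
    //   .merge(SerdeInfo(None, None, None, Map("k" -> "2")))
  }
}
```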

@@ -325,12 +325,19 @@ private[sql] object CatalogV2Util {
options ++ // to make the transition to the "option." prefix easier, add both
options.map { case (key, value) => TableCatalog.OPTION_PREFIX + key -> value } ++
convertToProperties(serdeInfo) ++
(if (external) Map(TableCatalog.PROP_EXTERNAL -> "true") else Map.empty) ++
(if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++
provider.map(TableCatalog.PROP_PROVIDER -> _) ++
comment.map(TableCatalog.PROP_COMMENT -> _) ++
location.map(TableCatalog.PROP_LOCATION -> _)
}

/**
* Converts Hive Serde info to table properties. The mapped property keys are:
* - INPUTFORMAT/OUTPUTFORMAT: hive.input/output-format
* - STORED AS: hive.stored-as
* - ROW FORMAT SERDE: hive.serde
* - SERDEPROPERTIES: add "option." prefix
*/
private def convertToProperties(serdeInfo: Option[SerdeInfo]): Map[String, String] = {
serdeInfo match {
case Some(s) =>
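
Since the method body is collapsed here, a hedged sketch of the documented mapping (the property key strings are taken from the doc comment above; the real implementation may use different constants):

```scala
// Hypothetical standalone version of the mapping described in the doc comment.
def convertToPropertiesSketch(serdeInfo: Option[SerdeInfo]): Map[String, String] =
  serdeInfo match {
    case Some(s) =>
      s.storedAs.map(v => Map("hive.stored-as" -> v)).getOrElse(Map.empty) ++
        s.formatClasses.map { case (in, out) =>
          Map("hive.input-format" -> in, "hive.output-format" -> out)
        }.getOrElse(Map.empty) ++
        s.serde.map(v => Map("hive.serde" -> v)).getOrElse(Map.empty) ++
        s.serdeProperties.map { case (k, v) => s"option.$k" -> v }
    case None =>
      Map.empty
  }
```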
@@ -290,6 +290,25 @@ class DDLParserSuite extends AnalysisTest {
}
}

test("create/replace table - empty columns list") {
val createSql = "CREATE TABLE my_tab PARTITIONED BY (part string)"
val replaceSql = "REPLACE TABLE my_tab PARTITIONED BY (part string)"
val expectedTableSpec = TableSpec(
Seq("my_tab"),
Some(new StructType().add("part", StringType)),
Seq(IdentityTransform(FieldReference("part"))),
None,
Map.empty[String, String],
None,
Map.empty[String, String],
None,
None,
None)
Seq(createSql, replaceSql).foreach { sql =>
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}

test("create/replace table - using with partition column definitions") {
val createSql = "CREATE TABLE my_tab (id bigint) USING parquet PARTITIONED BY (part string)"
val replaceSql = "REPLACE TABLE my_tab (id bigint) USING parquet PARTITIONED BY (part string)"