Commit 119260f

address review comments
1 parent 2de1f03 commit 119260f

7 files changed (+251, -308 lines changed)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 64 additions & 60 deletions
@@ -2446,7 +2446,18 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 
   /**
    * Type to keep track of table clauses:
-   * (partTransforms, partCols, bucketSpec, properties, options, location, comment, serde).
+   * - partition transforms
+   * - partition columns
+   * - bucketSpec
+   * - properties
+   * - options
+   * - location
+   * - comment
+   * - serde
+   *
+   * Note: Partition transforms are based on the existing table schema definition. They can be
+   * simple column names, or functions like `year(date_col)`. Partition columns are column names
+   * with data types like `i INT`, which should be appended to the existing table schema.
    */
   type TableClauses = (
       Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
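
To make the distinction concrete, a minimal sketch with hypothetical table and column names (Spark SQL DDL quoted as Scala strings):

    // Partition transform over an existing column: lands in partTransforms;
    // no new column is added to the table schema.
    val transformDdl =
      "CREATE TABLE t1 (ts TIMESTAMP, v INT) USING parquet PARTITIONED BY (year(ts))"
    // Partition column with a data type: lands in partCols and is appended
    // to the table schema as `dt STRING`.
    val columnDdl =
      "CREATE TABLE t2 (v INT) USING parquet PARTITIONED BY (dt STRING)"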
@@ -2802,8 +2813,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    *   [NULL DEFINED AS char]
    * }}}
    */
-  def visitRowFormat(
-      ctx: RowFormatContext): SerdeInfo = withOrigin(ctx) {
+  def visitRowFormat(ctx: RowFormatContext): SerdeInfo = withOrigin(ctx) {
     ctx match {
       case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
       case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
@@ -2923,16 +2933,19 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     val location = visitLocationSpecList(ctx.locationSpec())
     val (cleanedOptions, newLocation) = cleanTableOptions(ctx, options, location)
     val comment = visitCommentSpecList(ctx.commentSpec())
-
-    validateRowFormatFileFormat(
-      ctx.rowFormat.asScala.toSeq, ctx.createFileFormat.asScala.toSeq, ctx)
-    val fileFormatSerdeInfo = ctx.createFileFormat.asScala.map(visitCreateFileFormat)
-    val rowFormatSerdeInfo = ctx.rowFormat.asScala.map(visitRowFormat)
-    val serdeInfo =
-      (fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((x, y) => x.merge(y))
-
+    val serdeInfo = getSerdeInfo(ctx.rowFormat.asScala, ctx.createFileFormat.asScala, ctx)
     (partTransforms, partCols, bucketSpec, cleanedProperties, cleanedOptions, newLocation, comment,
-      serdeInfo)
+      serdeInfo)
+  }
+
+  protected def getSerdeInfo(
+      rowFormatCtx: Seq[RowFormatContext],
+      createFileFormatCtx: Seq[CreateFileFormatContext],
+      ctx: ParserRuleContext): Option[SerdeInfo] = {
+    validateRowFormatFileFormat(rowFormatCtx, createFileFormatCtx, ctx)
+    val rowFormatSerdeInfo = rowFormatCtx.map(visitRowFormat)
+    val fileFormatSerdeInfo = createFileFormatCtx.map(visitCreateFileFormat)
+    (fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption(_ merge _)
   }
 
   private def partitionExpressions(
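
For reference, a hedged sketch of what the new helper does for a statement carrying both clauses (the serde class is a standard Hive one; the intermediate SerdeInfo shapes are an assumption based on the case class in statements.scala below):

    //   ... ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    //   ... STORED AS TEXTFILE
    // visitRowFormat        ~> a SerdeInfo carrying serde = Some("...LazySimpleSerDe")
    // visitCreateFileFormat ~> a SerdeInfo carrying storedAs = Some("TEXTFILE")
    // reduceLeftOption(_ merge _) then folds the two into a single Option[SerdeInfo]
    // holding both the storage format and the serde class.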
@@ -2943,8 +2956,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     if (partCols.nonEmpty) {
       val references = partTransforms.map(_.describe()).mkString(", ")
       val columns = partCols
-          .map(field => s"${field.name} ${field.dataType.simpleString}")
-          .mkString(", ")
+        .map(field => s"${field.name} ${field.dataType.simpleString}")
+        .mkString(", ")
       operationNotAllowed(
         s"""PARTITION BY: Cannot mix partition expressions and partition columns:
            |Expressions: $references
@@ -2966,12 +2979,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    * Expected format:
    * {{{
    *   CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
-   *   USING table_provider
+   *   [USING table_provider]
    *   create_table_clauses
    *   [[AS] select_statement];
    *
    *   create_table_clauses (order insensitive):
-   *     partition_clauses
+   *     [PARTITIONED BY (partition_fields)]
    *     [OPTIONS table_property_list]
    *     [ROW FORMAT row_format]
    *     [STORED AS file_format]
@@ -2982,15 +2995,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    *     [LOCATION path]
    *     [COMMENT table_comment]
    *     [TBLPROPERTIES (property_name=property_value, ...)]
-   *   partition_clauses:
-   *     [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)] |
-   *     [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
+   *
+   *   partition_fields:
+   *     col_name, transform(col_name), transform(constant, col_name), ... |
+   *     col_name data_type [NOT NULL] [COMMENT col_comment], ...
    * }}}
    */
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
 
-    val columns = Option(ctx.colTypeList()).map(visitColTypeList)
+    val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
     val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
     val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
       visitCreateTableClauses(ctx.createTableClauses())
@@ -2999,37 +3013,34 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
     }
 
-    val schema = columns
-        .map(dataCols => StructType(dataCols ++ partCols))
-        .getOrElse(StructType(partCols))
+    if (temp) {
+      operationNotAllowed("CREATE TEMPORARY TABLE is not supported yet. " +
+        "Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
+    }
+
     val partitioning = partitionExpressions(partTransforms, partCols, ctx)
 
     Option(ctx.query).map(plan) match {
-      case Some(_) if temp =>
-        operationNotAllowed(
-          "CREATE TEMPORARY TABLE ... AS ..., use CREATE TEMPORARY VIEW instead",
-          ctx)
-
-      case Some(_) if columns.isDefined =>
+      case Some(_) if columns.nonEmpty =>
         operationNotAllowed(
           "Schema may not be specified in a Create Table As Select (CTAS) statement",
           ctx)
 
       case Some(_) if partCols.nonEmpty =>
         // non-reference partition columns are not allowed because schema can't be specified
-        operationNotAllowed(
-          "Partition column types may not be specified in Create Table As Select (CTAS)",
-          ctx)
+        val errorMessage = "Create Partitioned Table As Select cannot specify data type for " +
+          "the partition columns of the target table."
+        operationNotAllowed(errorMessage, ctx)
 
       case Some(query) =>
         CreateTableAsSelectStatement(
           table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
           writeOptions = Map.empty, serdeInfo, external = external, ifNotExists = ifNotExists)
 
-      case None if temp =>
-        operationNotAllowed("CREATE TEMPORARY TABLE", ctx)
-
       case _ =>
+        // Note: table schema includes both the table columns list and the partition columns
+        // with data type.
+        val schema = StructType(columns ++ partCols)
         CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
           options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists)
     }
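
A few hypothetical statements to illustrate the rules encoded above:

    // CREATE TABLE t (id BIGINT) PARTITIONED BY (part STRING)
    //   => columns = [id BIGINT], partCols = [part STRING],
    //      schema = StructType(columns ++ partCols), i.e. (id BIGINT, part STRING)
    // CREATE TABLE t PARTITIONED BY (part STRING) AS SELECT ...
    //   => partCols is non-empty in a CTAS: rejected with the new error message
    // CREATE TABLE t PARTITIONED BY (part) AS SELECT ...
    //   => allowed: `part` is a partition transform referencing a query output column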
@@ -3041,71 +3052,64 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    * Expected format:
    * {{{
    *   [CREATE OR] REPLACE TABLE [db_name.]table_name
-   *   USING table_provider
+   *   [USING table_provider]
    *   replace_table_clauses
    *   [[AS] select_statement];
    *
    *   replace_table_clauses (order insensitive):
    *     [OPTIONS table_property_list]
-   *     [PARTITIONED BY (col_name, transform(col_name), transform(constant, col_name), ...)]
+   *     [PARTITIONED BY (partition_fields)]
    *     [CLUSTERED BY (col_name, col_name, ...)
    *       [SORTED BY (col_name [ASC|DESC], ...)]
    *       INTO num_buckets BUCKETS
    *     ]
    *     [LOCATION path]
    *     [COMMENT table_comment]
    *     [TBLPROPERTIES (property_name=property_value, ...)]
+   *
+   *   partition_fields:
+   *     col_name, transform(col_name), transform(constant, col_name), ... |
+   *     col_name data_type [NOT NULL] [COMMENT col_comment], ...
    * }}}
    */
   override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitReplaceTableHeader(ctx.replaceTableHeader)
-    if (temp) {
-      operationNotAllowed(
-        "CREATE OR REPLACE TEMPORARY TABLE ..., use CREATE TEMPORARY VIEW instead",
-        ctx)
-    }
-
-    if (external) {
-      operationNotAllowed("REPLACE EXTERNAL TABLE ...", ctx)
-    }
-
-    if (ifNotExists) {
-      operationNotAllowed("REPLACE ... IF NOT EXISTS, use CREATE IF NOT EXISTS instead", ctx)
-    }
-
+    assert(!temp && !ifNotExists && !external)
     val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
       visitCreateTableClauses(ctx.createTableClauses())
-    val columns = Option(ctx.colTypeList()).map(visitColTypeList)
+    val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
     val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
 
     if (provider.isDefined && serdeInfo.isDefined) {
      operationNotAllowed(s"CREATE TABLE ... USING ... ${serdeInfo.get.describe}", ctx)
     }
 
-    val schema = columns.map(dataCols => StructType(dataCols ++ partCols))
     val partitioning = partitionExpressions(partTransforms, partCols, ctx)
     val orCreate = ctx.replaceTableHeader().CREATE() != null
 
     Option(ctx.query).map(plan) match {
-      case Some(_) if schema.isDefined =>
+      case Some(_) if columns.nonEmpty =>
         operationNotAllowed(
           "Schema may not be specified in a Replace Table As Select (RTAS) statement",
           ctx)
 
       case Some(_) if partCols.nonEmpty =>
-        operationNotAllowed(
-          "Partition column types may not be specified in Replace Table As Select (RTAS)",
-          ctx)
+        // non-reference partition columns are not allowed because schema can't be specified
+        val errorMessage = "Replace Partitioned Table As Select cannot specify data type for " +
+          "the partition columns of the target table."
+        operationNotAllowed(errorMessage, ctx)
 
       case Some(query) =>
         ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties,
           provider, options, location, comment, writeOptions = Map.empty, serdeInfo,
           orCreate = orCreate)
 
       case _ =>
-        ReplaceTableStatement(table, schema.getOrElse(new StructType), partitioning,
-          bucketSpec, properties, provider, options, location, comment, serdeInfo,
-          orCreate = orCreate)
+        // Note: table schema includes both the table columns list and the partition columns
+        // with data type.
+        val schema = StructType(columns ++ partCols)
+        ReplaceTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
+          options, location, comment, serdeInfo, orCreate = orCreate)
     }
   }
 
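The REPLACE path mirrors the CTAS rules; a short sketch with hypothetical statements (the bare assert reads as an assumption that TEMPORARY, EXTERNAL and IF NOT EXISTS are already rejected before this visitor runs):

    // REPLACE TABLE t (id BIGINT) PARTITIONED BY (part STRING)
    //   => schema = StructType(columns ++ partCols), i.e. (id BIGINT, part STRING)
    // REPLACE TABLE t PARTITIONED BY (part STRING) AS SELECT ...
    //   => rejected: RTAS cannot specify data types for partition columns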
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 18 additions & 6 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.ViewType
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, FunctionResource}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -63,11 +64,11 @@ case class SerdeInfo(
     serdeProperties: Map[String, String] = Map.empty) {
   // this uses assertions because validation is done in validateRowFormatFileFormat etc.
   assert(storedAs.isEmpty || formatClasses.isEmpty,
-    s"Conflicting STORED AS $storedAs and INPUTFORMAT/OUTPUTFORMAT $formatClasses values")
+    "Cannot specify both STORED AS and INPUTFORMAT/OUTPUTFORMAT")
 
   def describe: String = {
     val serdeString = if (serde.isDefined || serdeProperties.nonEmpty) {
-      "ROW FORMAT" + serde.map(sd => s" SERDE $sd").getOrElse(" DELIMITED")
+      "ROW FORMAT " + serde.map(sd => s"SERDE $sd").getOrElse("DELIMITED")
     } else {
       ""
     }
@@ -76,7 +77,7 @@ case class SerdeInfo(
       case SerdeInfo(Some(format), _, _, _) =>
         s"STORED AS $format $serdeString"
       case SerdeInfo(_, Some((inFormat, outFormat)), _, _) =>
-        s"INPUTFORMAT $inFormat OUTPUTFORMAT $outFormat $serdeString"
+        s"STORED AS INPUTFORMAT $inFormat OUTPUTFORMAT $outFormat $serdeString"
       case _ =>
         serdeString
     }
@@ -85,7 +86,7 @@ case class SerdeInfo(
   def merge(other: SerdeInfo): SerdeInfo = {
     def getOnly[T](desc: String, left: Option[T], right: Option[T]): Option[T] = {
       (left, right) match {
-        case (Some(l), Some(r)) if l != r =>
+        case (Some(l), Some(r)) =>
           assert(l == r, s"Conflicting $desc values: $l != $r")
           left
         case (Some(_), _) =>
@@ -97,6 +98,7 @@ case class SerdeInfo(
       }
     }
 
+    SerdeInfo.checkSerdePropMerging(serdeProperties, other.serdeProperties)
     SerdeInfo(
       getOnly("STORED AS", storedAs, other.storedAs),
       getOnly("INPUTFORMAT/OUTPUTFORMAT", formatClasses, other.formatClasses),
@@ -106,8 +108,18 @@ case class SerdeInfo(
 }
 
 object SerdeInfo {
-  val empty: SerdeInfo = {
-    SerdeInfo(None, None, None, Map.empty)
+  val empty: SerdeInfo = SerdeInfo(None, None, None, Map.empty)
+
+  def checkSerdePropMerging(
+      props1: Map[String, String], props2: Map[String, String]): Unit = {
+    if (props1.keySet.intersect(props2.keySet).nonEmpty) {
+      throw new UnsupportedOperationException(
+        s"""
+           |Cannot safely merge SERDEPROPERTIES:
+           |${props1.map { case (k, v) => s"$k=$v" }.mkString("{", ",", "}")}
+           |${props2.map { case (k, v) => s"$k=$v" }.mkString("{", ",", "}")}
+           |""".stripMargin)
+    }
   }
 }
 
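A minimal sketch of the resulting merge semantics (illustrative values; field.delim is a common Hive serde property, and the union of non-overlapping serde properties is an assumption based on the overlap check above):

    // Disjoint fields combine:
    SerdeInfo(Some("textfile"), None, None, Map.empty)
      .merge(SerdeInfo(None, None, None, Map("field.delim" -> ",")))
    // => SerdeInfo(Some("textfile"), None, None, Map("field.delim" -> ","))
    // Equal values on both sides now pass the getOnly assert; conflicting values
    // still fail it. Overlapping SERDEPROPERTIES keys throw
    // UnsupportedOperationException via checkSerdePropMerging.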
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala

Lines changed: 8 additions & 1 deletion
@@ -325,12 +325,19 @@ private[sql] object CatalogV2Util {
     options ++ // to make the transition to the "option." prefix easier, add both
       options.map { case (key, value) => TableCatalog.OPTION_PREFIX + key -> value } ++
       convertToProperties(serdeInfo) ++
-      (if (external) Map(TableCatalog.PROP_EXTERNAL -> "true") else Map.empty) ++
+      (if (external) Some(TableCatalog.PROP_EXTERNAL -> "true") else None) ++
       provider.map(TableCatalog.PROP_PROVIDER -> _) ++
       comment.map(TableCatalog.PROP_COMMENT -> _) ++
       location.map(TableCatalog.PROP_LOCATION -> _)
   }
 
+  /**
+   * Converts Hive Serde info to table properties. The mapped property keys are:
+   *  - INPUTFORMAT/OUTPUTFORMAT: hive.input/output-format
+   *  - STORED AS: hive.stored-as
+   *  - ROW FORMAT SERDE: hive.serde
+   *  - SERDEPROPERTIES: add "option." prefix
+   */
   private def convertToProperties(serdeInfo: Option[SerdeInfo]): Map[String, String] = {
     serdeInfo match {
       case Some(s) =>
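
A hedged illustration of that mapping (the literal key strings follow the doc comment above rather than the actual constants, and com.example.MySerde is a made-up serde class):

    // SerdeInfo(Some("parquet"), None, Some("com.example.MySerde"),
    //           Map("escape.delim" -> "\\"))
    // would contribute properties along the lines of:
    //   "hive.stored-as"      -> "parquet"
    //   "hive.serde"          -> "com.example.MySerde"
    //   "option.escape.delim" -> "\\"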

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 19 additions & 0 deletions
@@ -290,6 +290,25 @@ class DDLParserSuite extends AnalysisTest {
     }
   }
 
+  test("create/replace table - empty columns list") {
+    val createSql = "CREATE TABLE my_tab PARTITIONED BY (part string)"
+    val replaceSql = "REPLACE TABLE my_tab PARTITIONED BY (part string)"
+    val expectedTableSpec = TableSpec(
+      Seq("my_tab"),
+      Some(new StructType().add("part", StringType)),
+      Seq(IdentityTransform(FieldReference("part"))),
+      None,
+      Map.empty[String, String],
+      None,
+      Map.empty[String, String],
+      None,
+      None,
+      None)
+    Seq(createSql, replaceSql).foreach { sql =>
+      testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
+    }
+  }
+
   test("create/replace table - using with partition column definitions") {
     val createSql = "CREATE TABLE my_tab (id bigint) USING parquet PARTITIONED BY (part string)"
     val replaceSql = "REPLACE TABLE my_tab (id bigint) USING parquet PARTITIONED BY (part string)"
