Skip to content

Commit 9776bac

Browse files
committed
Implement conversion to v1 plans.
1 parent 0522c93 commit 9776bac

File tree

5 files changed

+173
-49
lines changed

5 files changed

+173
-49
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2411,7 +2411,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
24112411

24122412
/**
24132413
* Type to keep track of table clauses:
2414-
* (partitioning, bucketSpec, properties, options, location, comment).
2414+
* (partTransforms, partCols, bucketSpec, properties, options, location, comment, serde).
24152415
*/
24162416
type TableClauses = (
24172417
Seq[Transform], Seq[StructField], Option[BucketSpec], Map[String, String],
@@ -2869,7 +2869,6 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
28692869
checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
28702870
checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
28712871
checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
2872-
checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
28732872
checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx)
28742873
checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
28752874
checkDuplicateClauses(ctx.commentSpec(), "COMMENT", ctx)
@@ -2952,9 +2951,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
29522951
*/
29532952
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
29542953
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
2955-
if (external) {
2956-
operationNotAllowed("CREATE EXTERNAL TABLE ...", ctx)
2957-
}
2954+
29582955
val columns = Option(ctx.colTypeList()).map(visitColTypeList)
29592956
val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
29602957
val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,18 @@ case class SerdeInfo(
6767
assert(storedAs.isEmpty || serde.isEmpty,
6868
s"Conflicting STORED AS $storedAs and SERDE $serde values")
6969

70+
/**
 * Describes the storage clauses held by this SerdeInfo as SQL-like text.
 *
 * The result is the format clause (`STORED AS ...`, or `INPUTFORMAT ... OUTPUTFORMAT ...`)
 * followed by the `SERDE ...` clause, for whichever of those are set. Clauses are joined by a
 * single space, so the result never has leading or trailing whitespace and can be embedded
 * directly in a message. Returns an empty string when none of the clauses are set.
 */
def describe: String = {
  // At most one format clause is rendered; STORED AS wins when the first field is set.
  val formatClause = this match {
    case SerdeInfo(Some(format), _, _, _) =>
      Some(s"STORED AS $format")
    case SerdeInfo(_, Some((inFormat, outFormat)), _, _) =>
      Some(s"INPUTFORMAT $inFormat OUTPUTFORMAT $outFormat")
    case _ =>
      None
  }
  val serdeClause = serde.map(sd => s"SERDE $sd")

  // Joining the defined clauses avoids the stray leading space a prefixed " SERDE ..." string
  // would produce when there is no format clause in front of it.
  Seq(formatClause, serdeClause).flatten.mkString(" ")
}
81+
7082
def merge(other: SerdeInfo): SerdeInfo = {
7183
def getOnly[T](desc: String, left: Option[T], right: Option[T]): Option[T] = {
7284
(left, right) match {
@@ -90,6 +102,12 @@ case class SerdeInfo(
90102
}
91103
}
92104

105+
object SerdeInfo {
  /** A SerdeInfo built from three `None` values and an empty map, i.e. with nothing specified. */
  val empty: SerdeInfo = SerdeInfo(None, None, None, Map.empty)
}
110+
93111
/**
94112
* A CREATE TABLE command, as parsed from SQL.
95113
*

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 142 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.spark.sql.connector.expressions.Transform
2727
import org.apache.spark.sql.execution.command._
2828
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable}
2929
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
30-
import org.apache.spark.sql.internal.SQLConf
30+
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
3131
import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
3232

3333
/**
@@ -268,47 +268,41 @@ class ResolveSessionCatalog(
268268
// session catalog and the table provider is not v2.
269269
case c @ CreateTableStatement(
270270
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
271-
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
272-
if (!isV2Provider(provider)) {
273-
if (!DDLUtils.isHiveTable(Some(provider))) {
271+
buildV1Table(tbl.asTableIdentifier, c) match {
272+
case Some(tableDesc) =>
273+
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
274+
CreateTable(tableDesc, mode, None)
275+
276+
case None =>
274277
assertNoCharTypeInSchema(c.tableSchema)
275-
}
276-
val tableDesc = buildCatalogTable(tbl.asTableIdentifier, c.tableSchema,
277-
c.partitioning, c.bucketSpec, c.properties, provider, c.options, c.location,
278-
c.comment, c.ifNotExists)
279-
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
280-
CreateTable(tableDesc, mode, None)
281-
} else {
282-
assertNoCharTypeInSchema(c.tableSchema)
283-
CreateV2Table(
284-
catalog.asTableCatalog,
285-
tbl.asIdentifier,
286-
c.tableSchema,
287-
// convert the bucket spec and add it as a transform
288-
c.partitioning ++ c.bucketSpec.map(_.asTransform),
289-
convertTableProperties(c),
290-
ignoreIfExists = c.ifNotExists)
278+
CreateV2Table(
279+
catalog.asTableCatalog,
280+
tbl.asIdentifier,
281+
c.tableSchema,
282+
// convert the bucket spec and add it as a transform
283+
c.partitioning ++ c.bucketSpec.map(_.asTransform),
284+
convertTableProperties(c),
285+
ignoreIfExists = c.ifNotExists)
291286
}
292287

293288
case c @ CreateTableAsSelectStatement(
294289
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
295-
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
296-
if (!isV2Provider(provider)) {
297-
val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
298-
c.partitioning, c.bucketSpec, c.properties, provider, c.options, c.location,
299-
c.comment, c.ifNotExists)
300-
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
301-
CreateTable(tableDesc, mode, Some(c.asSelect))
302-
} else {
303-
CreateTableAsSelect(
304-
catalog.asTableCatalog,
305-
tbl.asIdentifier,
306-
// convert the bucket spec and add it as a transform
307-
c.partitioning ++ c.bucketSpec.map(_.asTransform),
308-
c.asSelect,
309-
convertTableProperties(c),
310-
writeOptions = c.options,
311-
ignoreIfExists = c.ifNotExists)
290+
buildV1Table(tbl.asTableIdentifier, c) match {
291+
case Some(tableDesc) =>
292+
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
293+
CreateTable(tableDesc, mode, Some(c.asSelect))
294+
295+
case None =>
296+
assertNoCharTypeInSchema(c.schema)
297+
CreateTableAsSelect(
298+
catalog.asTableCatalog,
299+
tbl.asIdentifier,
300+
// convert the bucket spec and add it as a transform
301+
c.partitioning ++ c.bucketSpec.map(_.asTransform),
302+
c.asSelect,
303+
convertTableProperties(c),
304+
writeOptions = c.options,
305+
ignoreIfExists = c.ifNotExists)
312306
}
313307

314308
// v1 REFRESH TABLE supports temp view.
@@ -633,6 +627,63 @@ class ResolveSessionCatalog(
633627
case _ => throw new AnalysisException(s"$sql is only supported with temp views or v1 tables.")
634628
}
635629

630+
/**
 * Builds the v1 catalog table for a CREATE TABLE AS SELECT statement, if it resolves to one.
 *
 * Delegates to the field-wise `buildV1Table` overload, passing an empty `StructType` as the
 * schema and the remaining clauses taken from the statement. Returns whatever that overload
 * returns.
 */
private def buildV1Table(
    ident: TableIdentifier,
    c: CreateTableAsSelectStatement): Option[CatalogTable] = {
  buildV1Table(
    table = ident,
    schema = new StructType,
    partitioning = c.partitioning,
    bucketSpec = c.bucketSpec,
    properties = c.properties,
    provider = c.provider,
    serdeInfo = c.serde,
    options = c.options,
    location = c.location,
    comment = c.comment)
}
637+
638+
/**
 * Builds the v1 catalog table for a CREATE TABLE statement, if it resolves to one.
 *
 * Delegates to the field-wise `buildV1Table` overload, passing the statement's `tableSchema`
 * and its other clauses. Returns whatever that overload returns.
 */
private def buildV1Table(
    ident: TableIdentifier,
    c: CreateTableStatement): Option[CatalogTable] = {
  buildV1Table(
    table = ident,
    schema = c.tableSchema,
    partitioning = c.partitioning,
    bucketSpec = c.bucketSpec,
    properties = c.properties,
    provider = c.provider,
    serdeInfo = c.serde,
    options = c.options,
    location = c.location,
    comment = c.comment)
}
645+
646+
/**
 * Chooses how a create-table request maps onto a v1 [[CatalogTable]], based on which of the
 * USING provider and the serde clauses were given.
 *
 *  - provider and serde info both given: rejected with an [[AnalysisException]].
 *  - serde info only: a table built by `buildHiveCatalogTable`.
 *  - neither, and `conf.createHiveTableByDefaultEnabled` is set: a table built by
 *    `buildHiveCatalogTable` with `SerdeInfo.empty`.
 *  - provider only, and it is not a v2 provider: a table built by `buildCatalogTable`.
 *  - neither, and `conf.defaultDataSourceName` is not a v2 provider: a table built by
 *    `buildCatalogTable` using that default name.
 *  - anything else: `None`.
 *
 * @return the v1 table description, or `None` when none of the v1 cases apply
 */
private def buildV1Table(
    table: TableIdentifier,
    schema: StructType,
    partitioning: Seq[Transform],
    bucketSpec: Option[BucketSpec],
    properties: Map[String, String],
    provider: Option[String],
    serdeInfo: Option[SerdeInfo],
    options: Map[String, String],
    location: Option[String],
    comment: Option[String]): Option[CatalogTable] = {

  // The two builders share every argument except the serde info / provider name.
  def hiveTable(serde: SerdeInfo): CatalogTable =
    buildHiveCatalogTable(
      table, schema, partitioning, bucketSpec, properties, serde, options, location, comment)

  def dataSourceTable(providerName: String): CatalogTable =
    buildCatalogTable(
      table, schema, partitioning, bucketSpec, properties, providerName, options, location,
      comment)

  // Case order matters: the guarded (None, None) cases are tried top to bottom.
  (provider, serdeInfo) match {
    case (Some(providerName), Some(serde)) =>
      throw new AnalysisException(
        s"Cannot create table with both USING $providerName and ${serde.describe}")

    case (None, Some(serde)) =>
      Some(hiveTable(serde))

    case (None, None) if conf.createHiveTableByDefaultEnabled =>
      Some(hiveTable(SerdeInfo.empty))

    case (Some(providerName), None) if !isV2Provider(providerName) =>
      Some(dataSourceTable(providerName))

    case (None, None) if !isV2Provider(conf.defaultDataSourceName) =>
      Some(dataSourceTable(conf.defaultDataSourceName))

    case _ =>
      None
  }
}
686+
636687
private def buildCatalogTable(
637688
table: TableIdentifier,
638689
schema: StructType,
@@ -642,8 +693,9 @@ class ResolveSessionCatalog(
642693
provider: String,
643694
options: Map[String, String],
644695
location: Option[String],
645-
comment: Option[String],
646-
ifNotExists: Boolean): CatalogTable = {
696+
comment: Option[String]): CatalogTable = {
697+
assertNoCharTypeInSchema(schema)
698+
647699
val storage = CatalogStorageFormat.empty.copy(
648700
locationUri = location.map(CatalogUtils.stringToURI),
649701
properties = options)
@@ -666,6 +718,57 @@ class ResolveSessionCatalog(
666718
comment = comment)
667719
}
668720

721+
/**
 * Builds a [[CatalogTable]] whose storage starts from `HiveSerDe.getDefaultStorage(conf)` and
 * is then adjusted by the given serde info.
 *
 * Storage resolution:
 *  - the location, when given, becomes the storage location URI;
 *  - storage properties are `options` overlaid with `serdeInfo.serdeProperties`;
 *  - an explicit `serdeInfo.serde` wins; otherwise the serde of the STORED AS format is used
 *    when there is one, and the default storage's serde is kept as the last resort;
 *  - `STORED AS format` takes input/output formats from `HiveSerDe.sourceToSerDe(format)`;
 *  - explicit format classes are used as the input/output formats directly.
 *
 * The table is EXTERNAL when a location is given and MANAGED otherwise.
 *
 * @throws AnalysisException if `STORED AS` names a format that `HiveSerDe.sourceToSerDe`
 *                           does not recognize
 */
private def buildHiveCatalogTable(
    table: TableIdentifier,
    schema: StructType,
    partitioning: Seq[Transform],
    bucketSpec: Option[BucketSpec],
    properties: Map[String, String],
    serdeInfo: SerdeInfo,
    options: Map[String, String],
    location: Option[String],
    comment: Option[String]): CatalogTable = {
  val defaultStorage = HiveSerDe.getDefaultStorage(conf)

  // Only replace the default serde when one was actually specified; assigning
  // serdeInfo.serde unconditionally would wipe the default with None.
  val baseStorage = defaultStorage.copy(
    locationUri = location.map(CatalogUtils.stringToURI),
    serde = serdeInfo.serde.orElse(defaultStorage.serde),
    properties = options ++ serdeInfo.serdeProperties)

  val storage = (serdeInfo.storedAs, serdeInfo.formatClasses) match {
    case (Some(format), None) =>
      HiveSerDe.sourceToSerDe(format) match {
        case Some(hiveSerDe) =>
          baseStorage.copy(
            inputFormat = hiveSerDe.inputFormat,
            outputFormat = hiveSerDe.outputFormat,
            // Keep the serde consistent with the chosen file format unless the user set one.
            serde = serdeInfo.serde.orElse(hiveSerDe.serde).orElse(defaultStorage.serde))
        case None =>
          // Falling back to the default storage here would silently create a table in a
          // different format than the one requested.
          throw new AnalysisException(
            s"Unrecognized file format in STORED AS clause: '$format'")
      }

    case (None, Some((inFormat, outFormat))) =>
      baseStorage.copy(
        inputFormat = Some(inFormat),
        outputFormat = Some(outFormat))

    case _ =>
      baseStorage
  }

  val tableType =
    if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED

  CatalogTable(
    identifier = table,
    tableType = tableType,
    storage = storage,
    schema = schema,
    partitionColumnNames = partitioning.asPartitionColumns,
    bucketSpec = bucketSpec,
    properties = properties,
    comment = comment)
}
771+
669772
object SessionCatalogAndTable {
670773
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
671774
case SessionCatalogAndIdentifier(catalog, ident) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
8686
val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
8787
val tableProperties = properties.asScala
8888
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
89-
val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap)
89+
val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
9090
.copy(locationUri = location.map(CatalogUtils.stringToURI))
9191
val tableType = if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
9292

@@ -112,6 +112,12 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
112112
loadTable(ident)
113113
}
114114

115+
/**
 * Extracts the entries of `properties` whose keys start with `TableCatalog.OPTION_PREFIX` and
 * returns them with that prefix removed from each key. Entries without the prefix are dropped.
 */
private def toOptions(properties: Map[String, String]): Map[String, String] = {
  val prefix = TableCatalog.OPTION_PREFIX
  // stripPrefix matches the prefix literally, whereas String.replaceFirst would interpret it
  // as a regular expression. collect also builds a strict Map in a single pass instead of
  // going through the lazy view returned by filterKeys.
  properties.collect {
    case (key, value) if key.startsWith(prefix) => key.stripPrefix(prefix) -> value
  }
}
120+
115121
override def alterTable(
116122
ident: Identifier,
117123
changes: TableChange*): Table = {

sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ class PlanResolutionSuite extends AnalysisTest {
428428
val expectedProperties = Map(
429429
"p1" -> "v1",
430430
"p2" -> "v2",
431-
"other" -> "20",
431+
"option.other" -> "20",
432432
"provider" -> "parquet",
433433
"location" -> "s3://bucket/path/to/data",
434434
"comment" -> "table comment")
@@ -467,7 +467,7 @@ class PlanResolutionSuite extends AnalysisTest {
467467
val expectedProperties = Map(
468468
"p1" -> "v1",
469469
"p2" -> "v2",
470-
"other" -> "20",
470+
"option.other" -> "20",
471471
"provider" -> "parquet",
472472
"location" -> "s3://bucket/path/to/data",
473473
"comment" -> "table comment")
@@ -542,7 +542,7 @@ class PlanResolutionSuite extends AnalysisTest {
542542
val expectedProperties = Map(
543543
"p1" -> "v1",
544544
"p2" -> "v2",
545-
"other" -> "20",
545+
"option.other" -> "20",
546546
"provider" -> "parquet",
547547
"location" -> "s3://bucket/path/to/data",
548548
"comment" -> "table comment")
@@ -576,7 +576,7 @@ class PlanResolutionSuite extends AnalysisTest {
576576
val expectedProperties = Map(
577577
"p1" -> "v1",
578578
"p2" -> "v2",
579-
"other" -> "20",
579+
"option.other" -> "20",
580580
"provider" -> "parquet",
581581
"location" -> "s3://bucket/path/to/data",
582582
"comment" -> "table comment")

0 commit comments

Comments
 (0)