From 0a42c8950b560f19218cdfc2ea1ab4193dcb8d61 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 19 Sep 2021 15:36:37 -0700 Subject: [PATCH 01/16] Migrate CreateTableStatement to v2 command framework --- .../sql/catalyst/parser/AstBuilder.scala | 10 ++-- .../catalyst/plans/logical/statements.scala | 19 ------- .../catalyst/plans/logical/v2Commands.scala | 27 ++++++++-- .../sql/connector/catalog/CatalogV2Util.scala | 12 +++-- .../sql/catalyst/parser/DDLParserSuite.scala | 10 ++-- .../analysis/ResolveSessionCatalog.scala | 20 +++----- .../sql/streaming/DataStreamWriter.scala | 15 +++--- .../command/PlanResolutionSuite.scala | 51 ++++++++++++------- 8 files changed, 92 insertions(+), 72 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a78f08ac8f4c..6661715db71a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3414,7 +3414,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Create a table, returning a [[CreateTableStatement]] logical plan. + * Create a table, returning a [[CreateTable]] or [[CreateV2Table]] logical plan. * * Expected format: * {{{ @@ -3482,8 +3482,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg // Note: table schema includes both the table columns list and the partition columns // with data type. val schema = StructType(columns ++ partCols) - CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider, - options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists) + CreateV2Table( + UnresolvedDBObjectName( + table, + isNamespace = false), + schema, partitioning, bucketSpec, properties, options, serdeInfo, location, comment, + provider, external, ignoreIfExists = ifNotExists) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index ccc4e190ba51..a6ed304e7155 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -123,25 +123,6 @@ object SerdeInfo { } } -/** - * A CREATE TABLE command, as parsed from SQL. - * - * This is a metadata-only command and is not used to write data to the created table. - */ -case class CreateTableStatement( - tableName: Seq[String], - tableSchema: StructType, - partitioning: Seq[Transform], - bucketSpec: Option[BucketSpec], - properties: Map[String, String], - provider: Option[String], - options: Map[String, String], - location: Option[String], - comment: Option[String], - serde: Option[SerdeInfo], - external: Boolean, - ifNotExists: Boolean) extends LeafParsedStatement - /** * A CREATE TABLE AS SELECT command, as parsed from SQL. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 4ed5d87aaf10..f4f4cc15e1c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, UnresolvedException} +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.FunctionResource import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable} @@ -190,17 +191,35 @@ trait V2CreateTablePlan extends LogicalPlan { def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan } +trait V2CreateTablePlanX extends LogicalPlan { + def name: LogicalPlan + def partitioning: Seq[Transform] + def tableSchema: StructType + + /** + * Creates a copy of this node with the new partitioning transforms. This method is used to + * rewrite the partition transforms normalized according to the table schema. + */ + def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlanX +} + /** * Create a new table with a v2 catalog. */ case class CreateV2Table( - catalog: TableCatalog, - tableName: Identifier, + name: LogicalPlan, tableSchema: StructType, partitioning: Seq[Transform], + bucketSpec: Option[BucketSpec], properties: Map[String, String], - ignoreIfExists: Boolean) extends LeafCommand with V2CreateTablePlan { - override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = { + options: Map[String, String], + serdeInfo: Option[SerdeInfo], + location: Option[String], + comment: Option[String], + provider: Option[String], + external: Boolean, + ignoreIfExists: Boolean) extends LeafCommand with V2CreateTablePlanX { + override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlanX = { this.copy(partitioning = rewritten) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 7c11463fc7c5..96363ca4d671 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -22,6 +22,8 @@ import java.util.Collections import scala.collection.JavaConverters._ +import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo} import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo} import org.apache.spark.sql.connector.catalog.TableChange._ @@ -305,10 +307,10 @@ private[sql] object CatalogV2Util { 
catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) } - def convertTableProperties(c: CreateTableStatement): Map[String, String] = { - convertTableProperties( - c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external) - } +// def convertTableProperties(c: CreateTableStatement): Map[String, String] = { +// convertTableProperties( +// c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external) +// } def convertTableProperties(c: CreateTableAsSelectStatement): Map[String, String] = { convertTableProperties( @@ -323,7 +325,7 @@ private[sql] object CatalogV2Util { convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider) } - private def convertTableProperties( + def convertTableProperties( properties: Map[String, String], options: Map[String, String], serdeInfo: Option[SerdeInfo], diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 70227bb0554f..e77d4ae985a7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -717,8 +717,8 @@ class DDLParserSuite extends AnalysisTest { val parsedPlan = parsePlan(sqlStatement) val newTableToken = sqlStatement.split(" ")(0).trim.toUpperCase(Locale.ROOT) parsedPlan match { - case create: CreateTableStatement if newTableToken == "CREATE" => - assert(create.ifNotExists == expectedIfNotExists) + case create: CreateV2Table if newTableToken == "CREATE" => + assert(create.ignoreIfExists == expectedIfNotExists) case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" => assert(ctas.ifNotExists == expectedIfNotExists) case replace: ReplaceTableStatement if newTableToken == "REPLACE" => @@ -2285,9 +2285,9 @@ class DDLParserSuite extends AnalysisTest { private object TableSpec { def apply(plan: LogicalPlan): TableSpec = { plan match { - case create: CreateTableStatement => + case create: CreateV2Table => TableSpec( - create.tableName, + create.name.asInstanceOf[UnresolvedDBObjectName].nameParts, Some(create.tableSchema), create.partitioning, create.bucketSpec, @@ -2296,7 +2296,7 @@ class DDLParserSuite extends AnalysisTest { create.options, create.location, create.comment, - create.serde, + create.serdeInfo, create.external) case replace: ReplaceTableStatement => TableSpec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 5362b6bf6974..88f951ff1e9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -143,25 +143,21 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the // session catalog and the table provider is not v2. 
- case c @ CreateTableStatement( - SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) => + case c@CreateV2Table(UnresolvedDBObjectName( + SessionCatalogAndTable(catalog, name), _), _, _, _, _, _, _, _, _, _, _, _) => val (storageFormat, provider) = getStorageFormatAndProvider( - c.provider, c.options, c.location, c.serde, ctas = false) + c.provider, c.options, c.location, c.serdeInfo, ctas = false) if (!isV2Provider(provider)) { - val tableDesc = buildCatalogTable(tbl.asTableIdentifier, c.tableSchema, + val tableDesc = buildCatalogTable(name.asTableIdentifier, c.tableSchema, c.partitioning, c.bucketSpec, c.properties, provider, c.location, c.comment, storageFormat, c.external) - val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTable(tableDesc, mode, None) } else { CreateV2Table( - catalog.asTableCatalog, - tbl.asIdentifier, - c.tableSchema, - // convert the bucket spec and add it as a transform - c.partitioning ++ c.bucketSpec.map(_.asTransform), - convertTableProperties(c), - ignoreIfExists = c.ifNotExists) + ResolvedDBObjectName(catalog, name), + c.tableSchema, c.partitioning, c.bucketSpec, c.properties, c.options, c.serdeInfo, + c.location, c.comment, c.provider, c.external, c.ignoreIfExists) } case c @ CreateTableAsSelectStatement( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 10ce9d3aaf01..5f2e23efe8b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -27,8 +27,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.annotation.Evolving import org.apache.spark.api.java.function.VoidFunction2 import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.plans.logical.CreateTableStatement +import org.apache.spark.sql.catalyst.plans.logical.CreateV2Table import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback} @@ -288,19 +289,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * Note, currently the new table creation by this API doesn't fully cover the V2 table. 
* TODO (SPARK-33638): Full support of v2 table creation */ - val cmd = CreateTableStatement( - originalMultipartIdentifier, + val cmd = CreateV2Table( + UnresolvedDBObjectName( + originalMultipartIdentifier, + isNamespace = true), df.schema.asNullable, partitioningColumns.getOrElse(Nil).asTransforms.toSeq, None, Map.empty[String, String], - Some(source), Map.empty[String, String], - extraOptions.get("path"), None, + extraOptions.get("path"), None, + Some(source), external = false, - ifNotExists = false) + ignoreIfExists = false) Dataset.ofRows(df.sparkSession, cmd) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 85ba14fc7a44..4c049a008507 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -26,15 +26,16 @@ import org.mockito.invocation.InvocationOnMock import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable} +import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedDBObjectName, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} +import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.FakeV2Provider import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, V1Table} +import org.apache.spark.sql.connector.catalog.CatalogV2Util.convertTableProperties import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources.CreateTable import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation @@ -479,17 +480,22 @@ class PlanResolutionSuite extends AnalysisTest { "location" -> "s3://bucket/path/to/data", "comment" -> "table comment", "other" -> "20") + import org.apache.spark.sql.connector.catalog.CatalogV2Util.convertTableProperties parseAndResolve(sql) match { case create: CreateV2Table => - assert(create.catalog.name == "testcat") - assert(create.tableName == Identifier.of(Array("mydb"), "table_name")) + assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") + assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == + "mydb.table_name") assert(create.tableSchema == new StructType() .add("id", LongType) .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) - assert(create.properties == expectedProperties) + val props = convertTableProperties( + create.properties, create.options, create.serdeInfo, create.location, create.comment, + create.provider, create.external) + assert(props == expectedProperties) assert(create.ignoreIfExists) case other => @@ -522,14 +528,18 @@ class PlanResolutionSuite extends AnalysisTest { parseAndResolve(sql, withDefault = true) match { case create: CreateV2Table => - assert(create.catalog.name == "testcat") - assert(create.tableName == Identifier.of(Array("mydb"), "table_name")) + assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") + assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == + "mydb.table_name") assert(create.tableSchema == new StructType() .add("id", LongType) .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) - assert(create.properties == expectedProperties) + val props = convertTableProperties( + create.properties, create.options, create.serdeInfo, create.location, create.comment, + create.provider, create.external) + assert(props == expectedProperties) assert(create.ignoreIfExists) case other => @@ -560,14 +570,19 @@ class PlanResolutionSuite extends AnalysisTest { parseAndResolve(sql) match { case create: CreateV2Table => - assert(create.catalog.name == CatalogManager.SESSION_CATALOG_NAME) - assert(create.tableName == Identifier.of(Array("mydb"), "page_view")) + assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == + CatalogManager.SESSION_CATALOG_NAME) + assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == + "mydb.page_view") assert(create.tableSchema == new StructType() .add("id", LongType) .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) - assert(create.properties == expectedProperties) + val props = convertTableProperties( + create.properties, create.options, create.serdeInfo, create.location, create.comment, + create.provider, create.external) + assert(props == expectedProperties) assert(create.ignoreIfExists) case other => @@ -2261,13 +2276,13 @@ class PlanResolutionSuite extends AnalysisTest { assert(e2.getMessage.contains("Operation not allowed")) } - test("create table - properties") { - val query = "CREATE TABLE my_table (id int, name string) TBLPROPERTIES ('k1'='v1', 'k2'='v2')" - parsePlan(query) match { - case state: CreateTableStatement => - assert(state.properties == Map("k1" -> "v1", "k2" -> "v2")) - } - } +// test("create table - 
properties") { +// val query = "CREATE TABLE my_table (id int, name string) TBLPROPERTIES ('k1'='v1', 'k2'='v2')" +// parsePlan(query) match { +// case state: CreateTableStatement => +// assert(state.properties == Map("k1" -> "v1", "k2" -> "v2")) +// } +// } test("create table(hive) - everything!") { val query = From 0922184bbe65017dc323ae76a4360b0e66ecc902 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 19 Sep 2021 15:41:19 -0700 Subject: [PATCH 02/16] remove unused code --- .../spark/sql/connector/catalog/CatalogV2Util.scala | 5 ----- .../spark/sql/execution/command/PlanResolutionSuite.scala | 8 -------- 2 files changed, 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 96363ca4d671..fbaf7c226feb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -307,11 +307,6 @@ private[sql] object CatalogV2Util { catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) } -// def convertTableProperties(c: CreateTableStatement): Map[String, String] = { -// convertTableProperties( -// c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external) -// } - def convertTableProperties(c: CreateTableAsSelectStatement): Map[String, String] = { convertTableProperties( c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 4c049a008507..0925975a5093 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -2276,14 +2276,6 @@ class PlanResolutionSuite extends AnalysisTest { assert(e2.getMessage.contains("Operation not allowed")) } -// test("create table - properties") { -// val query = "CREATE TABLE my_table (id int, name string) TBLPROPERTIES ('k1'='v1', 'k2'='v2')" -// parsePlan(query) match { -// case state: CreateTableStatement => -// assert(state.properties == Map("k1" -> "v1", "k2" -> "v2")) -// } -// } - test("create table(hive) - everything!") { val query = """ From 41ad9b60d35e90a34cc022b278dc45b3cc953ceb Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 19 Sep 2021 22:27:18 -0700 Subject: [PATCH 03/16] fix test failure --- .../sql/catalyst/analysis/CheckAnalysis.scala | 17 +++++++ .../analysis/ResolveSessionCatalog.scala | 4 +- .../sql/execution/datasources/rules.scala | 46 +++++++++++++++++++ 3 files changed, 65 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 5bf37a2944cb..7e758ad422a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -466,6 +466,23 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType)) + case create: V2CreateTablePlanX => + val references = 
create.partitioning.flatMap(_.references).toSet + val badReferences = references.map(_.fieldNames).flatMap { column => + create.tableSchema.findNestedField(column) match { + case Some(_) => + None + case _ => + Some(s"${column.quoted} is missing or is in a map or array") + } + } + + if (badReferences.nonEmpty) { + failAnalysis(s"Invalid partitioning: ${badReferences.mkString(", ")}") + } + + create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType)) + case write: V2WriteCommand if write.resolved => write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 88f951ff1e9e..289b0692834a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -156,8 +156,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) } else { CreateV2Table( ResolvedDBObjectName(catalog, name), - c.tableSchema, c.partitioning, c.bucketSpec, c.properties, c.options, c.serdeInfo, - c.location, c.comment, c.provider, c.external, c.ignoreIfExists) + c.tableSchema, c.partitioning ++ c.bucketSpec.map(_.asTransform), None, c.properties, + c.options, c.serdeInfo, c.location, c.comment, c.provider, c.external, c.ignoreIfExists) } case c @ CreateTableAsSelectStatement( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 0e8efb629706..1cb11a33c3ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.DDLUtils @@ -263,6 +264,51 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi create.withPartitioning(normalizedPartitions) } + + case create: V2CreateTablePlanX if create.childrenResolved => + val schema = create.tableSchema + val partitioning = create.partitioning + val name = create.name.asInstanceOf[ResolvedDBObjectName].nameParts + val identifier = if (name.length == 2) { + Identifier.of(Array(name(0)), name(1)) + } else { + Identifier.of(Array.empty, name(0)) + } + + val isCaseSensitive = conf.caseSensitiveAnalysis + // Check that columns are not duplicated in the schema + val flattenedSchema = SchemaUtils.explodeNestedFieldNames(schema) + SchemaUtils.checkColumnNameDuplication( + flattenedSchema, + s"in the table definition of $identifier", + isCaseSensitive) + + // Check that columns are not duplicated in the partitioning statement + SchemaUtils.checkTransformDuplication( + partitioning, "in the partitioning", isCaseSensitive) + + if (schema.isEmpty) { + if (partitioning.nonEmpty) { + throw 
QueryCompilationErrors.specifyPartitionNotAllowedWhenTableSchemaNotDefinedError() + } + + create + } else { + // Resolve and normalize partition columns as necessary + val resolver = conf.resolver + val normalizedPartitions = partitioning.map { + case transform: RewritableTransform => + val rewritten = transform.references().map { ref => + // Throws an exception if the reference cannot be resolved + val position = SchemaUtils.findColumnPosition(ref.fieldNames(), schema, resolver) + FieldReference(SchemaUtils.getColumnName(position, schema)) + } + transform.withReferences(rewritten) + case other => other + } + + create.withPartitioning(normalizedPartitions) + } } private def fallBackV2ToV1(cls: Class[_]): Class[_] = cls.newInstance match { From e3377a3a2f73cbc274b3d30dfff339a7fdd1f04e Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 27 Sep 2021 10:42:47 -0700 Subject: [PATCH 04/16] fix style --- .../spark/sql/catalyst/analysis/ResolveSessionCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 289b0692834a..aef41f8a0e3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -143,7 +143,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the // session catalog and the table provider is not v2. - case c@CreateV2Table(UnresolvedDBObjectName( + case c @ CreateV2Table(UnresolvedDBObjectName( SessionCatalogAndTable(catalog, name), _), _, _, _, _, _, _, _, _, _, _, _) => val (storageFormat, provider) = getStorageFormatAndProvider( c.provider, c.options, c.location, c.serdeInfo, ctas = false) From 95ef4bfcb04434c0f14072aea511aab57378ec70 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 27 Sep 2021 20:03:16 -0700 Subject: [PATCH 05/16] rebase --- .../execution/datasources/v2/DataSourceV2Strategy.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 026ff63608bb..b7e67ff3db3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -165,9 +165,13 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil - case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) => + case CreateV2Table(ResolvedDBObjectName(catalog, ident), schema, parts, bucketSpec, + properties, options, serde, location, comment, provider, external, ifNotExists) => + val props = CatalogV2Util.convertTableProperties( + properties, options, serde, location, comment, provider, external) val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) - CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil + CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, + parts, propsWithOwner, ifNotExists) :: Nil case 
CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) => val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) From bb8cf8250a0626025841b050b97bee09756a1746 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 6 Oct 2021 10:48:55 -0700 Subject: [PATCH 06/16] use CatalogAndIdentifier to resolve table catalog --- .../apache/spark/sql/catalyst/plans/logical/v2Commands.scala | 5 ++++- .../spark/sql/catalyst/analysis/ResolveSessionCatalog.scala | 5 ++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index f4f4cc15e1c7..748edec9b368 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -218,7 +218,10 @@ case class CreateV2Table( comment: Option[String], provider: Option[String], external: Boolean, - ignoreIfExists: Boolean) extends LeafCommand with V2CreateTablePlanX { + ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlanX { + override def child: LogicalPlan = name + override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlanX = + copy(name = newChild) override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlanX = { this.copy(partitioning = rewritten) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index aef41f8a0e3b..9f03e0da7816 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -143,11 +143,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the // session catalog and the table provider is not v2. 
- case c @ CreateV2Table(UnresolvedDBObjectName( - SessionCatalogAndTable(catalog, name), _), _, _, _, _, _, _, _, _, _, _, _) => + case c @ CreateV2Table(ResolvedDBObjectName(catalog, name), _, _, _, _, _, _, _, _, _, _, _) => val (storageFormat, provider) = getStorageFormatAndProvider( c.provider, c.options, c.location, c.serdeInfo, ctas = false) - if (!isV2Provider(provider)) { + if (isSessionCatalog(catalog) && !isV2Provider(provider)) { val tableDesc = buildCatalogTable(name.asTableIdentifier, c.tableSchema, c.partitioning, c.bucketSpec, c.properties, provider, c.location, c.comment, storageFormat, c.external) From 9f59ece9aad792ff16c33f30b7c0bab1b13298bb Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Thu, 14 Oct 2021 16:34:52 -0700 Subject: [PATCH 07/16] rebase --- .../spark/sql/catalyst/analysis/ResolveCatalogs.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index efc1ab2cd0e1..83d7b932a8bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -37,17 +37,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) => ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name()) - case c @ CreateTableStatement( - NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) => - CreateV2Table( - catalog.asTableCatalog, - tbl.asIdentifier, - c.tableSchema, - // convert the bucket spec and add it as a transform - c.partitioning ++ c.bucketSpec.map(_.asTransform), - convertTableProperties(c), - ignoreIfExists = c.ifNotExists) - case c @ CreateTableAsSelectStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) => CreateTableAsSelect( From 6114516b834a217d7557a2ceb9d5d6c3ac918cd2 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 21 Nov 2021 23:43:03 -0800 Subject: [PATCH 08/16] add case class TableProperties --- .../sql/catalyst/parser/AstBuilder.scala | 5 +-- .../catalyst/plans/logical/v2Commands.scala | 17 +++++---- .../sql/catalyst/parser/DDLParserSuite.scala | 14 ++++---- .../analysis/ResolveSessionCatalog.scala | 19 ++++++---- .../datasources/v2/DataSourceV2Strategy.scala | 8 +++-- .../sql/streaming/DataStreamWriter.scala | 20 ++++++----- .../command/PlanResolutionSuite.scala | 35 +++++++++++-------- 7 files changed, 68 insertions(+), 50 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 6661715db71a..7cfb7b8c78be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3481,13 +3481,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg case _ => // Note: table schema includes both the table columns list and the partition columns // with data type. 
+ val tableProperties = TableProperties(properties, provider, options, location, comment, + serdeInfo, external) val schema = StructType(columns ++ partCols) CreateV2Table( UnresolvedDBObjectName( table, isNamespace = false), - schema, partitioning, bucketSpec, properties, options, serdeInfo, location, comment, - provider, external, ignoreIfExists = ifNotExists) + schema, partitioning, bucketSpec, tableProperties, ignoreIfExists = ifNotExists) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 748edec9b368..d0ca95e023ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -211,13 +211,7 @@ case class CreateV2Table( tableSchema: StructType, partitioning: Seq[Transform], bucketSpec: Option[BucketSpec], - properties: Map[String, String], - options: Map[String, String], - serdeInfo: Option[SerdeInfo], - location: Option[String], - comment: Option[String], - provider: Option[String], - external: Boolean, + tableProperties: TableProperties, ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlanX { override def child: LogicalPlan = name override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlanX = @@ -1112,3 +1106,12 @@ case class DropIndex( override protected def withNewChildInternal(newChild: LogicalPlan): DropIndex = copy(table = newChild) } + +case class TableProperties( + properties: Map[String, String], + provider: Option[String], + options: Map[String, String], + location: Option[String], + comment: Option[String], + serde: Option[SerdeInfo], + external: Boolean) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index e77d4ae985a7..e2d665eb0efc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2291,13 +2291,13 @@ class DDLParserSuite extends AnalysisTest { Some(create.tableSchema), create.partitioning, create.bucketSpec, - create.properties, - create.provider, - create.options, - create.location, - create.comment, - create.serdeInfo, - create.external) + create.tableProperties.properties, + create.tableProperties.provider, + create.tableProperties.options, + create.tableProperties.location, + create.tableProperties.comment, + create.tableProperties.serde, + create.tableProperties.external) case replace: ReplaceTableStatement => TableSpec( replace.tableName, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 9f03e0da7816..a40e01afe39d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -143,20 +143,25 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the // session catalog and the table provider is not v2. 
- case c @ CreateV2Table(ResolvedDBObjectName(catalog, name), _, _, _, _, _, _, _, _, _, _, _) => + case c @ CreateV2Table(ResolvedDBObjectName(catalog, name), _, _, _, _, _) => val (storageFormat, provider) = getStorageFormatAndProvider( - c.provider, c.options, c.location, c.serdeInfo, ctas = false) + c.tableProperties.provider, + c.tableProperties.options, + c.tableProperties.location, + c.tableProperties.serde, + ctas = false) if (isSessionCatalog(catalog) && !isV2Provider(provider)) { val tableDesc = buildCatalogTable(name.asTableIdentifier, c.tableSchema, - c.partitioning, c.bucketSpec, c.properties, provider, c.location, - c.comment, storageFormat, c.external) + c.partitioning, c.bucketSpec, c.tableProperties.properties, provider, + c.tableProperties.location, c.tableProperties.comment, storageFormat, + c.tableProperties.external) val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTable(tableDesc, mode, None) } else { CreateV2Table( - ResolvedDBObjectName(catalog, name), - c.tableSchema, c.partitioning ++ c.bucketSpec.map(_.asTransform), None, c.properties, - c.options, c.serdeInfo, c.location, c.comment, c.provider, c.external, c.ignoreIfExists) + ResolvedDBObjectName(catalog, name), c.tableSchema, + c.partitioning ++ c.bucketSpec.map(_.asTransform), None, c.tableProperties, + c.ignoreIfExists) } case c @ CreateTableAsSelectStatement( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index b7e67ff3db3c..25916d889ebc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -165,10 +165,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil - case CreateV2Table(ResolvedDBObjectName(catalog, ident), schema, parts, bucketSpec, - properties, options, serde, location, comment, provider, external, ifNotExists) => + case CreateV2Table(ResolvedDBObjectName(catalog, ident), schema, parts, _, + tableProperties, ifNotExists) => val props = CatalogV2Util.convertTableProperties( - properties, options, serde, location, comment, provider, external) + tableProperties.properties, tableProperties.options, tableProperties.serde, + tableProperties.location, tableProperties.comment, tableProperties.provider, + tableProperties.external) val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, parts, propsWithOwner, ifNotExists) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 5f2e23efe8b9..3d6f2a24a398 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -29,7 +29,7 @@ import org.apache.spark.api.java.function.VoidFunction2 import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.plans.logical.CreateV2Table +import 
org.apache.spark.sql.catalyst.plans.logical.{CreateV2Table, TableProperties} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback} @@ -289,20 +289,22 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * Note, currently the new table creation by this API doesn't fully cover the V2 table. * TODO (SPARK-33638): Full support of v2 table creation */ + val tableProperties = TableProperties( + Map.empty[String, String], + Some(source), + Map.empty[String, String], + extraOptions.get("path"), + None, + None, + false) val cmd = CreateV2Table( UnresolvedDBObjectName( originalMultipartIdentifier, - isNamespace = true), + isNamespace = false), df.schema.asNullable, partitioningColumns.getOrElse(Nil).asTransforms.toSeq, None, - Map.empty[String, String], - Map.empty[String, String], - None, - extraOptions.get("path"), - None, - Some(source), - external = false, + tableProperties, ignoreIfExists = false) Dataset.ofRows(df.sparkSession, cmd) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 0925975a5093..d717fc2e32ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -34,8 +34,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.FakeV2Provider -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, V1Table} -import org.apache.spark.sql.connector.catalog.CatalogV2Util.convertTableProperties +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, CatalogV2Util, Identifier, Table, TableCapability, TableCatalog, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation @@ -480,7 +479,6 @@ class PlanResolutionSuite extends AnalysisTest { "location" -> "s3://bucket/path/to/data", "comment" -> "table comment", "other" -> "20") - import org.apache.spark.sql.connector.catalog.CatalogV2Util.convertTableProperties parseAndResolve(sql) match { case create: CreateV2Table => @@ -492,10 +490,13 @@ class PlanResolutionSuite extends AnalysisTest { .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) - val props = convertTableProperties( - create.properties, create.options, create.serdeInfo, create.location, create.comment, - create.provider, create.external) - assert(props == expectedProperties) + + val properties = 
CatalogV2Util.convertTableProperties( + create.tableProperties.properties, create.tableProperties.options, + create.tableProperties.serde, create.tableProperties.location, + create.tableProperties.comment, create.tableProperties.provider, + create.tableProperties.external) + assert(properties == expectedProperties) assert(create.ignoreIfExists) case other => @@ -536,10 +537,12 @@ class PlanResolutionSuite extends AnalysisTest { .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) - val props = convertTableProperties( - create.properties, create.options, create.serdeInfo, create.location, create.comment, - create.provider, create.external) - assert(props == expectedProperties) + val properties = CatalogV2Util.convertTableProperties( + create.tableProperties.properties, create.tableProperties.options, + create.tableProperties.serde, create.tableProperties.location, + create.tableProperties.comment, create.tableProperties.provider, + create.tableProperties.external) + assert(properties == expectedProperties) assert(create.ignoreIfExists) case other => @@ -579,10 +582,12 @@ class PlanResolutionSuite extends AnalysisTest { .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) - val props = convertTableProperties( - create.properties, create.options, create.serdeInfo, create.location, create.comment, - create.provider, create.external) - assert(props == expectedProperties) + val properties = CatalogV2Util.convertTableProperties( + create.tableProperties.properties, create.tableProperties.options, + create.tableProperties.serde, create.tableProperties.location, + create.tableProperties.comment, create.tableProperties.provider, + create.tableProperties.external) + assert(properties == expectedProperties) assert(create.ignoreIfExists) case other => From 8374054627d5fe5e52cd266a5df78af5b35b974d Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 23 Nov 2021 22:51:01 -0800 Subject: [PATCH 09/16] address comments --- .../sql/catalyst/parser/AstBuilder.scala | 6 +-- .../catalyst/plans/logical/v2Commands.scala | 6 +-- .../spark/sql/catalyst/trees/TreeNode.scala | 3 ++ .../sql/catalyst/parser/DDLParserSuite.scala | 18 ++++----- .../analysis/ReplaceCharWithVarchar.scala | 4 +- .../analysis/ResolveSessionCatalog.scala | 22 +++++------ .../datasources/v2/DataSourceV2Strategy.scala | 11 +++--- .../sql/streaming/DataStreamWriter.scala | 6 +-- .../command/PlanResolutionSuite.scala | 38 +++++++++---------- 9 files changed, 58 insertions(+), 56 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 7cfb7b8c78be..0e9d57df9336 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3481,14 +3481,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg case _ => // Note: table schema includes both the table columns list and the partition columns // with data type. 
- val tableProperties = TableProperties(properties, provider, options, location, comment, + val tableSpec = TableSpec(properties, provider, options, location, comment, serdeInfo, external) val schema = StructType(columns ++ partCols) - CreateV2Table( + CreateTable( UnresolvedDBObjectName( table, isNamespace = false), - schema, partitioning, bucketSpec, tableProperties, ignoreIfExists = ifNotExists) + schema, partitioning, bucketSpec, tableSpec, ignoreIfExists = ifNotExists) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index d0ca95e023ba..fcf6afb93b67 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -206,12 +206,12 @@ trait V2CreateTablePlanX extends LogicalPlan { /** * Create a new table with a v2 catalog. */ -case class CreateV2Table( +case class CreateTable( name: LogicalPlan, tableSchema: StructType, partitioning: Seq[Transform], bucketSpec: Option[BucketSpec], - tableProperties: TableProperties, + tableSpec: TableSpec, ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlanX { override def child: LogicalPlan = name override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlanX = @@ -1107,7 +1107,7 @@ case class DropIndex( copy(table = newChild) } -case class TableProperties( +case class TableSpec( properties: Map[String, String], provider: Option[String], options: Map[String, String], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index f3f6744720f2..e6ffb0e70fa3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.logical.TableSpec import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.RuleId import org.apache.spark.sql.catalyst.rules.RuleIdCollection @@ -819,6 +820,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre redactMapString(map.asCaseSensitiveMap().asScala, maxFields) case map: Map[_, _] => redactMapString(map, maxFields) + case t: TableSpec => + redactMapString(t.options, maxFields) case table: CatalogTable => table.storage.serde match { case Some(serde) => table.identifier :: serde :: Nil diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index e2d665eb0efc..49f679dcd207 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -717,7 +717,7 @@ class DDLParserSuite extends AnalysisTest { val parsedPlan = parsePlan(sqlStatement) val newTableToken = sqlStatement.split(" 
")(0).trim.toUpperCase(Locale.ROOT) parsedPlan match { - case create: CreateV2Table if newTableToken == "CREATE" => + case create: CreateTable if newTableToken == "CREATE" => assert(create.ignoreIfExists == expectedIfNotExists) case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" => assert(ctas.ifNotExists == expectedIfNotExists) @@ -2285,19 +2285,19 @@ class DDLParserSuite extends AnalysisTest { private object TableSpec { def apply(plan: LogicalPlan): TableSpec = { plan match { - case create: CreateV2Table => + case create: CreateTable => TableSpec( create.name.asInstanceOf[UnresolvedDBObjectName].nameParts, Some(create.tableSchema), create.partitioning, create.bucketSpec, - create.tableProperties.properties, - create.tableProperties.provider, - create.tableProperties.options, - create.tableProperties.location, - create.tableProperties.comment, - create.tableProperties.serde, - create.tableProperties.external) + create.tableSpec.properties, + create.tableSpec.provider, + create.tableSpec.options, + create.tableSpec.location, + create.tableSpec.comment, + create.tableSpec.serde, + create.tableSpec.external) case replace: ReplaceTableStatement => TableSpec( replace.tableName, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala index 7404a30fed71..3f9eb5c8084e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, CreateV2Table, LogicalPlan, ReplaceColumns, ReplaceTable} +import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, CreateTable, LogicalPlan, ReplaceColumns, ReplaceTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, CreateDataSourceTableCommand, CreateTableCommand} @@ -31,7 +31,7 @@ object ReplaceCharWithVarchar extends Rule[LogicalPlan] { plan.resolveOperators { // V2 commands - case cmd: CreateV2Table => + case cmd: CreateTable => cmd.copy(tableSchema = replaceCharWithVarcharInSchema(cmd.tableSchema)) case cmd: ReplaceTable => cmd.copy(tableSchema = replaceCharWithVarcharInSchema(cmd.tableSchema)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index a40e01afe39d..746e0c2341e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{CreateTable => CatalystCreateTable} import org.apache.spark.sql.catalyst.rules.Rule import 
org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL} import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, V1Table} @@ -143,25 +144,22 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the // session catalog and the table provider is not v2. - case c @ CreateV2Table(ResolvedDBObjectName(catalog, name), _, _, _, _, _) => + case c @ CatalystCreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _, _) => val (storageFormat, provider) = getStorageFormatAndProvider( - c.tableProperties.provider, - c.tableProperties.options, - c.tableProperties.location, - c.tableProperties.serde, + c.tableSpec.provider, + c.tableSpec.options, + c.tableSpec.location, + c.tableSpec.serde, ctas = false) if (isSessionCatalog(catalog) && !isV2Provider(provider)) { val tableDesc = buildCatalogTable(name.asTableIdentifier, c.tableSchema, - c.partitioning, c.bucketSpec, c.tableProperties.properties, provider, - c.tableProperties.location, c.tableProperties.comment, storageFormat, - c.tableProperties.external) + c.partitioning, c.bucketSpec, c.tableSpec.properties, provider, + c.tableSpec.location, c.tableSpec.comment, storageFormat, + c.tableSpec.external) val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTable(tableDesc, mode, None) } else { - CreateV2Table( - ResolvedDBObjectName(catalog, name), c.tableSchema, - c.partitioning ++ c.bucketSpec.map(_.asTransform), None, c.tableProperties, - c.ignoreIfExists) + c } case c @ CreateTableAsSelectStatement( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 25916d889ebc..c5e6de3ce3e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -165,13 +165,14 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil - case CreateV2Table(ResolvedDBObjectName(catalog, ident), schema, parts, _, - tableProperties, ifNotExists) => + case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning, bucketSpec, + tableSpec, ifNotExists) => val props = CatalogV2Util.convertTableProperties( - tableProperties.properties, tableProperties.options, tableProperties.serde, - tableProperties.location, tableProperties.comment, tableProperties.provider, - tableProperties.external) + tableSpec.properties, tableSpec.options, tableSpec.serde, + tableSpec.location, tableSpec.comment, tableSpec.provider, + tableSpec.external) val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) + val parts = partitioning ++ bucketSpec.map(_.asTransform) CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, parts, propsWithOwner, ifNotExists) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 3d6f2a24a398..d232350856ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -29,7 +29,7 @@ import org.apache.spark.api.java.function.VoidFunction2 import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.plans.logical.{CreateV2Table, TableProperties} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, TableSpec} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback} @@ -289,7 +289,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * Note, currently the new table creation by this API doesn't fully cover the V2 table. * TODO (SPARK-33638): Full support of v2 table creation */ - val tableProperties = TableProperties( + val tableProperties = TableSpec( Map.empty[String, String], Some(source), Map.empty[String, String], @@ -297,7 +297,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { None, None, false) - val cmd = CreateV2Table( + val cmd = CreateTable( UnresolvedDBObjectName( originalMultipartIdentifier, isNamespace = false), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index d717fc2e32ea..53bd25330ff4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} +import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable => CatalystCreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.FakeV2Provider import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, CatalogV2Util, Identifier, Table, TableCapability, TableCatalog, V1Table} @@ -481,7 +481,7 @@ class PlanResolutionSuite extends AnalysisTest { "other" -> "20") parseAndResolve(sql) match { - case create: CreateV2Table => + case create: CatalystCreateTable => 
+      case create: CatalystCreateTable =>
assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name") @@ -492,15 +492,15 @@ class PlanResolutionSuite extends AnalysisTest { assert(create.partitioning.isEmpty) val properties = CatalogV2Util.convertTableProperties( - create.tableProperties.properties, create.tableProperties.options, - create.tableProperties.serde, create.tableProperties.location, - create.tableProperties.comment, create.tableProperties.provider, - create.tableProperties.external) + create.tableSpec.properties, create.tableSpec.options, + create.tableSpec.serde, create.tableSpec.location, + create.tableSpec.comment, create.tableSpec.provider, + create.tableSpec.external) assert(properties == expectedProperties) assert(create.ignoreIfExists) case other => - fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," + + fail(s"Expected to parse ${classOf[CatalystCreateTable].getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -528,7 +528,7 @@ class PlanResolutionSuite extends AnalysisTest { "other" -> "20") parseAndResolve(sql, withDefault = true) match { - case create: CreateV2Table => + case create: CatalystCreateTable => assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name") @@ -538,15 +538,15 @@ class PlanResolutionSuite extends AnalysisTest { .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) val properties = CatalogV2Util.convertTableProperties( - create.tableProperties.properties, create.tableProperties.options, - create.tableProperties.serde, create.tableProperties.location, - create.tableProperties.comment, create.tableProperties.provider, - create.tableProperties.external) + create.tableSpec.properties, create.tableSpec.options, + create.tableSpec.serde, create.tableSpec.location, + create.tableSpec.comment, create.tableSpec.provider, + create.tableSpec.external) assert(properties == expectedProperties) assert(create.ignoreIfExists) case other => - fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," + + fail(s"Expected to parse ${classOf[CatalystCreateTable].getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -572,7 +572,7 @@ class PlanResolutionSuite extends AnalysisTest { "comment" -> "This is the staging page view table") parseAndResolve(sql) match { - case create: CreateV2Table => + case create: CatalystCreateTable => assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == CatalogManager.SESSION_CATALOG_NAME) assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == @@ -583,15 +583,15 @@ class PlanResolutionSuite extends AnalysisTest { .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) val properties = CatalogV2Util.convertTableProperties( - create.tableProperties.properties, create.tableProperties.options, - create.tableProperties.serde, create.tableProperties.location, - create.tableProperties.comment, create.tableProperties.provider, - create.tableProperties.external) + create.tableSpec.properties, create.tableSpec.options, + create.tableSpec.serde, create.tableSpec.location, + create.tableSpec.comment, create.tableSpec.provider, + create.tableSpec.external) assert(properties == expectedProperties) assert(create.ignoreIfExists) case 
other => - fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," + + fail(s"Expected to parse ${classOf[CatalystCreateTable].getName} from query," + s"got ${other.getClass.getName}: $sql") } } From 1b38e2fbd5ddf66e6a726a0ece61edc84cd2005f Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 23 Nov 2021 23:26:17 -0800 Subject: [PATCH 10/16] fix build failure --- .../apache/spark/sql/execution/datasources/rules.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 1cb11a33c3ff..be5b923743e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.datasources.{CreateTable => DataSourceCreateTable} import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.sources.InsertableRelation import org.apache.spark.sql.types.{AtomicType, StructType} @@ -82,7 +83,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi // bucketing information is specified, as we can't infer bucketing from data files currently. // Since the runtime inferred partition columns could be different from what user specified, // we fail the query if the partitioning information is specified. - case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty => + case c @ DataSourceCreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty => if (tableDesc.bucketSpec.isDefined) { failAnalysis("Cannot specify bucketing information if the table schema is not specified " + "when creating and will be inferred at runtime") @@ -97,7 +98,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi // When we append data to an existing table, check if the given provider, partition columns, // bucket spec, etc. match the existing table, and adjust the columns order of the given query // if necessary. - case c @ CreateTable(tableDesc, SaveMode.Append, Some(query)) + case c @ DataSourceCreateTable(tableDesc, SaveMode.Append, Some(query)) if query.resolved && catalog.tableExists(tableDesc.identifier) => // This is guaranteed by the parser and `DataFrameWriter` assert(tableDesc.provider.isDefined) @@ -190,7 +191,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi // * partition columns' type must be AtomicType. // * sort columns' type must be orderable. // * reorder table schema or output of query plan, to put partition columns at the end. 
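// Not part of the patch: a short sketch of why the matches in this file are renamed.
// The catalyst package now defines its own logical.CreateTable (the new v2 command),
// so the existing v1 plan from execution.datasources has to be imported under an
// alias before both can be named in the same rule. `describeCreate` is a hypothetical
// helper, shown only to illustrate the disambiguation.
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LogicalPlan}
import org.apache.spark.sql.execution.datasources.{CreateTable => DataSourceCreateTable}

def describeCreate(plan: LogicalPlan): String = plan match {
  case _: DataSourceCreateTable => "v1 CREATE TABLE (CatalogTable + SaveMode)"
  case _: CreateTable           => "v2 CREATE TABLE (UnresolvedDBObjectName + TableSpec)"
  case _                        => "not a CREATE TABLE plan"
}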
- case c @ CreateTable(tableDesc, _, query) if query.forall(_.resolved) => + case c @ DataSourceCreateTable(tableDesc, _, query) if query.forall(_.resolved) => if (query.isDefined) { assert(tableDesc.schema.isEmpty, "Schema may not be specified in a Create Table As Select (CTAS) statement") @@ -479,7 +480,7 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] { object HiveOnlyCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { plan.foreach { - case CreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) => + case DataSourceCreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) => throw QueryCompilationErrors.ddlWithoutHiveSupportEnabledError( "CREATE Hive TABLE (AS SELECT)") case i: InsertIntoDir if DDLUtils.isHiveTable(i.provider) => From a54240d4c9b73311d52f814a764e81c72a2a11c1 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 24 Nov 2021 12:38:45 -0800 Subject: [PATCH 11/16] address comments --- .../sql/catalyst/analysis/CheckAnalysis.scala | 17 ------- .../sql/catalyst/parser/AstBuilder.scala | 4 +- .../catalyst/plans/logical/v2Commands.scala | 31 ++++++------- .../spark/sql/catalyst/trees/TreeNode.scala | 1 + .../sql/catalyst/parser/DDLParserSuite.scala | 2 +- .../analysis/ResolveSessionCatalog.scala | 8 ++-- .../sql/execution/datasources/rules.scala | 46 ------------------- .../datasources/v2/CreateTableExec.scala | 11 ++++- .../datasources/v2/DataSourceV2Strategy.scala | 10 +--- .../sql/streaming/DataStreamWriter.scala | 2 +- 10 files changed, 35 insertions(+), 97 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 7e758ad422a7..5bf37a2944cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -466,23 +466,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType)) - case create: V2CreateTablePlanX => - val references = create.partitioning.flatMap(_.references).toSet - val badReferences = references.map(_.fieldNames).flatMap { column => - create.tableSchema.findNestedField(column) match { - case Some(_) => - None - case _ => - Some(s"${column.quoted} is missing or is in a map or array") - } - } - - if (badReferences.nonEmpty) { - failAnalysis(s"Invalid partitioning: ${badReferences.mkString(", ")}") - } - - create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType)) - case write: V2WriteCommand if write.resolved => write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 0e9d57df9336..2d1ec55b4384 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3481,14 +3481,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg case _ => // Note: table schema includes both the table columns list and the partition columns // with data type. 
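// Not part of the patch: a rough sketch of the plan visitCreateTable is expected to
// produce for `CREATE TABLE testcat.ns.t (id INT) USING parquet`, assuming the
// TableSpec/CreateTable shapes introduced just below (field order taken from the
// constructor call in this hunk).
import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, TableSpec}
import org.apache.spark.sql.types.{IntegerType, StructType}

val spec = TableSpec(
  None,             // bucketSpec, now carried by TableSpec instead of CreateTable
  Map.empty,        // properties
  Some("parquet"),  // provider
  Map.empty,        // options
  None,             // location
  None,             // comment
  None,             // serde info
  false)            // external
val expected = CreateTable(
  UnresolvedDBObjectName(Seq("testcat", "ns", "t"), isNamespace = false),
  new StructType().add("id", IntegerType),
  Seq.empty,        // partitioning transforms
  spec,
  ignoreIfExists = false)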
- val tableSpec = TableSpec(properties, provider, options, location, comment, + val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment, serdeInfo, external) val schema = StructType(columns ++ partCols) CreateTable( UnresolvedDBObjectName( table, isNamespace = false), - schema, partitioning, bucketSpec, tableSpec, ignoreIfExists = ifNotExists) + schema, partitioning, tableSpec, ignoreIfExists = ifNotExists) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index fcf6afb93b67..d39e28865da8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, UnresolvedException} +import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.FunctionResource @@ -191,18 +191,6 @@ trait V2CreateTablePlan extends LogicalPlan { def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan } -trait V2CreateTablePlanX extends LogicalPlan { - def name: LogicalPlan - def partitioning: Seq[Transform] - def tableSchema: StructType - - /** - * Creates a copy of this node with the new partitioning transforms. This method is used to - * rewrite the partition transforms normalized according to the table schema. - */ - def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlanX -} - /** * Create a new table with a v2 catalog. 
*/ @@ -210,13 +198,21 @@ case class CreateTable( name: LogicalPlan, tableSchema: StructType, partitioning: Seq[Transform], - bucketSpec: Option[BucketSpec], tableSpec: TableSpec, - ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlanX { + ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper + override def child: LogicalPlan = name - override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlanX = + + override def tableName: Identifier = { + assert(child.resolved) + child.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier + } + + override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlan = copy(name = newChild) - override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlanX = { + + override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = { this.copy(partitioning = rewritten) } } @@ -1108,6 +1104,7 @@ case class DropIndex( } case class TableSpec( + bucketSpec: Option[BucketSpec], properties: Map[String, String], provider: Option[String], options: Map[String, String], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index e6ffb0e70fa3..f9c7bef615d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -822,6 +822,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre redactMapString(map, maxFields) case t: TableSpec => redactMapString(t.options, maxFields) + redactMapString(t.properties, maxFields) case table: CatalogTable => table.storage.serde match { case Some(serde) => table.identifier :: serde :: Nil diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 49f679dcd207..f4ab8076938d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -2290,7 +2290,7 @@ class DDLParserSuite extends AnalysisTest { create.name.asInstanceOf[UnresolvedDBObjectName].nameParts, Some(create.tableSchema), create.partitioning, - create.bucketSpec, + create.tableSpec.bucketSpec, create.tableSpec.properties, create.tableSpec.provider, create.tableSpec.options, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 746e0c2341e3..0940982f7a3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -144,7 +144,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the // session catalog and the table provider is not v2. 
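// Not part of the patch: a condensed sketch of the v2 branch handled below; `keepAsV2`
// is a hypothetical helper. When the plan stays on the v2 path, any bucket spec is
// folded into the partitioning as a bucket transform and cleared from the TableSpec,
// so downstream planning only has to look at `partitioning`.
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable => CatalystCreateTable}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

def keepAsV2(c: CatalystCreateTable): CatalystCreateTable =
  c.copy(
    partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
    tableSpec = c.tableSpec.copy(bucketSpec = None))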
- case c @ CatalystCreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _, _) => + case c @ CatalystCreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) => val (storageFormat, provider) = getStorageFormatAndProvider( c.tableSpec.provider, c.tableSpec.options, @@ -153,13 +153,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) ctas = false) if (isSessionCatalog(catalog) && !isV2Provider(provider)) { val tableDesc = buildCatalogTable(name.asTableIdentifier, c.tableSchema, - c.partitioning, c.bucketSpec, c.tableSpec.properties, provider, + c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider, c.tableSpec.location, c.tableSpec.comment, storageFormat, c.tableSpec.external) val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTable(tableDesc, mode, None) } else { - c + val newTableSpec = c.tableSpec.copy(bucketSpec = None) + c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform), + tableSpec = newTableSpec) } case c @ CreateTableAsSelectStatement( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index be5b923743e5..4895cab37380 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.DDLUtils @@ -265,51 +264,6 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi create.withPartitioning(normalizedPartitions) } - - case create: V2CreateTablePlanX if create.childrenResolved => - val schema = create.tableSchema - val partitioning = create.partitioning - val name = create.name.asInstanceOf[ResolvedDBObjectName].nameParts - val identifier = if (name.length == 2) { - Identifier.of(Array(name(0)), name(1)) - } else { - Identifier.of(Array.empty, name(0)) - } - - val isCaseSensitive = conf.caseSensitiveAnalysis - // Check that columns are not duplicated in the schema - val flattenedSchema = SchemaUtils.explodeNestedFieldNames(schema) - SchemaUtils.checkColumnNameDuplication( - flattenedSchema, - s"in the table definition of $identifier", - isCaseSensitive) - - // Check that columns are not duplicated in the partitioning statement - SchemaUtils.checkTransformDuplication( - partitioning, "in the partitioning", isCaseSensitive) - - if (schema.isEmpty) { - if (partitioning.nonEmpty) { - throw QueryCompilationErrors.specifyPartitionNotAllowedWhenTableSchemaNotDefinedError() - } - - create - } else { - // Resolve and normalize partition columns as necessary - val resolver = conf.resolver - val normalizedPartitions = partitioning.map { - case transform: RewritableTransform => - val rewritten = transform.references().map { ref => - // Throws an exception if the reference cannot be resolved - val position = SchemaUtils.findColumnPosition(ref.fieldNames(), schema, resolver) - 
FieldReference(SchemaUtils.getColumnName(position, schema)) - } - transform.withReferences(rewritten) - case other => other - } - - create.withPartitioning(normalizedPartitions) - } } private def fallBackV2ToV1(cls: Class[_]): Class[_] = cls.newInstance match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala index be7331b0d7dc..28996834eca8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala @@ -22,7 +22,8 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.catalyst.plans.logical.TableSpec +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.StructType @@ -32,10 +33,16 @@ case class CreateTableExec( identifier: Identifier, tableSchema: StructType, partitioning: Seq[Transform], - tableProperties: Map[String, String], + tableSpec: TableSpec, ignoreIfExists: Boolean) extends LeafV2CommandExec { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val props = CatalogV2Util.convertTableProperties( + tableSpec.properties, tableSpec.options, tableSpec.serde, + tableSpec.location, tableSpec.comment, tableSpec.provider, + tableSpec.external) + val tableProperties = CatalogV2Util.withDefaultOwnership(props) + override protected def run(): Seq[InternalRow] = { if (!catalog.tableExists(identifier)) { try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index c5e6de3ce3e2..f64c1ee001be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -165,16 +165,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil - case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning, bucketSpec, + case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning, tableSpec, ifNotExists) => - val props = CatalogV2Util.convertTableProperties( - tableSpec.properties, tableSpec.options, tableSpec.serde, - tableSpec.location, tableSpec.comment, tableSpec.provider, - tableSpec.external) - val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) - val parts = partitioning ++ bucketSpec.map(_.asTransform) CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, - parts, propsWithOwner, ifNotExists) :: Nil + partitioning, tableSpec, ifNotExists) :: Nil case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) => val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index d232350856ce..2d3c89874f59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -290,6 +290,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * TODO (SPARK-33638): Full support of v2 table creation */ val tableProperties = TableSpec( + None, Map.empty[String, String], Some(source), Map.empty[String, String], @@ -303,7 +304,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { isNamespace = false), df.schema.asNullable, partitioningColumns.getOrElse(Nil).asTransforms.toSeq, - None, tableProperties, ignoreIfExists = false) Dataset.ofRows(df.sparkSession, cmd) From 8eda28290d07c6953d80f2581fd7cde2721d8ffb Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 24 Nov 2021 12:47:10 -0800 Subject: [PATCH 12/16] Update sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala Co-authored-by: Wenchen Fan --- .../org/apache/spark/sql/execution/datasources/rules.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 4895cab37380..bab1dde113f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.{CreateTable => DataSourceCreateTable} +import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1} import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.sources.InsertableRelation import org.apache.spark.sql.types.{AtomicType, StructType} From 267466e1f063213ea7c1f57911d76eca4daba41e Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 24 Nov 2021 12:53:00 -0800 Subject: [PATCH 13/16] DataSourceCreateTable -> CreateTableV1 --- .../apache/spark/sql/execution/datasources/rules.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index bab1dde113f1..327d92672db8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -82,7 +82,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi // bucketing information is specified, as we can't infer bucketing from data files currently. // Since the runtime inferred partition columns could be different from what user specified, // we fail the query if the partitioning information is specified. 
- case c @ DataSourceCreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty => + case c @ CreateTableV1(tableDesc, _, None) if tableDesc.schema.isEmpty => if (tableDesc.bucketSpec.isDefined) { failAnalysis("Cannot specify bucketing information if the table schema is not specified " + "when creating and will be inferred at runtime") @@ -97,7 +97,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi // When we append data to an existing table, check if the given provider, partition columns, // bucket spec, etc. match the existing table, and adjust the columns order of the given query // if necessary. - case c @ DataSourceCreateTable(tableDesc, SaveMode.Append, Some(query)) + case c @ CreateTableV1(tableDesc, SaveMode.Append, Some(query)) if query.resolved && catalog.tableExists(tableDesc.identifier) => // This is guaranteed by the parser and `DataFrameWriter` assert(tableDesc.provider.isDefined) @@ -190,7 +190,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi // * partition columns' type must be AtomicType. // * sort columns' type must be orderable. // * reorder table schema or output of query plan, to put partition columns at the end. - case c @ DataSourceCreateTable(tableDesc, _, query) if query.forall(_.resolved) => + case c @ CreateTableV1(tableDesc, _, query) if query.forall(_.resolved) => if (query.isDefined) { assert(tableDesc.schema.isEmpty, "Schema may not be specified in a Create Table As Select (CTAS) statement") @@ -434,7 +434,7 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] { object HiveOnlyCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { plan.foreach { - case DataSourceCreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) => + case CreateTableV1(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) => throw QueryCompilationErrors.ddlWithoutHiveSupportEnabledError( "CREATE Hive TABLE (AS SELECT)") case i: InsertIntoDir if DDLUtils.isHiveTable(i.provider) => From 4cf26bc94edfb691de9fd0b621bdb9203e5ef8c5 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Thu, 25 Nov 2021 23:14:11 -0800 Subject: [PATCH 14/16] address comments --- .../sql/catalyst/parser/AstBuilder.scala | 6 +-- .../datasources/v2/CreateTableExec.scala | 12 +++-- .../command/PlanResolutionSuite.scala | 46 +------------------ 3 files changed, 10 insertions(+), 54 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 2d1ec55b4384..0b8a087d794a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3414,7 +3414,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Create a table, returning a [[CreateTable]] or [[CreateV2Table]] logical plan. + * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelectStatement]] logical plan. 
* * Expected format: * {{{ @@ -3485,9 +3485,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg serdeInfo, external) val schema = StructType(columns ++ partCols) CreateTable( - UnresolvedDBObjectName( - table, - isNamespace = false), + UnresolvedDBObjectName(table, isNamespace = false), schema, partitioning, tableSpec, ignoreIfExists = ifNotExists) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala index 28996834eca8..6e5c3af4573c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala @@ -37,11 +37,13 @@ case class CreateTableExec( ignoreIfExists: Boolean) extends LeafV2CommandExec { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - val props = CatalogV2Util.convertTableProperties( - tableSpec.properties, tableSpec.options, tableSpec.serde, - tableSpec.location, tableSpec.comment, tableSpec.provider, - tableSpec.external) - val tableProperties = CatalogV2Util.withDefaultOwnership(props) + val tableProperties = { + val props = CatalogV2Util.convertTableProperties( + tableSpec.properties, tableSpec.options, tableSpec.serde, + tableSpec.location, tableSpec.comment, tableSpec.provider, + tableSpec.external) + CatalogV2Util.withDefaultOwnership(props) + } override protected def run(): Seq[InternalRow] = { if (!catalog.tableExists(identifier)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 53bd25330ff4..546adac5a35c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable => CatalystCreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.FakeV2Provider -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, CatalogV2Util, Identifier, Table, TableCapability, TableCatalog, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation @@ -471,15 +471,6 @@ class PlanResolutionSuite extends AnalysisTest { |OPTIONS (path 's3://bucket/path/to/data', other 20) """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "option.other" -> "20", - "provider" -> "parquet", - "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "other" -> "20") - parseAndResolve(sql) 
match { case create: CatalystCreateTable => assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") @@ -490,13 +481,6 @@ class PlanResolutionSuite extends AnalysisTest { .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) - - val properties = CatalogV2Util.convertTableProperties( - create.tableSpec.properties, create.tableSpec.options, - create.tableSpec.serde, create.tableSpec.location, - create.tableSpec.comment, create.tableSpec.provider, - create.tableSpec.external) - assert(properties == expectedProperties) assert(create.ignoreIfExists) case other => @@ -518,15 +502,6 @@ class PlanResolutionSuite extends AnalysisTest { |OPTIONS (path 's3://bucket/path/to/data', other 20) """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "option.other" -> "20", - "provider" -> "parquet", - "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "other" -> "20") - parseAndResolve(sql, withDefault = true) match { case create: CatalystCreateTable => assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") @@ -537,12 +512,6 @@ class PlanResolutionSuite extends AnalysisTest { .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) - val properties = CatalogV2Util.convertTableProperties( - create.tableSpec.properties, create.tableSpec.options, - create.tableSpec.serde, create.tableSpec.location, - create.tableSpec.comment, create.tableSpec.provider, - create.tableSpec.external) - assert(properties == expectedProperties) assert(create.ignoreIfExists) case other => @@ -564,13 +533,6 @@ class PlanResolutionSuite extends AnalysisTest { |TBLPROPERTIES ('p1'='v1', 'p2'='v2') """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "provider" -> v2Format, - "location" -> "/user/external/page_view", - "comment" -> "This is the staging page view table") - parseAndResolve(sql) match { case create: CatalystCreateTable => assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == @@ -582,12 +544,6 @@ class PlanResolutionSuite extends AnalysisTest { .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) - val properties = CatalogV2Util.convertTableProperties( - create.tableSpec.properties, create.tableSpec.options, - create.tableSpec.serde, create.tableSpec.location, - create.tableSpec.comment, create.tableSpec.provider, - create.tableSpec.external) - assert(properties == expectedProperties) assert(create.ignoreIfExists) case other => From 39320f4f12546475cea54a9914d05973849bb89b Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Thu, 25 Nov 2021 23:26:16 -0800 Subject: [PATCH 15/16] rebase --- .../apache/spark/sql/connector/catalog/CatalogV2Util.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index fbaf7c226feb..fabc73f9cf69 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -22,10 +22,8 @@ import java.util.Collections import scala.collection.JavaConverters._ -import 
org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo} import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo} import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} From 8fdf059093623b8d36e7231ac539dbb4a6449f40 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 29 Nov 2021 13:58:38 -0800 Subject: [PATCH 16/16] address comments --- .../spark/sql/catalyst/trees/TreeNode.scala | 4 +- .../command/PlanResolutionSuite.scala | 56 +++++++++---------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index f9c7bef615d3..3d62cf2b8342 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -821,8 +821,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre case map: Map[_, _] => redactMapString(map, maxFields) case t: TableSpec => - redactMapString(t.options, maxFields) - redactMapString(t.properties, maxFields) + t.copy(properties = Utils.redact(t.properties).toMap, + options = Utils.redact(t.options).toMap) :: Nil case table: CatalogTable => table.storage.serde match { case Some(serde) => table.identifier :: serde :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 546adac5a35c..a6b979a3fd52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -31,12 +31,12 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable => CatalystCreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} +import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, 
Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.FakeV2Provider import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, V1Table} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.datasources.CreateTable +import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.sources.SimpleScanSource @@ -210,7 +210,7 @@ class PlanResolutionSuite extends AnalysisTest { private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { parseAndResolve(sql).collect { - case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore) + case CreateTableV1(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore) }.head } @@ -240,7 +240,7 @@ class PlanResolutionSuite extends AnalysisTest { ) parseAndResolve(query) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -282,7 +282,7 @@ class PlanResolutionSuite extends AnalysisTest { ) parseAndResolve(query) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -302,7 +302,7 @@ class PlanResolutionSuite extends AnalysisTest { comment = Some("abc")) parseAndResolve(sql) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -322,7 +322,7 @@ class PlanResolutionSuite extends AnalysisTest { properties = Map("test" -> "test")) parseAndResolve(sql) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -341,7 +341,7 @@ class PlanResolutionSuite extends AnalysisTest { provider = Some("parquet")) parseAndResolve(v1) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -372,7 +372,7 @@ class PlanResolutionSuite extends AnalysisTest { provider = Some("parquet")) parseAndResolve(sql) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse 
${classOf[CreateTableCommand].getClass.getName} from query," + @@ -398,7 +398,7 @@ class PlanResolutionSuite extends AnalysisTest { ) parseAndResolve(sql) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -472,7 +472,7 @@ class PlanResolutionSuite extends AnalysisTest { """.stripMargin parseAndResolve(sql) match { - case create: CatalystCreateTable => + case create: CreateTable => assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name") @@ -484,7 +484,7 @@ class PlanResolutionSuite extends AnalysisTest { assert(create.ignoreIfExists) case other => - fail(s"Expected to parse ${classOf[CatalystCreateTable].getName} from query," + + fail(s"Expected to parse ${classOf[CreateTable].getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -503,7 +503,7 @@ class PlanResolutionSuite extends AnalysisTest { """.stripMargin parseAndResolve(sql, withDefault = true) match { - case create: CatalystCreateTable => + case create: CreateTable => assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name") @@ -515,7 +515,7 @@ class PlanResolutionSuite extends AnalysisTest { assert(create.ignoreIfExists) case other => - fail(s"Expected to parse ${classOf[CatalystCreateTable].getName} from query," + + fail(s"Expected to parse ${classOf[CreateTable].getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -534,7 +534,7 @@ class PlanResolutionSuite extends AnalysisTest { """.stripMargin parseAndResolve(sql) match { - case create: CatalystCreateTable => + case create: CreateTable => assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == CatalogManager.SESSION_CATALOG_NAME) assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == @@ -547,7 +547,7 @@ class PlanResolutionSuite extends AnalysisTest { assert(create.ignoreIfExists) case other => - fail(s"Expected to parse ${classOf[CatalystCreateTable].getName} from query," + + fail(s"Expected to parse ${classOf[CreateTable].getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -1660,9 +1660,9 @@ class PlanResolutionSuite extends AnalysisTest { */ def normalizePlan(plan: LogicalPlan): LogicalPlan = { plan match { - case CreateTable(tableDesc, mode, query) => + case CreateTableV1(tableDesc, mode, query) => val newTableDesc = tableDesc.copy(createTime = -1L) - CreateTable(newTableDesc, mode, query) + CreateTableV1(newTableDesc, mode, query) case _ => plan // Don't transform } } @@ -1683,8 +1683,8 @@ class PlanResolutionSuite extends AnalysisTest { partitionColumnNames: Seq[String] = Seq.empty, comment: Option[String] = None, mode: SaveMode = SaveMode.ErrorIfExists, - query: Option[LogicalPlan] = None): CreateTable = { - CreateTable( + query: Option[LogicalPlan] = None): CreateTableV1 = { + CreateTableV1( CatalogTable( identifier = TableIdentifier(table, database), tableType = tableType, @@ -1766,7 +1766,7 @@ class PlanResolutionSuite extends AnalysisTest { allSources.foreach { s => val query = s"CREATE TABLE my_tab STORED AS $s" parseAndResolve(query) match { - case ct: CreateTable => + case ct: CreateTableV1 => val hiveSerde 
= HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) assert(ct.tableDesc.storage.serde == @@ -1785,14 +1785,14 @@ class PlanResolutionSuite extends AnalysisTest { // No conflicting serdes here, OK parseAndResolve(query1) match { - case parsed1: CreateTable => + case parsed1: CreateTableV1 => assert(parsed1.tableDesc.storage.serde == Some("anything")) assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt")) assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt")) } parseAndResolve(query2) match { - case parsed2: CreateTable => + case parsed2: CreateTableV1 => assert(parsed2.tableDesc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt")) @@ -1808,7 +1808,7 @@ class PlanResolutionSuite extends AnalysisTest { val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s" if (supportedSources.contains(s)) { parseAndResolve(query) match { - case ct: CreateTable => + case ct: CreateTableV1 => val hiveSerde = HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) assert(ct.tableDesc.storage.serde == Some("anything")) @@ -1829,7 +1829,7 @@ class PlanResolutionSuite extends AnalysisTest { val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s" if (supportedSources.contains(s)) { parseAndResolve(query) match { - case ct: CreateTable => + case ct: CreateTableV1 => val hiveSerde = HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) assert(ct.tableDesc.storage.serde == hiveSerde.get.serde @@ -1846,14 +1846,14 @@ class PlanResolutionSuite extends AnalysisTest { test("create hive external table") { val withoutLoc = "CREATE EXTERNAL TABLE my_tab STORED AS parquet" parseAndResolve(withoutLoc) match { - case ct: CreateTable => + case ct: CreateTableV1 => assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) assert(ct.tableDesc.storage.locationUri.isEmpty) } val withLoc = "CREATE EXTERNAL TABLE my_tab STORED AS parquet LOCATION '/something/anything'" parseAndResolve(withLoc) match { - case ct: CreateTable => + case ct: CreateTableV1 => assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything"))) } @@ -1873,7 +1873,7 @@ class PlanResolutionSuite extends AnalysisTest { test("create hive table - location implies external") { val query = "CREATE TABLE my_tab STORED AS parquet LOCATION '/something/anything'" parseAndResolve(query) match { - case ct: CreateTable => + case ct: CreateTableV1 => assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything"))) }
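// Not part of the patch series: a quick spark-shell sketch of the end result, assuming
// a build with these commits applied. Parsing a CREATE TABLE now yields the v2
// CreateTable command (an UnresolvedDBObjectName child plus a TableSpec) instead of a
// parse-only statement plan.
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.CreateTable

CatalystSqlParser.parsePlan(
    "CREATE TABLE testcat.ns.t (id INT) USING parquet PARTITIONED BY (id)") match {
  case c: CreateTable =>
    println(c.tableSchema.treeString)  // id: integer
    println(c.partitioning)            // identity transform on `id`
    println(c.tableSpec.provider)      // Some(parquet)
  case other =>
    println(s"unexpected plan: ${other.getClass.getName}")
}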