diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index efc1ab2cd0e1..83d7b932a8bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -37,17 +37,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) => ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name()) - case c @ CreateTableStatement( - NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) => - CreateV2Table( - catalog.asTableCatalog, - tbl.asIdentifier, - c.tableSchema, - // convert the bucket spec and add it as a transform - c.partitioning ++ c.bucketSpec.map(_.asTransform), - convertTableProperties(c), - ignoreIfExists = c.ifNotExists) - case c @ CreateTableAsSelectStatement( NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) => CreateTableAsSelect( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a78f08ac8f4c..0b8a087d794a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3414,7 +3414,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Create a table, returning a [[CreateTableStatement]] logical plan. + * Create a table, returning a [[CreateTable]] or [[CreateTableAsSelectStatement]] logical plan. * * Expected format: * {{{ @@ -3481,9 +3481,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg case _ => // Note: table schema includes both the table columns list and the partition columns // with data type. + val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment, + serdeInfo, external) val schema = StructType(columns ++ partCols) - CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider, - options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists) + CreateTable( + UnresolvedDBObjectName(table, isNamespace = false), + schema, partitioning, tableSpec, ignoreIfExists = ifNotExists) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index ccc4e190ba51..a6ed304e7155 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -123,25 +123,6 @@ object SerdeInfo { } } -/** - * A CREATE TABLE command, as parsed from SQL. - * - * This is a metadata-only command and is not used to write data to the created table. 
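Not applied by this patch — a rough sketch, for orientation, of the plan that visitCreateTable now produces instead of CreateTableStatement. The SQL text, catalog name and field values are illustrative only, and the TableSpec field order follows the case class added in v2Commands.scala later in this diff.

    // Roughly what parsing
    //   CREATE TABLE testcat.db.tbl (id BIGINT) USING parquet PARTITIONED BY (id)
    // yields before analysis (illustrative sketch, not the patch's own code).
    import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName
    import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, TableSpec}
    import org.apache.spark.sql.connector.expressions.Expressions
    import org.apache.spark.sql.types.{LongType, StructType}

    val tableSpec = TableSpec(
      bucketSpec = None,
      properties = Map.empty,
      provider = Some("parquet"),
      options = Map.empty,
      location = None,
      comment = None,
      serde = None,
      external = false)

    val parsed = CreateTable(
      UnresolvedDBObjectName(Seq("testcat", "db", "tbl"), isNamespace = false),
      new StructType().add("id", LongType),
      Seq(Expressions.identity("id")),   // PARTITIONED BY (id)
      tableSpec,
      ignoreIfExists = false)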
- */ -case class CreateTableStatement( - tableName: Seq[String], - tableSchema: StructType, - partitioning: Seq[Transform], - bucketSpec: Option[BucketSpec], - properties: Map[String, String], - provider: Option[String], - options: Map[String, String], - location: Option[String], - comment: Option[String], - serde: Option[SerdeInfo], - external: Boolean, - ifNotExists: Boolean) extends LeafParsedStatement - /** * A CREATE TABLE AS SELECT command, as parsed from SQL. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 4ed5d87aaf10..d39e28865da8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, UnresolvedException} +import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException} +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.FunctionResource import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable} @@ -193,13 +194,24 @@ trait V2CreateTablePlan extends LogicalPlan { /** * Create a new table with a v2 catalog. */ -case class CreateV2Table( - catalog: TableCatalog, - tableName: Identifier, +case class CreateTable( + name: LogicalPlan, tableSchema: StructType, partitioning: Seq[Transform], - properties: Map[String, String], - ignoreIfExists: Boolean) extends LeafCommand with V2CreateTablePlan { + tableSpec: TableSpec, + ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper + + override def child: LogicalPlan = name + + override def tableName: Identifier = { + assert(child.resolved) + child.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier + } + + override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlan = + copy(name = newChild) + override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = { this.copy(partitioning = rewritten) } @@ -1090,3 +1102,13 @@ case class DropIndex( override protected def withNewChildInternal(newChild: LogicalPlan): DropIndex = copy(table = newChild) } + +case class TableSpec( + bucketSpec: Option[BucketSpec], + properties: Map[String, String], + provider: Option[String], + options: Map[String, String], + location: Option[String], + comment: Option[String], + serde: Option[SerdeInfo], + external: Boolean) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index f3f6744720f2..3d62cf2b8342 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, 
FunctionResource} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType +import org.apache.spark.sql.catalyst.plans.logical.TableSpec import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.RuleId import org.apache.spark.sql.catalyst.rules.RuleIdCollection @@ -819,6 +820,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre redactMapString(map.asCaseSensitiveMap().asScala, maxFields) case map: Map[_, _] => redactMapString(map, maxFields) + case t: TableSpec => + t.copy(properties = Utils.redact(t.properties).toMap, + options = Utils.redact(t.options).toMap) :: Nil case table: CatalogTable => table.storage.serde match { case Some(serde) => table.identifier :: serde :: Nil diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index 7c11463fc7c5..fabc73f9cf69 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -23,7 +23,7 @@ import java.util.Collections import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo} import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -305,11 +305,6 @@ private[sql] object CatalogV2Util { catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) } - def convertTableProperties(c: CreateTableStatement): Map[String, String] = { - convertTableProperties( - c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external) - } - def convertTableProperties(c: CreateTableAsSelectStatement): Map[String, String] = { convertTableProperties( c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external) @@ -323,7 +318,7 @@ private[sql] object CatalogV2Util { convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider) } - private def convertTableProperties( + def convertTableProperties( properties: Map[String, String], options: Map[String, String], serdeInfo: Option[SerdeInfo], diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 70227bb0554f..f4ab8076938d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -717,8 +717,8 @@ class DDLParserSuite extends AnalysisTest { val parsedPlan = parsePlan(sqlStatement) val newTableToken = sqlStatement.split(" ")(0).trim.toUpperCase(Locale.ROOT) parsedPlan match { - case create: CreateTableStatement if newTableToken == "CREATE" => - 
assert(create.ifNotExists == expectedIfNotExists) + case create: CreateTable if newTableToken == "CREATE" => + assert(create.ignoreIfExists == expectedIfNotExists) case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" => assert(ctas.ifNotExists == expectedIfNotExists) case replace: ReplaceTableStatement if newTableToken == "REPLACE" => @@ -2285,19 +2285,19 @@ class DDLParserSuite extends AnalysisTest { private object TableSpec { def apply(plan: LogicalPlan): TableSpec = { plan match { - case create: CreateTableStatement => + case create: CreateTable => TableSpec( - create.tableName, + create.name.asInstanceOf[UnresolvedDBObjectName].nameParts, Some(create.tableSchema), create.partitioning, - create.bucketSpec, - create.properties, - create.provider, - create.options, - create.location, - create.comment, - create.serde, - create.external) + create.tableSpec.bucketSpec, + create.tableSpec.properties, + create.tableSpec.provider, + create.tableSpec.options, + create.tableSpec.location, + create.tableSpec.comment, + create.tableSpec.serde, + create.tableSpec.external) case replace: ReplaceTableStatement => TableSpec( replace.tableName, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala index 7404a30fed71..3f9eb5c8084e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, CreateV2Table, LogicalPlan, ReplaceColumns, ReplaceTable} +import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, CreateTable, LogicalPlan, ReplaceColumns, ReplaceTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, CreateDataSourceTableCommand, CreateTableCommand} @@ -31,7 +31,7 @@ object ReplaceCharWithVarchar extends Rule[LogicalPlan] { plan.resolveOperators { // V2 commands - case cmd: CreateV2Table => + case cmd: CreateTable => cmd.copy(tableSchema = replaceCharWithVarcharInSchema(cmd.tableSchema)) case cmd: ReplaceTable => cmd.copy(tableSchema = replaceCharWithVarcharInSchema(cmd.tableSchema)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 5362b6bf6974..0940982f7a3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute} import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{CreateTable => CatalystCreateTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL} 
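One naming wrinkle worth calling out before the next few files: the v2 command reuses the name CreateTable, which already exists as the v1 plan in org.apache.spark.sql.execution.datasources, so files that see both classes disambiguate with import renames rather than fully qualified names:

    // v2 command, aliased where the v1 class is also in scope (ResolveSessionCatalog):
    import org.apache.spark.sql.catalyst.plans.logical.{CreateTable => CatalystCreateTable}
    // v1 command, aliased in files that match on both (rules.scala, PlanResolutionSuite):
    import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}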
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, V1Table} @@ -143,25 +144,24 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the // session catalog and the table provider is not v2. - case c @ CreateTableStatement( - SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) => + case c @ CatalystCreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) => val (storageFormat, provider) = getStorageFormatAndProvider( - c.provider, c.options, c.location, c.serde, ctas = false) - if (!isV2Provider(provider)) { - val tableDesc = buildCatalogTable(tbl.asTableIdentifier, c.tableSchema, - c.partitioning, c.bucketSpec, c.properties, provider, c.location, - c.comment, storageFormat, c.external) - val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + c.tableSpec.provider, + c.tableSpec.options, + c.tableSpec.location, + c.tableSpec.serde, + ctas = false) + if (isSessionCatalog(catalog) && !isV2Provider(provider)) { + val tableDesc = buildCatalogTable(name.asTableIdentifier, c.tableSchema, + c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider, + c.tableSpec.location, c.tableSpec.comment, storageFormat, + c.tableSpec.external) + val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTable(tableDesc, mode, None) } else { - CreateV2Table( - catalog.asTableCatalog, - tbl.asIdentifier, - c.tableSchema, - // convert the bucket spec and add it as a transform - c.partitioning ++ c.bucketSpec.map(_.asTransform), - convertTableProperties(c), - ignoreIfExists = c.ifNotExists) + val newTableSpec = c.tableSpec.copy(bucketSpec = None) + c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform), + tableSpec = newTableSpec) } case c @ CreateTableAsSelectStatement( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 0e8efb629706..327d92672db8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.DDLUtils +import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1} import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.sources.InsertableRelation import org.apache.spark.sql.types.{AtomicType, StructType} @@ -81,7 +82,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi // bucketing information is specified, as we can't infer bucketing from data files currently. // Since the runtime inferred partition columns could be different from what user specified, // we fail the query if the partitioning information is specified. 
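A sketch of the routing this rule now performs (not part of the patch; the provider and catalog names are only examples): a CREATE TABLE that resolves to the session catalog with a non-v2 provider is still lowered to the v1 command, while anything else keeps the v2 CreateTable node with the bucket spec folded into the partitioning transforms.

    // e.g. CREATE TABLE t (id BIGINT) USING parquet         -> v1 CreateTable command
    //      CREATE TABLE testcat.ns.t (id BIGINT) USING foo  -> stays as the v2 CreateTable
    // On the v2 path the bucket spec becomes a transform, via the same helper the rule uses:
    import org.apache.spark.sql.catalyst.catalog.BucketSpec
    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

    val bucketed = BucketSpec(numBuckets = 8, bucketColumnNames = Seq("id"), sortColumnNames = Nil)
    val bucketTransform = bucketed.asTransform   // comparable to PARTITIONED BY (bucket(8, id))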
- case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty => + case c @ CreateTableV1(tableDesc, _, None) if tableDesc.schema.isEmpty => if (tableDesc.bucketSpec.isDefined) { failAnalysis("Cannot specify bucketing information if the table schema is not specified " + "when creating and will be inferred at runtime") @@ -96,7 +97,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi // When we append data to an existing table, check if the given provider, partition columns, // bucket spec, etc. match the existing table, and adjust the columns order of the given query // if necessary. - case c @ CreateTable(tableDesc, SaveMode.Append, Some(query)) + case c @ CreateTableV1(tableDesc, SaveMode.Append, Some(query)) if query.resolved && catalog.tableExists(tableDesc.identifier) => // This is guaranteed by the parser and `DataFrameWriter` assert(tableDesc.provider.isDefined) @@ -189,7 +190,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi // * partition columns' type must be AtomicType. // * sort columns' type must be orderable. // * reorder table schema or output of query plan, to put partition columns at the end. - case c @ CreateTable(tableDesc, _, query) if query.forall(_.resolved) => + case c @ CreateTableV1(tableDesc, _, query) if query.forall(_.resolved) => if (query.isDefined) { assert(tableDesc.schema.isEmpty, "Schema may not be specified in a Create Table As Select (CTAS) statement") @@ -433,7 +434,7 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] { object HiveOnlyCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { plan.foreach { - case CreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) => + case CreateTableV1(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) => throw QueryCompilationErrors.ddlWithoutHiveSupportEnabledError( "CREATE Hive TABLE (AS SELECT)") case i: InsertIntoDir if DDLUtils.isHiveTable(i.provider) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala index be7331b0d7dc..6e5c3af4573c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateTableExec.scala @@ -22,7 +22,8 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.catalyst.plans.logical.TableSpec +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.StructType @@ -32,10 +33,18 @@ case class CreateTableExec( identifier: Identifier, tableSchema: StructType, partitioning: Seq[Transform], - tableProperties: Map[String, String], + tableSpec: TableSpec, ignoreIfExists: Boolean) extends LeafV2CommandExec { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val tableProperties = { + val props = CatalogV2Util.convertTableProperties( + tableSpec.properties, tableSpec.options, tableSpec.serde, + tableSpec.location, tableSpec.comment, 
tableSpec.provider, + tableSpec.external) + CatalogV2Util.withDefaultOwnership(props) + } + override protected def run(): Seq[InternalRow] = { if (!catalog.tableExists(identifier)) { try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 026ff63608bb..f64c1ee001be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -165,9 +165,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat } WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil - case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) => - val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) - CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil + case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning, + tableSpec, ifNotExists) => + CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, + partitioning, tableSpec, ifNotExists) :: Nil case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) => val propsWithOwner = CatalogV2Util.withDefaultOwnership(props) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 10ce9d3aaf01..2d3c89874f59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -27,8 +27,9 @@ import org.apache.hadoop.fs.Path import org.apache.spark.annotation.Evolving import org.apache.spark.api.java.function.VoidFunction2 import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.plans.logical.CreateTableStatement +import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, TableSpec} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback} @@ -288,10 +289,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * Note, currently the new table creation by this API doesn't fully cover the V2 table. 
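Worth noting while reading CreateTableExec above: the TableSpec is now carried to execution unflattened and only turned into catalog properties there. A sketch of what that conversion yields, with values lifted from the assertions removed from PlanResolutionSuite further down; CatalogV2Util is internal API, so this is orientation only, not something applied by the patch.

    import org.apache.spark.sql.connector.catalog.CatalogV2Util

    val props = CatalogV2Util.convertTableProperties(
      Map("p1" -> "v1"),                  // TBLPROPERTIES
      Map("other" -> "20"),               // OPTIONS
      None,                               // serde info
      Some("s3://bucket/path/to/data"),   // LOCATION
      Some("table comment"),              // COMMENT
      Some("parquet"),                    // USING
      false)                              // external
    // Expected shape per the deleted test expectations:
    //   "p1" -> "v1", "other" -> "20", "option.other" -> "20",
    //   "provider" -> "parquet", "location" -> "s3://bucket/path/to/data",
    //   "comment" -> "table comment"
    val withOwner = CatalogV2Util.withDefaultOwnership(props)   // adds the owner property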
* TODO (SPARK-33638): Full support of v2 table creation */ - val cmd = CreateTableStatement( - originalMultipartIdentifier, - df.schema.asNullable, - partitioningColumns.getOrElse(Nil).asTransforms.toSeq, + val tableProperties = TableSpec( None, Map.empty[String, String], Some(source), @@ -299,8 +297,15 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { extraOptions.get("path"), None, None, - external = false, - ifNotExists = false) + false) + val cmd = CreateTable( + UnresolvedDBObjectName( + originalMultipartIdentifier, + isNamespace = false), + df.schema.asNullable, + partitioningColumns.getOrElse(Nil).asTransforms.toSeq, + tableProperties, + ignoreIfExists = false) Dataset.ofRows(df.sparkSession, cmd) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 85ba14fc7a44..a6b979a3fd52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -26,17 +26,17 @@ import org.mockito.invocation.InvocationOnMock import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable} +import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedDBObjectName, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTableAsSelect, CreateTableStatement, CreateV2Table, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} +import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.FakeV2Provider import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCapability, TableCatalog, V1Table} import org.apache.spark.sql.connector.expressions.Transform 
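For context on the DataStreamWriter change above: the public API is unchanged; when the target table does not exist, toTable now goes through the same v2 CreateTable node instead of CreateTableStatement. A minimal usage sketch (table name and checkpoint path are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("toTable-example").getOrCreate()
    val streamingDf = spark.readStream.format("rate").load()

    // If "events" does not exist it is created from df.schema.asNullable via the
    // CreateTable plan built in DataStreamWriter, with ignoreIfExists = false.
    val query = streamingDf.writeStream
      .option("checkpointLocation", "/tmp/checkpoints/events")
      .toTable("events")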
-import org.apache.spark.sql.execution.datasources.CreateTable +import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.sources.SimpleScanSource @@ -210,7 +210,7 @@ class PlanResolutionSuite extends AnalysisTest { private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { parseAndResolve(sql).collect { - case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore) + case CreateTableV1(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore) }.head } @@ -240,7 +240,7 @@ class PlanResolutionSuite extends AnalysisTest { ) parseAndResolve(query) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -282,7 +282,7 @@ class PlanResolutionSuite extends AnalysisTest { ) parseAndResolve(query) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -302,7 +302,7 @@ class PlanResolutionSuite extends AnalysisTest { comment = Some("abc")) parseAndResolve(sql) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -322,7 +322,7 @@ class PlanResolutionSuite extends AnalysisTest { properties = Map("test" -> "test")) parseAndResolve(sql) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -341,7 +341,7 @@ class PlanResolutionSuite extends AnalysisTest { provider = Some("parquet")) parseAndResolve(v1) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -372,7 +372,7 @@ class PlanResolutionSuite extends AnalysisTest { provider = Some("parquet")) parseAndResolve(sql) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -398,7 +398,7 @@ class PlanResolutionSuite extends AnalysisTest { ) parseAndResolve(sql) match { - case CreateTable(tableDesc, _, None) => + case CreateTableV1(tableDesc, _, None) => assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime)) case other => fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," + @@ -471,29 +471,20 @@ class PlanResolutionSuite extends AnalysisTest { |OPTIONS (path 's3://bucket/path/to/data', other 20) """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - 
"option.other" -> "20", - "provider" -> "parquet", - "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "other" -> "20") - parseAndResolve(sql) match { - case create: CreateV2Table => - assert(create.catalog.name == "testcat") - assert(create.tableName == Identifier.of(Array("mydb"), "table_name")) + case create: CreateTable => + assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") + assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == + "mydb.table_name") assert(create.tableSchema == new StructType() .add("id", LongType) .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) - assert(create.properties == expectedProperties) assert(create.ignoreIfExists) case other => - fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," + + fail(s"Expected to parse ${classOf[CreateTable].getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -511,29 +502,20 @@ class PlanResolutionSuite extends AnalysisTest { |OPTIONS (path 's3://bucket/path/to/data', other 20) """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "option.other" -> "20", - "provider" -> "parquet", - "location" -> "s3://bucket/path/to/data", - "comment" -> "table comment", - "other" -> "20") - parseAndResolve(sql, withDefault = true) match { - case create: CreateV2Table => - assert(create.catalog.name == "testcat") - assert(create.tableName == Identifier.of(Array("mydb"), "table_name")) + case create: CreateTable => + assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat") + assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == + "mydb.table_name") assert(create.tableSchema == new StructType() .add("id", LongType) .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) - assert(create.properties == expectedProperties) assert(create.ignoreIfExists) case other => - fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," + + fail(s"Expected to parse ${classOf[CreateTable].getName} from query," + s"got ${other.getClass.getName}: $sql") } } @@ -551,27 +533,21 @@ class PlanResolutionSuite extends AnalysisTest { |TBLPROPERTIES ('p1'='v1', 'p2'='v2') """.stripMargin - val expectedProperties = Map( - "p1" -> "v1", - "p2" -> "v2", - "provider" -> v2Format, - "location" -> "/user/external/page_view", - "comment" -> "This is the staging page view table") - parseAndResolve(sql) match { - case create: CreateV2Table => - assert(create.catalog.name == CatalogManager.SESSION_CATALOG_NAME) - assert(create.tableName == Identifier.of(Array("mydb"), "page_view")) + case create: CreateTable => + assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == + CatalogManager.SESSION_CATALOG_NAME) + assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == + "mydb.page_view") assert(create.tableSchema == new StructType() .add("id", LongType) .add("description", StringType) .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))) assert(create.partitioning.isEmpty) - assert(create.properties == expectedProperties) assert(create.ignoreIfExists) case other => - fail(s"Expected to parse ${classOf[CreateV2Table].getName} from query," + + fail(s"Expected to parse ${classOf[CreateTable].getName} from query," + s"got ${other.getClass.getName}: 
$sql") } } @@ -1684,9 +1660,9 @@ class PlanResolutionSuite extends AnalysisTest { */ def normalizePlan(plan: LogicalPlan): LogicalPlan = { plan match { - case CreateTable(tableDesc, mode, query) => + case CreateTableV1(tableDesc, mode, query) => val newTableDesc = tableDesc.copy(createTime = -1L) - CreateTable(newTableDesc, mode, query) + CreateTableV1(newTableDesc, mode, query) case _ => plan // Don't transform } } @@ -1707,8 +1683,8 @@ class PlanResolutionSuite extends AnalysisTest { partitionColumnNames: Seq[String] = Seq.empty, comment: Option[String] = None, mode: SaveMode = SaveMode.ErrorIfExists, - query: Option[LogicalPlan] = None): CreateTable = { - CreateTable( + query: Option[LogicalPlan] = None): CreateTableV1 = { + CreateTableV1( CatalogTable( identifier = TableIdentifier(table, database), tableType = tableType, @@ -1790,7 +1766,7 @@ class PlanResolutionSuite extends AnalysisTest { allSources.foreach { s => val query = s"CREATE TABLE my_tab STORED AS $s" parseAndResolve(query) match { - case ct: CreateTable => + case ct: CreateTableV1 => val hiveSerde = HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) assert(ct.tableDesc.storage.serde == @@ -1809,14 +1785,14 @@ class PlanResolutionSuite extends AnalysisTest { // No conflicting serdes here, OK parseAndResolve(query1) match { - case parsed1: CreateTable => + case parsed1: CreateTableV1 => assert(parsed1.tableDesc.storage.serde == Some("anything")) assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt")) assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt")) } parseAndResolve(query2) match { - case parsed2: CreateTable => + case parsed2: CreateTableV1 => assert(parsed2.tableDesc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt")) @@ -1832,7 +1808,7 @@ class PlanResolutionSuite extends AnalysisTest { val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s" if (supportedSources.contains(s)) { parseAndResolve(query) match { - case ct: CreateTable => + case ct: CreateTableV1 => val hiveSerde = HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) assert(ct.tableDesc.storage.serde == Some("anything")) @@ -1853,7 +1829,7 @@ class PlanResolutionSuite extends AnalysisTest { val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s" if (supportedSources.contains(s)) { parseAndResolve(query) match { - case ct: CreateTable => + case ct: CreateTableV1 => val hiveSerde = HiveSerDe.sourceToSerDe(s) assert(hiveSerde.isDefined) assert(ct.tableDesc.storage.serde == hiveSerde.get.serde @@ -1870,14 +1846,14 @@ class PlanResolutionSuite extends AnalysisTest { test("create hive external table") { val withoutLoc = "CREATE EXTERNAL TABLE my_tab STORED AS parquet" parseAndResolve(withoutLoc) match { - case ct: CreateTable => + case ct: CreateTableV1 => assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) assert(ct.tableDesc.storage.locationUri.isEmpty) } val withLoc = "CREATE EXTERNAL TABLE my_tab STORED AS parquet LOCATION '/something/anything'" parseAndResolve(withLoc) match { - case ct: CreateTable => + case ct: CreateTableV1 => assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything"))) } @@ -1897,7 +1873,7 @@ class PlanResolutionSuite extends AnalysisTest { test("create hive table - location implies external") { val query = "CREATE TABLE my_tab STORED AS parquet LOCATION 
'/something/anything'" parseAndResolve(query) match { - case ct: CreateTable => + case ct: CreateTableV1 => assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL) assert(ct.tableDesc.storage.locationUri == Some(new URI("/something/anything"))) } @@ -2261,14 +2237,6 @@ class PlanResolutionSuite extends AnalysisTest { assert(e2.getMessage.contains("Operation not allowed")) } - test("create table - properties") { - val query = "CREATE TABLE my_table (id int, name string) TBLPROPERTIES ('k1'='v1', 'k2'='v2')" - parsePlan(query) match { - case state: CreateTableStatement => - assert(state.properties == Map("k1" -> "v1", "k2" -> "v2")) - } - } - test("create table(hive) - everything!") { val query = """