@@ -37,17 +37,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) =>
ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name())

case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
CreateV2Table(
catalog.asTableCatalog,
tbl.asIdentifier,
c.tableSchema,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
convertTableProperties(c),
ignoreIfExists = c.ifNotExists)

case c @ CreateTableAsSelectStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
CreateTableAsSelect(
@@ -3414,7 +3414,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

/**
* Create a table, returning a [[CreateTableStatement]] logical plan.
* Create a table, returning a [[CreateTable]] or [[CreateTableAsSelectStatement]] logical plan.
*
* Expected format:
* {{{
@@ -3481,9 +3481,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
case _ =>
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
serdeInfo, external)
val schema = StructType(columns ++ partCols)
CreateTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
options, location, comment, serdeInfo, external = external, ifNotExists = ifNotExists)
CreateTable(
UnresolvedDBObjectName(table, isNamespace = false),
schema, partitioning, tableSpec, ignoreIfExists = ifNotExists)
}
}
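For orientation, a minimal sketch of the plan this branch now builds for a plain CREATE TABLE. The catalog, table name, schema, and flag values below are invented for illustration; a statement such as CREATE TABLE IF NOT EXISTS testcat.ns.tbl (id BIGINT, data STRING) USING parquet would roughly take this shape after parsing:

import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, TableSpec}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Invented values, mirroring what the parser code above assembles.
val tableSpec = TableSpec(
  bucketSpec = None,
  properties = Map.empty,
  provider = Some("parquet"),
  options = Map.empty,
  location = None,
  comment = None,
  serde = None,
  external = false)

val plan = CreateTable(
  UnresolvedDBObjectName(Seq("testcat", "ns", "tbl"), isNamespace = false),
  new StructType().add("id", LongType).add("data", StringType),
  partitioning = Seq.empty,
  tableSpec = tableSpec,
  ignoreIfExists = true)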

@@ -123,25 +123,6 @@ object SerdeInfo {
}
}

/**
* A CREATE TABLE command, as parsed from SQL.
*
* This is a metadata-only command and is not used to write data to the created table.
*/
case class CreateTableStatement(
tableName: Seq[String],
tableSchema: StructType,
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String],
serde: Option[SerdeInfo],
external: Boolean,
ifNotExists: Boolean) extends LeafParsedStatement

/**
* A CREATE TABLE AS SELECT command, as parsed from SQL.
*/
@@ -17,7 +17,8 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, UnresolvedException}
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
@@ -193,13 +194,24 @@ trait V2CreateTablePlan extends LogicalPlan {
/**
* Create a new table with a v2 catalog.
*/
case class CreateV2Table(
catalog: TableCatalog,
tableName: Identifier,
case class CreateTable(
name: LogicalPlan,
tableSchema: StructType,
partitioning: Seq[Transform],
properties: Map[String, String],
ignoreIfExists: Boolean) extends LeafCommand with V2CreateTablePlan {
tableSpec: TableSpec,
ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

override def child: LogicalPlan = name

override def tableName: Identifier = {
assert(child.resolved)
child.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
}

override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlan =
copy(name = newChild)

override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
this.copy(partitioning = rewritten)
}
@@ -1090,3 +1102,13 @@ case class DropIndex(
override protected def withNewChildInternal(newChild: LogicalPlan): DropIndex =
copy(table = newChild)
}

case class TableSpec(
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String],
serde: Option[SerdeInfo],
external: Boolean)
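As a rough illustration of the resolution contract the CreateTable node defined earlier in this file relies on (the helper and names below are hypothetical, not part of the change): tableName is only meaningful after the analyzer has rewritten the UnresolvedDBObjectName child into a ResolvedDBObjectName.

import org.apache.spark.sql.catalyst.analysis.ResolvedDBObjectName
import org.apache.spark.sql.connector.catalog.Identifier

// Hypothetical helper: extract the identifier only once the name child is resolved.
def resolvedIdentifier(create: CreateTable): Option[Identifier] = create.name match {
  case _: ResolvedDBObjectName => Some(create.tableName) // safe: assert(child.resolved) holds
  case _ => None // still an UnresolvedDBObjectName; tableName would fail its assertion
}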
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.TableSpec
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.rules.RuleId
import org.apache.spark.sql.catalyst.rules.RuleIdCollection
@@ -819,6 +820,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Tre
redactMapString(map.asCaseSensitiveMap().asScala, maxFields)
case map: Map[_, _] =>
redactMapString(map, maxFields)
case t: TableSpec =>
t.copy(properties = Utils.redact(t.properties).toMap,
options = Utils.redact(t.options).toMap) :: Nil
case table: CatalogTable =>
table.storage.serde match {
case Some(serde) => table.identifier :: serde :: Nil
@@ -23,7 +23,7 @@ import java.util.Collections
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -305,11 +305,6 @@ private[sql] object CatalogV2Util {
catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
}

def convertTableProperties(c: CreateTableStatement): Map[String, String] = {
Review comment (Contributor): Can we just change c to CreateV2Table

convertTableProperties(
c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external)
}

def convertTableProperties(c: CreateTableAsSelectStatement): Map[String, String] = {
convertTableProperties(
c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external)
@@ -323,7 +318,7 @@
convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
}

private def convertTableProperties(
def convertTableProperties(
properties: Map[String, String],
options: Map[String, String],
serdeInfo: Option[SerdeInfo],
@@ -717,8 +717,8 @@ class DDLParserSuite extends AnalysisTest {
val parsedPlan = parsePlan(sqlStatement)
val newTableToken = sqlStatement.split(" ")(0).trim.toUpperCase(Locale.ROOT)
parsedPlan match {
case create: CreateTableStatement if newTableToken == "CREATE" =>
assert(create.ifNotExists == expectedIfNotExists)
case create: CreateTable if newTableToken == "CREATE" =>
assert(create.ignoreIfExists == expectedIfNotExists)
case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" =>
assert(ctas.ifNotExists == expectedIfNotExists)
case replace: ReplaceTableStatement if newTableToken == "REPLACE" =>
@@ -2285,19 +2285,19 @@
private object TableSpec {
def apply(plan: LogicalPlan): TableSpec = {
plan match {
case create: CreateTableStatement =>
case create: CreateTable =>
TableSpec(
create.tableName,
create.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
Some(create.tableSchema),
create.partitioning,
create.bucketSpec,
create.properties,
create.provider,
create.options,
create.location,
create.comment,
create.serde,
create.external)
create.tableSpec.bucketSpec,
create.tableSpec.properties,
create.tableSpec.provider,
create.tableSpec.options,
create.tableSpec.location,
create.tableSpec.comment,
create.tableSpec.serde,
create.tableSpec.external)
case replace: ReplaceTableStatement =>
TableSpec(
replace.tableName,
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, CreateV2Table, LogicalPlan, ReplaceColumns, ReplaceTable}
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, CreateTable, LogicalPlan, ReplaceColumns, ReplaceTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, CreateDataSourceTableCommand, CreateTableCommand}
@@ -31,7 +31,7 @@ object ReplaceCharWithVarchar extends Rule[LogicalPlan] {

plan.resolveOperators {
// V2 commands
case cmd: CreateV2Table =>
case cmd: CreateTable =>
cmd.copy(tableSchema = replaceCharWithVarcharInSchema(cmd.tableSchema))
case cmd: ReplaceTable =>
cmd.copy(tableSchema = replaceCharWithVarcharInSchema(cmd.tableSchema))
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable => CatalystCreateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, V1Table}
@@ -143,25 +144,24 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

// For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
// session catalog and the table provider is not v2.
case c @ CreateTableStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
case c @ CatalystCreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.provider, c.options, c.location, c.serde, ctas = false)
if (!isV2Provider(provider)) {
val tableDesc = buildCatalogTable(tbl.asTableIdentifier, c.tableSchema,
c.partitioning, c.bucketSpec, c.properties, provider, c.location,
c.comment, storageFormat, c.external)
val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
c.tableSpec.provider,
c.tableSpec.options,
c.tableSpec.location,
c.tableSpec.serde,
ctas = false)
if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
val tableDesc = buildCatalogTable(name.asTableIdentifier, c.tableSchema,
c.partitioning, c.tableSpec.bucketSpec, c.tableSpec.properties, provider,
c.tableSpec.location, c.tableSpec.comment, storageFormat,
c.tableSpec.external)
val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTable(tableDesc, mode, None)
} else {
CreateV2Table(
catalog.asTableCatalog,
tbl.asIdentifier,
c.tableSchema,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
convertTableProperties(c),
ignoreIfExists = c.ifNotExists)
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
tableSpec = newTableSpec)
}

case c @ CreateTableAsSelectStatement(
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.{AtomicType, StructType}
@@ -81,7 +82,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
// bucketing information is specified, as we can't infer bucketing from data files currently.
// Since the runtime inferred partition columns could be different from what user specified,
// we fail the query if the partitioning information is specified.
case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
case c @ CreateTableV1(tableDesc, _, None) if tableDesc.schema.isEmpty =>
if (tableDesc.bucketSpec.isDefined) {
failAnalysis("Cannot specify bucketing information if the table schema is not specified " +
"when creating and will be inferred at runtime")
@@ -96,7 +97,7 @@
// When we append data to an existing table, check if the given provider, partition columns,
// bucket spec, etc. match the existing table, and adjust the columns order of the given query
// if necessary.
case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
case c @ CreateTableV1(tableDesc, SaveMode.Append, Some(query))
if query.resolved && catalog.tableExists(tableDesc.identifier) =>
// This is guaranteed by the parser and `DataFrameWriter`
assert(tableDesc.provider.isDefined)
@@ -189,7 +190,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
// * partition columns' type must be AtomicType.
// * sort columns' type must be orderable.
// * reorder table schema or output of query plan, to put partition columns at the end.
case c @ CreateTable(tableDesc, _, query) if query.forall(_.resolved) =>
case c @ CreateTableV1(tableDesc, _, query) if query.forall(_.resolved) =>
if (query.isDefined) {
assert(tableDesc.schema.isEmpty,
"Schema may not be specified in a Create Table As Select (CTAS) statement")
@@ -433,7 +434,7 @@ object PreprocessTableInsertion extends Rule[LogicalPlan] {
object HiveOnlyCheck extends (LogicalPlan => Unit) {
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case CreateTable(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) =>
case CreateTableV1(tableDesc, _, _) if DDLUtils.isHiveTable(tableDesc) =>
throw QueryCompilationErrors.ddlWithoutHiveSupportEnabledError(
"CREATE Hive TABLE (AS SELECT)")
case i: InsertIntoDir if DDLUtils.isHiveTable(i.provider) =>
@@ -22,7 +22,8 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.catalyst.plans.logical.TableSpec
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.StructType
@@ -32,10 +33,18 @@ case class CreateTableExec(
identifier: Identifier,
tableSchema: StructType,
partitioning: Seq[Transform],
tableProperties: Map[String, String],
tableSpec: TableSpec,
ignoreIfExists: Boolean) extends LeafV2CommandExec {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val tableProperties = {
val props = CatalogV2Util.convertTableProperties(
tableSpec.properties, tableSpec.options, tableSpec.serde,
tableSpec.location, tableSpec.comment, tableSpec.provider,
tableSpec.external)
CatalogV2Util.withDefaultOwnership(props)
}

override protected def run(): Seq[InternalRow] = {
if (!catalog.tableExists(identifier)) {
try {
@@ -165,9 +165,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}
WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil

case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil
case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning,
tableSpec, ifNotExists) =>
CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, schema,
partitioning, tableSpec, ifNotExists) :: Nil

case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
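As a rough end-to-end usage note (the catalog name and implementation class below are placeholders, not part of this change): with a v2 catalog configured, a plain CREATE TABLE now flows from the parser's CreateTable node straight to CreateTableExec, which derives the table properties from TableSpec at execution time.

// Hypothetical session setup; "testcat" and the catalog class are placeholders.
spark.conf.set("spark.sql.catalog.testcat", "org.example.MyTableCatalog")
spark.sql("CREATE TABLE IF NOT EXISTS testcat.ns.tbl (id BIGINT, data STRING) USING parquet")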