@@ -28,25 +28,13 @@ import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Lo
class ResolveCatalogs(val catalogManager: CatalogManager)
extends Rule[LogicalPlan] with LookupCatalog {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.CatalogV2Util._

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case UnresolvedDBObjectName(CatalogAndNamespace(catalog, name), isNamespace) if isNamespace =>
ResolvedDBObjectName(catalog, name)

case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) =>
ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name())

case c @ ReplaceTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
ReplaceTable(
catalog.asTableCatalog,
tbl.asIdentifier,
c.tableSchema,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
convertTableProperties(c),
orCreate = c.orCreate)
}

object NonSessionCatalogAndTable {
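With ReplaceTableStatement gone, REPLACE TABLE now flows through the generic UnresolvedDBObjectName cases above rather than a dedicated rule. A minimal sketch of the parsed plan that reaches this rule, assuming an invented catalog/table name and a bare-bones TableSpec (argument order taken from the TableSpec construction shown later in this diff):

```scala
// Sketch only; "testcat", "ns", "tbl" and the spec values are invented.
import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName
import org.apache.spark.sql.catalyst.plans.logical.{ReplaceTable, TableSpec}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.{LongType, StructType}

val parsed = ReplaceTable(
  UnresolvedDBObjectName(Seq("testcat", "ns", "tbl"), isNamespace = false),
  new StructType().add("id", LongType),
  Seq.empty[Transform],
  TableSpec(None, Map.empty, Some("parquet"), Map.empty, None, None, None, false),
  orCreate = false)
// ResolveCatalogs rewrites the name child into
// ResolvedDBObjectName(catalog, Seq("ns", "tbl")); later rules and the
// planner read the identifier from that resolved child.
```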
@@ -3487,7 +3487,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

/**
* Replace a table, returning a [[ReplaceTableStatement]] or [[ReplaceTableAsSelect]]
* Replace a table, returning a [[ReplaceTable]] or [[ReplaceTableAsSelect]]
* logical plan.
*
* Expected format:
@@ -3540,6 +3540,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

val partitioning = partitionExpressions(partTransforms, partCols, ctx)
val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
serdeInfo, false)

Option(ctx.query).map(plan) match {
case Some(_) if columns.nonEmpty =>
@@ -3554,8 +3556,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
ctx)

case Some(query) =>
val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
serdeInfo, false)
ReplaceTableAsSelect(
UnresolvedDBObjectName(table, isNamespace = false),
partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate)
@@ -3564,8 +3564,9 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val schema = StructType(columns ++ partCols)
ReplaceTableStatement(table, schema, partitioning, bucketSpec, properties, provider,
options, location, comment, serdeInfo, orCreate = orCreate)
ReplaceTable(
UnresolvedDBObjectName(table, isNamespace = false),
schema, partitioning, tableSpec, orCreate = orCreate)
}
}
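As a hedged illustration of the new parser output (table name and column are invented), a plain REPLACE TABLE without a query now yields the v2 ReplaceTable node directly:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.ReplaceTable

// Hypothetical statement; any v2-capable provider would do.
val plan = CatalystSqlParser.parsePlan(
  "REPLACE TABLE testcat.ns.tbl (id BIGINT) USING parquet")
plan match {
  case r: ReplaceTable =>
    // The name child is still an UnresolvedDBObjectName here; the analyzer
    // resolves it, so r.tableName must not be called on the parsed plan.
    assert(r.tableSpec.provider.contains("parquet"))
    assert(!r.orCreate)
  case other => sys.error(s"unexpected plan: $other")
}
```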

@@ -18,12 +18,10 @@
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.trees.{LeafLike, UnaryLike}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.types.DataType

/**
* A logical plan node that contains exactly what was parsed from SQL.
@@ -123,25 +121,6 @@ object SerdeInfo {
}
}

/**
* A REPLACE TABLE command, as parsed from SQL.
*
* If the table exists prior to running this command, executing this statement
* will replace the table's metadata and clear the underlying rows from the table.
*/
case class ReplaceTableStatement(
tableName: Seq[String],
tableSchema: StructType,
partitioning: Seq[Transform],
bucketSpec: Option[BucketSpec],
properties: Map[String, String],
provider: Option[String],
options: Map[String, String],
location: Option[String],
comment: Option[String],
serde: Option[SerdeInfo],
orCreate: Boolean) extends LeafParsedStatement

/**
* Column data as parsed by ALTER TABLE ... (ADD|REPLACE) COLUMNS.
*/
@@ -264,12 +264,23 @@ case class CreateTableAsSelect(
* The persisted table will have no contents as a result of this operation.
*/
case class ReplaceTable(
catalog: TableCatalog,
tableName: Identifier,
name: LogicalPlan,
tableSchema: StructType,
partitioning: Seq[Transform],
properties: Map[String, String],
orCreate: Boolean) extends LeafCommand with V2CreateTablePlan {
tableSpec: TableSpec,
orCreate: Boolean) extends UnaryCommand with V2CreateTablePlan {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

override def child: LogicalPlan = name

override def tableName: Identifier = {
assert(child.resolved)
child.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
}

override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlan =
copy(name = newChild)

override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
this.copy(partitioning = rewritten)
}
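Because the identifier now lives in the resolved child, downstream code recovers it through tableName; a small sketch, assuming `analyzed` stands in for an already-analyzed plan containing this node:

```scala
// Sketch; `analyzed` stands in for the output of the analyzer.
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReplaceTable}
import org.apache.spark.sql.connector.catalog.Identifier

def replacedIdentifier(analyzed: LogicalPlan): Option[Identifier] =
  analyzed.collectFirst {
    // tableName asserts child.resolved, so only read it after analysis.
    case r: ReplaceTable if r.child.resolved => r.tableName
  }
```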
@@ -23,7 +23,7 @@ import java.util.Collections
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.plans.logical.{ReplaceTableStatement, SerdeInfo, TableSpec}
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -305,10 +305,6 @@ private[sql] object CatalogV2Util {
catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
}

def convertTableProperties(r: ReplaceTableStatement): Map[String, String] = {
convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
}

def convertTableProperties(t: TableSpec): Map[String, String] = {
val props = convertTableProperties(
t.properties, t.options, t.serde, t.location, t.comment, t.provider, t.external)
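With the ReplaceTableStatement overload removed, callers go through the TableSpec variant; a rough sketch with invented values (CatalogV2Util is private[sql], so this only compiles inside Spark):

```scala
import org.apache.spark.sql.catalyst.plans.logical.TableSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Util

// Invented spec purely for illustration.
val spec = TableSpec(
  bucketSpec = None,
  properties = Map("k" -> "v"),
  provider = Some("parquet"),
  options = Map.empty,
  location = Some("/tmp/tbl"),
  comment = Some("demo"),
  serde = None,
  external = false)

// Flattens the spec into the property map handed to the catalog
// (original properties plus provider/location/comment/option entries).
val props: Map[String, String] = CatalogV2Util.convertTableProperties(spec)
```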
@@ -721,7 +721,7 @@ class DDLParserSuite extends AnalysisTest {
assert(create.ignoreIfExists == expectedIfNotExists)
case ctas: CreateTableAsSelect if newTableToken == "CREATE" =>
assert(ctas.ignoreIfExists == expectedIfNotExists)
case replace: ReplaceTableStatement if newTableToken == "REPLACE" =>
case replace: ReplaceTable if newTableToken == "REPLACE" =>
case replace: ReplaceTableAsSelect if newTableToken == "REPLACE" =>
case other =>
fail("First token in statement does not match the expected parsed plan; CREATE TABLE" +
@@ -2298,18 +2298,18 @@ class DDLParserSuite extends AnalysisTest {
create.tableSpec.comment,
create.tableSpec.serde,
create.tableSpec.external)
case replace: ReplaceTableStatement =>
case replace: ReplaceTable =>
TableSpec(
replace.tableName,
replace.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
Some(replace.tableSchema),
replace.partitioning,
replace.bucketSpec,
replace.properties,
replace.provider,
replace.options,
replace.location,
replace.comment,
replace.serde)
replace.tableSpec.bucketSpec,
replace.tableSpec.properties,
replace.tableSpec.provider,
replace.tableSpec.options,
replace.tableSpec.location,
replace.tableSpec.comment,
replace.tableSpec.serde)
case ctas: CreateTableAsSelect =>
TableSpec(
ctas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
@@ -188,20 +188,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

// For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
// session catalog and the table provider is not v2.
case c @ ReplaceTableStatement(
SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
case c @ ReplaceTable(
ResolvedDBObjectName(catalog, name), _, _, _, _) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (isSessionCatalog(catalog) && !isV2Provider(provider)) {
throw QueryCompilationErrors.replaceTableOnlySupportedWithV2TableError
} else {
ReplaceTable(
catalog.asTableCatalog,
tbl.asIdentifier,
c.tableSchema,
// convert the bucket spec and add it as a transform
c.partitioning ++ c.bucketSpec.map(_.asTransform),
convertTableProperties(c),
orCreate = c.orCreate)
val newTableSpec = c.tableSpec.copy(bucketSpec = None)
c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
tableSpec = newTableSpec)
}

case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _)
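The bucket spec folded into partitioning above goes through the asTransform implicit; a short sketch of that conversion with invented values:

```scala
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

// Invented bucket spec: 4 buckets on column "id", no sort columns.
val bucket = BucketSpec(4, Seq("id"), Nil)
val asPartition = bucket.asTransform   // a bucket(4, id) Transform
```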
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.toPrettySQL
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog}
import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, Literal => V2Literal, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse => V2AlwaysFalse, AlwaysTrue => V2AlwaysTrue, And => V2And, EqualNullSafe => V2EqualNullSafe, EqualTo => V2EqualTo, Filter => V2Filter, GreaterThan => V2GreaterThan, GreaterThanOrEqual => V2GreaterThanOrEqual, In => V2In, IsNotNull => V2IsNotNull, IsNull => V2IsNull, LessThan => V2LessThan, LessThanOrEqual => V2LessThanOrEqual, Not => V2Not, Or => V2Or, StringContains => V2StringContains, StringEndsWith => V2StringEndsWith, StringStartsWith => V2StringStartsWith}
@@ -184,19 +184,16 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case RefreshTable(r: ResolvedTable) =>
RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil

case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
val newProps = props.get(TableCatalog.PROP_LOCATION).map { loc =>
props + (TableCatalog.PROP_LOCATION -> makeQualifiedDBObjectPath(loc))
}.getOrElse(props)
val propsWithOwner = CatalogV2Util.withDefaultOwnership(newProps)
case ReplaceTable(ResolvedDBObjectName(catalog, ident), schema, parts, tableSpec, orCreate) =>
val qualifiedLocation = tableSpec.location.map(makeQualifiedDBObjectPath(_))
Review comment (Contributor): not related to this PR: seems path is not qualified in CreateTableAsSelect? cc @imback82

Reply (Contributor): Right, it doesn't seem to be qualified. I will create a PR for that.

catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableExec(
staging, ident, schema, parts, propsWithOwner, orCreate = orCreate,
invalidateCache) :: Nil
AtomicReplaceTableExec(staging, ident.asIdentifier, schema, parts,
tableSpec.copy(location = qualifiedLocation),
orCreate = orCreate, invalidateCache) :: Nil
case _ =>
ReplaceTableExec(
catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate,
ReplaceTableExec(catalog.asTableCatalog, ident.asIdentifier, schema, parts,
tableSpec.copy(location = qualifiedLocation), orCreate = orCreate,
invalidateCache) :: Nil
}
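Taken together, a REPLACE TABLE against a v2 catalog now travels ReplaceTable -> ReplaceTableExec (or AtomicReplaceTableExec when the catalog is a StagingTableCatalog). A hedged end-to-end sketch, assuming a session with a v2 catalog registered under the invented name "testcat":

```scala
// Assumes spark.sql.catalog.testcat points at a TableCatalog implementation;
// the table name and schema are invented.
spark.sql("CREATE OR REPLACE TABLE testcat.ns.tbl (id BIGINT) USING parquet")
// The command is parsed into ReplaceTable(orCreate = true), resolved by the
// analyzer rules above, and planned here with any location in the TableSpec
// qualified via makeQualifiedDBObjectPath before it reaches the catalog.
```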

@@ -22,7 +22,8 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.catalyst.plans.logical.TableSpec
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.StructType
@@ -33,10 +34,12 @@ case class ReplaceTableExec(
ident: Identifier,
tableSchema: StructType,
partitioning: Seq[Transform],
tableProperties: Map[String, String],
tableSpec: TableSpec,
orCreate: Boolean,
invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends LeafV2CommandExec {

val tableProperties = CatalogV2Util.convertTableProperties(tableSpec)

override protected def run(): Seq[InternalRow] = {
if (catalog.tableExists(ident)) {
val table = catalog.loadTable(ident)
@@ -57,10 +60,12 @@ case class AtomicReplaceTableExec(
identifier: Identifier,
tableSchema: StructType,
partitioning: Seq[Transform],
tableProperties: Map[String, String],
tableSpec: TableSpec,
orCreate: Boolean,
invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends LeafV2CommandExec {

val tableProperties = CatalogV2Util.convertTableProperties(tableSpec)

override protected def run(): Seq[InternalRow] = {
if (catalog.tableExists(identifier)) {
val table = catalog.loadTable(identifier)