
Commit 3323d2e

[SPARK-37496][SQL] Migrate ReplaceTableAsSelectStatement to v2 command

1 parent: 3657703

12 files changed (+100, -132 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 0 additions & 12 deletions
@@ -59,18 +59,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         c.partitioning ++ c.bucketSpec.map(_.asTransform),
         convertTableProperties(c),
         orCreate = c.orCreate)
-
-    case c @ ReplaceTableAsSelectStatement(
-        NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
-      ReplaceTableAsSelect(
-        catalog.asTableCatalog,
-        tbl.asIdentifier,
-        // convert the bucket spec and add it as a transform
-        c.partitioning ++ c.bucketSpec.map(_.asTransform),
-        c.asSelect,
-        convertTableProperties(c),
-        writeOptions = c.writeOptions,
-        orCreate = c.orCreate)
   }

   object NonSessionCatalogAndTable {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 7 additions & 4 deletions
@@ -3487,7 +3487,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging
   }

   /**
-   * Replace a table, returning a [[ReplaceTableStatement]] logical plan.
+   * Replace a table, returning a [[ReplaceTableStatement]] or [[ReplaceTableAsSelect]]
+   * logical plan.
    *
    * Expected format:
    * {{{
@@ -3553,9 +3554,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging
           ctx)

       case Some(query) =>
-        ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties,
-          provider, options, location, comment, writeOptions = Map.empty, serdeInfo,
-          orCreate = orCreate)
+        val tableSpec = TableSpec(bucketSpec, properties, provider, options, location, comment,
+          serdeInfo, false)
+        ReplaceTableAsSelect(
+          UnresolvedDBObjectName(table, isNamespace = false),
+          partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate)

       case _ =>
         // Note: table schema includes both the table columns list and the partition columns
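Side note (not part of the diff): after this change the parser emits the v2 node directly. A minimal sketch of checking that, assuming a Spark build that includes this commit and an active SparkSession named `spark`:

import org.apache.spark.sql.catalyst.plans.logical.ReplaceTableAsSelect

// visitReplaceTable now returns the v2 command instead of the old
// ReplaceTableAsSelectStatement, so the parsed plan can be matched directly.
val plan = spark.sessionState.sqlParser.parsePlan(
  "REPLACE TABLE ns.t USING parquet AS SELECT 1 AS id")
assert(plan.isInstanceOf[ReplaceTableAsSelect])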

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 0 additions & 23 deletions
@@ -165,29 +165,6 @@ case class ReplaceTableStatement(
     serde: Option[SerdeInfo],
     orCreate: Boolean) extends LeafParsedStatement

-/**
- * A REPLACE TABLE AS SELECT command, as parsed from SQL.
- */
-case class ReplaceTableAsSelectStatement(
-    tableName: Seq[String],
-    asSelect: LogicalPlan,
-    partitioning: Seq[Transform],
-    bucketSpec: Option[BucketSpec],
-    properties: Map[String, String],
-    provider: Option[String],
-    options: Map[String, String],
-    location: Option[String],
-    comment: Option[String],
-    writeOptions: Map[String, String],
-    serde: Option[SerdeInfo],
-    orCreate: Boolean) extends UnaryParsedStatement {
-
-  override def child: LogicalPlan = asSelect
-  override protected def withNewChildInternal(
-    newChild: LogicalPlan): ReplaceTableAsSelectStatement = copy(asSelect = newChild)
-}
-
-
 /**
  * Column data as parsed by ALTER TABLE ... (ADD|REPLACE) COLUMNS.
  */

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 17 additions & 10 deletions
@@ -18,9 +18,8 @@
 package org.apache.spark.sql.catalyst.plans.logical

 import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, FunctionResource}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.FunctionResource
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
 import org.apache.spark.sql.catalyst.trees.BinaryLike
@@ -274,16 +273,17 @@ case class ReplaceTable(
  * If the table does not exist, and orCreate is false, then an exception will be thrown.
  */
 case class ReplaceTableAsSelect(
-    catalog: TableCatalog,
-    tableName: Identifier,
+    name: LogicalPlan,
     partitioning: Seq[Transform],
     query: LogicalPlan,
-    properties: Map[String, String],
+    tableSpec: TableSpec,
     writeOptions: Map[String, String],
-    orCreate: Boolean) extends UnaryCommand with V2CreateTablePlan {
+    orCreate: Boolean) extends BinaryCommand with V2CreateTablePlan {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper

   override def tableSchema: StructType = query.schema
-  override def child: LogicalPlan = query
+  override def left: LogicalPlan = name
+  override def right: LogicalPlan = query

   override lazy val resolved: Boolean = childrenResolved && {
     // the table schema is created from the query schema, so the only resolution needed is to check
@@ -292,12 +292,19 @@ case class ReplaceTableAsSelect(
     references.map(_.fieldNames).forall(query.schema.findNestedField(_).isDefined)
   }

+  override def tableName: Identifier = {
+    assert(name.resolved)
+    name.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+  }
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan,
+      newRight: LogicalPlan): LogicalPlan =
+    copy(name = newLeft, query = newRight)
+
   override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
     this.copy(partitioning = rewritten)
   }
-
-  override protected def withNewChildInternal(newChild: LogicalPlan): ReplaceTableAsSelect =
-    copy(query = newChild)
 }

 /**
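Side note (not part of the diff): ReplaceTableAsSelect now follows the standard two-child command shape, which is what lets the analyzer resolve the table name through ordinary tree traversal instead of a dedicated statement-conversion rule. An illustrative sketch of that pattern, using a hypothetical command name:

import org.apache.spark.sql.catalyst.plans.logical.{BinaryCommand, LogicalPlan}

// Hypothetical example, not a Spark class: a BinaryCommand exposes the name
// plan and the query plan as left/right children, so resolution rules visit both.
case class ExampleNamedCommand(name: LogicalPlan, query: LogicalPlan)
    extends BinaryCommand {
  override def left: LogicalPlan = name
  override def right: LogicalPlan = query
  override protected def withNewChildrenInternal(
      newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
    copy(name = newLeft, query = newRight)
}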

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala

Lines changed: 1 addition & 5 deletions
@@ -23,7 +23,7 @@ import java.util.Collections
 import scala.collection.JavaConverters._

 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
@@ -314,10 +314,6 @@ private[sql] object CatalogV2Util {
     convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
   }

-  def convertTableProperties(r: ReplaceTableAsSelectStatement): Map[String, String] = {
-    convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
-  }
-
   def convertTableProperties(
       properties: Map[String, String],
       options: Map[String, String],

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 11 additions & 11 deletions
@@ -722,7 +722,7 @@ class DDLParserSuite extends AnalysisTest {
       case ctas: CreateTableAsSelectStatement if newTableToken == "CREATE" =>
         assert(ctas.ifNotExists == expectedIfNotExists)
       case replace: ReplaceTableStatement if newTableToken == "REPLACE" =>
-      case replace: ReplaceTableAsSelectStatement if newTableToken == "REPLACE" =>
+      case replace: ReplaceTableAsSelect if newTableToken == "REPLACE" =>
       case other =>
         fail("First token in statement does not match the expected parsed plan; CREATE TABLE" +
           " should create a CreateTableStatement, and REPLACE TABLE should create a" +
@@ -2323,18 +2323,18 @@ class DDLParserSuite extends AnalysisTest {
           ctas.comment,
           ctas.serde,
           ctas.external)
-      case rtas: ReplaceTableAsSelectStatement =>
+      case rtas: ReplaceTableAsSelect =>
         TableSpec(
-          rtas.tableName,
-          Some(rtas.asSelect).filter(_.resolved).map(_.schema),
+          rtas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
+          Some(rtas.query).filter(_.resolved).map(_.schema),
           rtas.partitioning,
-          rtas.bucketSpec,
-          rtas.properties,
-          rtas.provider,
-          rtas.options,
-          rtas.location,
-          rtas.comment,
-          rtas.serde)
+          rtas.tableSpec.bucketSpec,
+          rtas.tableSpec.properties,
+          rtas.tableSpec.provider,
+          rtas.tableSpec.options,
+          rtas.tableSpec.location,
+          rtas.tableSpec.comment,
+          rtas.tableSpec.serde)
       case other =>
         fail(s"Expected to parse Create, CTAS, Replace, or RTAS plan" +
           s" from query, got ${other.getClass.getName}.")

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 9 additions & 15 deletions
@@ -23,10 +23,10 @@ import scala.collection.JavaConverters._

 import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedDBObjectName, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
 import org.apache.spark.sql.connector.catalog.TableCapability._
@@ -586,19 +586,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         AppendData.byName(v2Relation, df.logicalPlan, extraOptions.toMap)

       case (SaveMode.Overwrite, _) =>
-        ReplaceTableAsSelectStatement(
-          nameParts,
-          df.queryExecution.analyzed,
-          partitioningAsV2,
-          None,
-          Map.empty,
-          Some(source),
-          Map.empty,
-          extraOptions.get("path"),
-          extraOptions.get(TableCatalog.PROP_COMMENT),
-          extraOptions.toMap,
-          None,
-          orCreate = true) // Create the table if it doesn't exist
+        val tableSpec = TableSpec(None, Map.empty, Some(source), Map.empty,
+          extraOptions.get("path"), extraOptions.get(TableCatalog.PROP_COMMENT),
+          None, false)
+        ReplaceTableAsSelect(
+          UnresolvedDBObjectName(nameParts, isNamespace = false),
+          partitioningAsV2, df.queryExecution.analyzed, tableSpec, writeOptions = Map.empty,
+          orCreate = true) // Create the table if it doesn't exist

       case (other, _) =>
         // We have a potential race condition here in AppendMode, if the table suddenly gets
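Side note (not part of the diff): the branch above is reached by an overwrite-mode saveAsTable that resolves to a v2 catalog. A hedged usage sketch, where `testcat` and its implementation class are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // Hypothetical v2 catalog registration; substitute a real TableCatalog class.
  .config("spark.sql.catalog.testcat", "com.example.MyV2Catalog")
  .getOrCreate()

// Plans ReplaceTableAsSelect(UnresolvedDBObjectName(...), ..., orCreate = true):
// replace the table if it exists, create it otherwise.
spark.range(3).write.mode("overwrite").saveAsTable("testcat.ns.t")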

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala

Lines changed: 8 additions & 16 deletions
@@ -21,9 +21,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable

 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedDBObjectName, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.IntegerType
@@ -195,20 +195,12 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }

   private def internalReplace(orCreate: Boolean): Unit = {
-    runCommand(
-      ReplaceTableAsSelectStatement(
-        tableName,
-        logicalPlan,
-        partitioning.getOrElse(Seq.empty),
-        None,
-        properties.toMap,
-        provider,
-        Map.empty,
-        None,
-        None,
-        options.toMap,
-        None,
-        orCreate = orCreate))
+    val tableSpec = TableSpec(None, properties.toMap, provider, Map.empty,
+      None, None, None, false)
+    runCommand(ReplaceTableAsSelect(
+      UnresolvedDBObjectName(tableName, isNamespace = false),
+      partitioning.getOrElse(Seq.empty), logicalPlan, tableSpec, writeOptions = options.toMap,
+      orCreate = orCreate))
   }
 }

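Side note (not part of the diff): internalReplace is the shared tail of the public DataFrameWriterV2 entry points, so both of these now produce the v2 ReplaceTableAsSelect plan (`df` and `testcat` are placeholders for a DataFrame and a registered v2 catalog):

df.writeTo("testcat.ns.t").replace()          // internalReplace(orCreate = false)
df.writeTo("testcat.ns.t").createOrReplace()  // internalReplace(orCreate = true)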

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 10 additions & 17 deletions
@@ -22,14 +22,13 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTable => CatalystCreateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL}
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
+import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1, DataSource}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
@@ -144,7 +143,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)

     // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
     // session catalog and the table provider is not v2.
-    case c @ CatalystCreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) =>
+    case c @ CreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
         c.tableSpec.provider,
         c.tableSpec.options,
@@ -157,7 +156,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
           c.tableSpec.location, c.tableSpec.comment, storageFormat,
           c.tableSpec.external)
         val mode = if (c.ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
-        CreateTable(tableDesc, mode, None)
+        CreateTableV1(tableDesc, mode, None)
       } else {
         val newTableSpec = c.tableSpec.copy(bucketSpec = None)
         c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
@@ -173,7 +172,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
           c.partitioning, c.bucketSpec, c.properties, provider, c.location,
           c.comment, storageFormat, c.external)
         val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
-        CreateTable(tableDesc, mode, Some(c.asSelect))
+        CreateTableV1(tableDesc, mode, Some(c.asSelect))
       } else {
         CreateTableAsSelect(
           catalog.asTableCatalog,
@@ -210,21 +209,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         orCreate = c.orCreate)
       }

-    case c @ ReplaceTableAsSelectStatement(
-        SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
-      val provider = c.provider.getOrElse(conf.defaultDataSourceName)
+    case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _)
+        if isSessionCatalog(catalog) =>
+      val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw QueryCompilationErrors.replaceTableAsSelectOnlySupportedWithV2TableError
       } else {
-        ReplaceTableAsSelect(
-          catalog.asTableCatalog,
-          tbl.asIdentifier,
-          // convert the bucket spec and add it as a transform
-          c.partitioning ++ c.bucketSpec.map(_.asTransform),
-          c.asSelect,
-          convertTableProperties(c),
-          writeOptions = c.writeOptions,
-          orCreate = c.orCreate)
+        val newTableSpec = c.tableSpec.copy(bucketSpec = None)
+        c.copy(partitioning = c.partitioning ++ c.tableSpec.bucketSpec.map(_.asTransform),
+          tableSpec = newTableSpec)
       }

     case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
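Side note (not part of the diff): the session-catalog guard is behaviorally unchanged; only the matched plan shape moved from the parsed statement to the v2 command. A hedged example of the still-rejected case (exact error text may differ across versions):

// parquet resolves to a v1 provider in the session catalog, so analysis fails:
spark.sql("REPLACE TABLE t USING parquet AS SELECT 1 AS id")
// => AnalysisException: REPLACE TABLE AS SELECT is only supported with v2 tables.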

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 7 additions & 7 deletions
@@ -198,29 +198,29 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper
             invalidateCache) :: Nil
       }

-    case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
-      val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
+    case ReplaceTableAsSelect(ResolvedDBObjectName(catalog, ident),
+        parts, query, tableSpec, options, orCreate) =>
       val writeOptions = new CaseInsensitiveStringMap(options.asJava)
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableAsSelectExec(
             staging,
-            ident,
+            ident.asIdentifier,
             parts,
             query,
             planLater(query),
-            propsWithOwner,
+            tableSpec,
             writeOptions,
             orCreate = orCreate,
             invalidateCache) :: Nil
         case _ =>
           ReplaceTableAsSelectExec(
-            catalog,
-            ident,
+            catalog.asTableCatalog,
+            ident.asIdentifier,
             parts,
             query,
             planLater(query),
-            propsWithOwner,
+            tableSpec,
             writeOptions,
             orCreate = orCreate,
             invalidateCache) :: Nil
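Side note (not part of the diff): an end-to-end way to exercise this match, assuming `spark` has a v2 catalog registered as `testcat`; the atomic exec node is chosen only when the resolved catalog implements StagingTableCatalog:

// CREATE OR REPLACE parses with orCreate = true; plain REPLACE TABLE
// (orCreate = false) fails if the table does not already exist.
spark.sql(
  """CREATE OR REPLACE TABLE testcat.ns.t
    |USING parquet
    |AS SELECT id FROM range(10)""".stripMargin)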
