diff --git a/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
index 8e41a8afc9c7..40c8de813cbd 100644
--- a/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
+++ b/spark3-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
@@ -69,6 +69,7 @@ statement
     : CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')'  #call
     | ALTER TABLE multipartIdentifier ADD PARTITION FIELD transform (AS name=identifier)?  #addPartitionField
     | ALTER TABLE multipartIdentifier DROP PARTITION FIELD transform  #dropPartitionField
+    | ALTER TABLE multipartIdentifier REPLACE PARTITION FIELD transform WITH transform (AS name=identifier)?  #replacePartitionField
     | ALTER TABLE multipartIdentifier WRITE writeSpec  #setWriteDistributionAndOrdering
     ;

@@ -179,8 +180,10 @@ LOCALLY: 'LOCALLY';
 NULLS: 'NULLS';
 ORDERED: 'ORDERED';
 PARTITION: 'PARTITION';
+REPLACE: 'REPLACE';
 TABLE: 'TABLE';
 UNORDERED: 'UNORDERED';
+WITH: 'WITH';
 WRITE: 'WRITE';

 TRUE: 'TRUE';
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index 8aecf8b13e2b..c19401a4d1c9 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -114,6 +114,7 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
     normalized.startsWith("alter table") && (
         normalized.contains("add partition field") ||
         normalized.contains("drop partition field") ||
+        normalized.contains("replace partition field") ||
        normalized.contains("write ordered by") ||
        normalized.contains("write locally ordered by") ||
        normalized.contains("write distributed by") ||
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
index 6370581eb7ba..3d7a50e8af74 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.NamedArgument
 import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument
+import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
 import org.apache.spark.sql.connector.expressions
 import org.apache.spark.sql.connector.expressions.ApplyTransform
@@ -80,6 +81,18 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
       typedVisit[Transform](ctx.transform))
   }

+  /**
+   * Create a REPLACE PARTITION FIELD logical command.
+   */
+  override def visitReplacePartitionField(ctx: ReplacePartitionFieldContext): ReplacePartitionField = withOrigin(ctx) {
+    ReplacePartitionField(
+      typedVisit[Seq[String]](ctx.multipartIdentifier),
+      typedVisit[Transform](ctx.transform(0)),
+      typedVisit[Transform](ctx.transform(1)),
+      Option(ctx.name).map(_.getText))
+  }
+
   /**
    * Create a [[SetWriteDistributionAndOrdering]] for changing the write distribution and ordering.
    */
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplacePartitionField.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplacePartitionField.scala
new file mode 100644
index 000000000000..3ad8c59bfc82
--- /dev/null
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplacePartitionField.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.expressions.Transform
+
+case class ReplacePartitionField(
+    table: Seq[String],
+    transformFrom: Transform,
+    transformTo: Transform,
+    name: Option[String]) extends Command {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override def simpleString(maxFields: Int): String = {
+    s"ReplacePartitionField ${table.quoted} ${transformFrom.describe} " +
+      s"with ${name.map(n => s"$n=").getOrElse("")}${transformTo.describe}"
+  }
+}
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index cb9a0652705c..d5901a8446ce 100644
--- a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical.DynamicFileFilterWithCardinal
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.MergeInto
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
+import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableCatalog
@@ -62,6 +63,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy {
     case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) =>
       DropPartitionFieldExec(catalog, ident, transform) :: Nil

+    case ReplacePartitionField(IcebergCatalogAndIdentifier(catalog, ident), transformFrom, transformTo, name) =>
+      ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, name) :: Nil
+
     case SetWriteDistributionAndOrdering(
         IcebergCatalogAndIdentifier(catalog, ident), distributionMode, ordering) =>
       SetWriteDistributionAndOrderingExec(catalog, ident, distributionMode, ordering) :: Nil
diff --git a/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplacePartitionFieldExec.scala b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplacePartitionFieldExec.scala
new file mode 100644
index 000000000000..11e900c77b1e
--- /dev/null
+++ b/spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplacePartitionFieldExec.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.Spark3Util +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.TableCatalog +import org.apache.spark.sql.connector.expressions.FieldReference +import org.apache.spark.sql.connector.expressions.IdentityTransform +import org.apache.spark.sql.connector.expressions.Transform + +case class ReplacePartitionFieldExec( + catalog: TableCatalog, + ident: Identifier, + transformFrom: Transform, + transformTo: Transform, + name: Option[String]) extends V2CommandExec { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable => + val schema = iceberg.table.schema + transformFrom match { + case IdentityTransform(FieldReference(parts)) if parts.size == 1 && schema.findField(parts.head) == null => + // the name is not present in the Iceberg schema, so it must be a partition field name, not a column name + iceberg.table.updateSpec() + .removeField(parts.head) + .addField(name.orNull, Spark3Util.toIcebergTerm(transformTo)) + .commit() + + case _ => + iceberg.table.updateSpec() + .removeField(Spark3Util.toIcebergTerm(transformFrom)) + .addField(name.orNull, Spark3Util.toIcebergTerm(transformTo)) + .commit() + } + + case table => + throw new UnsupportedOperationException(s"Cannot replace partition field in non-Iceberg table: $table") + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"ReplacePartitionField ${catalog.name}.${ident.quoted} ${transformFrom.describe} " + + s"with ${name.map(n => s"$n=").getOrElse("")}${transformTo.describe}" + } +} diff --git a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java index 80206b875c4e..b016bb1cd810 100644 --- a/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java +++ b/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAlterTablePartitionFields.java @@ -272,4 +272,100 @@ public void testDropPartitionByName() { Assert.assertEquals("Should have new spec field", expected, table.spec()); } + + @Test + public void testReplacePartition() { + sql("CREATE TABLE %s (id bigint NOT NULL, category string, ts timestamp, data string) USING iceberg", tableName); + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertTrue("Table should start unpartitioned", table.spec().isUnpartitioned()); + + sql("ALTER TABLE %s ADD PARTITION FIELD days(ts)", tableName); + table.refresh(); + PartitionSpec expected = PartitionSpec.builderFor(table.schema()) + .withSpecId(1) + .day("ts") + .build(); + Assert.assertEquals("Should have new spec field", expected, table.spec()); + + sql("ALTER TABLE %s REPLACE PARTITION FIELD days(ts) WITH hours(ts)", tableName); + table.refresh(); + expected = PartitionSpec.builderFor(table.schema()) + .withSpecId(2) + .alwaysNull("ts", "ts_day") + .hour("ts") + .build(); + Assert.assertEquals("Should changed from daily to hourly partitioned field", expected, table.spec()); + } + + @Test + public void 
+  public void testReplacePartitionAndRename() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, ts timestamp, data string) USING iceberg", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertTrue("Table should start unpartitioned", table.spec().isUnpartitioned());
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD days(ts)", tableName);
+    table.refresh();
+    PartitionSpec expected = PartitionSpec.builderFor(table.schema())
+        .withSpecId(1)
+        .day("ts")
+        .build();
+    Assert.assertEquals("Should have new spec field", expected, table.spec());
+
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD days(ts) WITH hours(ts) AS hour_col", tableName);
+    table.refresh();
+    expected = PartitionSpec.builderFor(table.schema())
+        .withSpecId(2)
+        .alwaysNull("ts", "ts_day")
+        .hour("ts", "hour_col")
+        .build();
+    Assert.assertEquals("Should have changed from daily to hourly partitioned field", expected, table.spec());
+  }
+
+  @Test
+  public void testReplaceNamedPartition() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, ts timestamp, data string) USING iceberg", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertTrue("Table should start unpartitioned", table.spec().isUnpartitioned());
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD days(ts) AS day_col", tableName);
+    table.refresh();
+    PartitionSpec expected = PartitionSpec.builderFor(table.schema())
+        .withSpecId(1)
+        .day("ts", "day_col")
+        .build();
+    Assert.assertEquals("Should have new spec field", expected, table.spec());
+
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_col WITH hours(ts)", tableName);
+    table.refresh();
+    expected = PartitionSpec.builderFor(table.schema())
+        .withSpecId(2)
+        .alwaysNull("ts", "day_col")
+        .hour("ts")
+        .build();
+    Assert.assertEquals("Should have changed from daily to hourly partitioned field", expected, table.spec());
+  }
+
+  @Test
+  public void testReplaceNamedPartitionAndRenameDifferently() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, ts timestamp, data string) USING iceberg", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertTrue("Table should start unpartitioned", table.spec().isUnpartitioned());
+
+    sql("ALTER TABLE %s ADD PARTITION FIELD days(ts) AS day_col", tableName);
+    table.refresh();
+    PartitionSpec expected = PartitionSpec.builderFor(table.schema())
+        .withSpecId(1)
+        .day("ts", "day_col")
+        .build();
+    Assert.assertEquals("Should have new spec field", expected, table.spec());
+
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_col WITH hours(ts) AS hour_col", tableName);
+    table.refresh();
+    expected = PartitionSpec.builderFor(table.schema())
+        .withSpecId(2)
+        .alwaysNull("ts", "day_col")
+        .hour("ts", "hour_col")
+        .build();
+    Assert.assertEquals("Should have changed from daily to hourly partitioned field", expected, table.spec());
+  }
 }
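
For reference, a minimal sketch of the SQL surface this patch adds, as exercised by the tests above (the table name `db.sample` is illustrative, not part of the patch):

    -- replace a partition field, identified by its source transform
    ALTER TABLE db.sample REPLACE PARTITION FIELD days(ts) WITH hours(ts);

    -- same, but give the new partition field an explicit name
    ALTER TABLE db.sample REPLACE PARTITION FIELD days(ts) WITH hours(ts) AS hour_col;

    -- the field being replaced may also be referenced by its partition field name
    ALTER TABLE db.sample REPLACE PARTITION FIELD day_col WITH hours(ts);

Resolution of the field being replaced follows ReplacePartitionFieldExec: a bare identifier that does not match any column in the Iceberg schema is treated as a partition field name and removed with removeField(String); anything else is converted with Spark3Util.toIcebergTerm and removed as a transform. Either way, the removal and the addField for the new transform (with the optional AS name) happen in a single updateSpec() commit, so the spec change is atomic.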