diff --git a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
index 6a21e79c2803..d8128d39052e 100644
--- a/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
+++ b/spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4
@@ -73,6 +73,13 @@ statement
     | ALTER TABLE multipartIdentifier WRITE writeSpec #setWriteDistributionAndOrdering
     | ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList #setIdentifierFields
     | ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields
+    | ALTER TABLE multipartIdentifier CREATE BRANCH identifier (AS OF VERSION snapshotId)? (RETAIN snapshotRefRetain snapshotRefRetainTimeUnit)? (snapshotRetentionClause)? #createBranch
+    ;
+
+snapshotRetentionClause
+    : WITH SNAPSHOT RETENTION numSnapshots SNAPSHOTS
+    | WITH SNAPSHOT RETENTION snapshotRetain snapshotRetainTimeUnit
+    | WITH SNAPSHOT RETENTION numSnapshots SNAPSHOTS snapshotRetain snapshotRetainTimeUnit
     ;
 
 writeSpec
@@ -168,34 +175,76 @@ fieldList
     ;
 
 nonReserved
-    : ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST | NULLS | ORDERED | PARTITION | TABLE | WRITE
-    | DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW | FIELDS | SET
+    : ADD | ALTER | AS | ASC | BRANCH | BY | CALL | CREATE | DAYS | DESC | DROP | FIELD | FIRST | HOURS | LAST | NULLS | OF | ORDERED | PARTITION | TABLE | WRITE
+    | DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS
     | TRUE | FALSE
     | MAP
     ;
 
+snapshotId
+    : number
+    ;
+
+numSnapshots
+    : number
+    ;
+
+snapshotRetain
+    : number
+    ;
+
+snapshotRefRetain
+    : number
+    ;
+
+snapshotRefRetainTimeUnit
+    : timeUnit
+    ;
+
+snapshotRetainTimeUnit
+    : timeUnit
+    ;
+
+timeUnit
+    : DAYS
+    | HOURS
+    | MINUTES
+    ;
+
 ADD: 'ADD';
 ALTER: 'ALTER';
 AS: 'AS';
 ASC: 'ASC';
+BRANCH: 'BRANCH';
 BY: 'BY';
 CALL: 'CALL';
+DAYS: 'DAYS';
 DESC: 'DESC';
 DISTRIBUTED: 'DISTRIBUTED';
 DROP: 'DROP';
 FIELD: 'FIELD';
 FIELDS: 'FIELDS';
 FIRST: 'FIRST';
+HOURS: 'HOURS';
 LAST: 'LAST';
 LOCALLY: 'LOCALLY';
+MINUTES: 'MINUTES';
+MONTHS: 'MONTHS';
+CREATE: 'CREATE';
 NULLS: 'NULLS';
+OF: 'OF';
 ORDERED: 'ORDERED';
 PARTITION: 'PARTITION';
 REPLACE: 'REPLACE';
+RETAIN: 'RETAIN';
+RETENTION: 'RETENTION';
 IDENTIFIER_KW: 'IDENTIFIER';
 SET: 'SET';
+SNAPSHOT: 'SNAPSHOT';
+SNAPSHOTS: 'SNAPSHOTS';
 TABLE: 'TABLE';
 UNORDERED: 'UNORDERED';
+VERSION: 'VERSION';
 WITH: 'WITH';
 WRITE: 'WRITE';
diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index 0af1caf43ae7..4c059f7c343b 100644
--- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -205,7 +205,9 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
         normalized.contains("write distributed by") ||
         normalized.contains("write unordered") ||
         normalized.contains("set identifier fields") ||
fields") || - normalized.contains("drop identifier fields"))) + normalized.contains("drop identifier fields") || + normalized.contains("create branch"))) + } protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = { diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index 2e2e15ed66a6..950e161f9f99 100644 --- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.parser.extensions +import java.util.Locale +import java.util.concurrent.TimeUnit import org.antlr.v4.runtime._ import org.antlr.v4.runtime.misc.Interval import org.antlr.v4.runtime.tree.ParseTree @@ -26,8 +28,6 @@ import org.antlr.v4.runtime.tree.TerminalNode import org.apache.iceberg.DistributionMode import org.apache.iceberg.NullOrder import org.apache.iceberg.SortDirection -import org.apache.iceberg.SortOrder -import org.apache.iceberg.UnboundSortOrder import org.apache.iceberg.expressions.Term import org.apache.iceberg.spark.Spark3Util import org.apache.spark.sql.AnalysisException @@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParse import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField import org.apache.spark.sql.catalyst.plans.logical.CallArgument import org.apache.spark.sql.catalyst.plans.logical.CallStatement +import org.apache.spark.sql.catalyst.plans.logical.CreateBranch import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -90,6 +91,26 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS typedVisit[Transform](ctx.transform)) } + /** + * Create an ADD BRANCH logical command. + */ + override def visitCreateBranch(ctx: CreateBranchContext): CreateBranch = withOrigin(ctx) { + val snapshotRetention = Option(ctx.snapshotRetentionClause()) + + CreateBranch( + typedVisit[Seq[String]](ctx.multipartIdentifier), + ctx.identifier().getText, + Option(ctx.snapshotId()).map(_.getText.toLong), + snapshotRetention.flatMap(s => Option(s.numSnapshots())).map(_.getText.toLong), + snapshotRetention.flatMap(s => Option(s.snapshotRetain())).map(retain => { + TimeUnit.valueOf(ctx.snapshotRetentionClause().snapshotRetainTimeUnit().getText.toUpperCase(Locale.ENGLISH)) + .toMillis(retain.getText.toLong) + }), + Option(ctx.snapshotRefRetain()).map(retain => { + TimeUnit.valueOf(ctx.snapshotRefRetainTimeUnit().getText.toUpperCase(Locale.ENGLISH)) + .toMillis(retain.getText.toLong) + })) + } /** * Create an REPLACE PARTITION FIELD logical command. 
diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateBranch.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateBranch.scala
new file mode 100644
index 000000000000..91e2bc6f1951
--- /dev/null
+++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateBranch.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+case class CreateBranch(table: Seq[String], branch: String, snapshotId: Option[Long], numSnapshots: Option[Long],
+    snapshotRetain: Option[Long], snapshotRefRetain: Option[Long]) extends LeafCommand {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override def simpleString(maxFields: Int): String = {
+    s"Create branch: ${branch} for table: ${table.quoted}"
+  }
+}
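
To make the field mapping concrete: for the fully-specified statement, visitCreateBranch converts both retention clauses to milliseconds before building the plan node, so the result looks roughly like the following sketch. The table, branch, and snapshot id are hypothetical; the millisecond values follow from TimeUnit.DAYS.toMillis:

    import java.util.concurrent.TimeUnit

    // ALTER TABLE db.tbl CREATE BRANCH b1 AS OF VERSION 123
    //   RETAIN 10 DAYS WITH SNAPSHOT RETENTION 2 SNAPSHOTS 2 DAYS
    val plan = CreateBranch(
      table = Seq("db", "tbl"),
      branch = "b1",
      snapshotId = Some(123L),
      numSnapshots = Some(2L),                                // 2 SNAPSHOTS
      snapshotRetain = Some(TimeUnit.DAYS.toMillis(2L)),      // 2 DAYS  -> 172800000 ms
      snapshotRefRetain = Some(TimeUnit.DAYS.toMillis(10L)))  // 10 DAYS -> 864000000 ms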
diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateBranchExec.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateBranchExec.scala
new file mode 100644
index 000000000000..acaab93b0bd0
--- /dev/null
+++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateBranchExec.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.CreateBranch
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.TableCatalog
+
+case class CreateBranchExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    createBranch: CreateBranch) extends LeafV2CommandExec {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.loadTable(ident) match {
+      case iceberg: SparkTable =>
+
+        val snapshotId = createBranch.snapshotId.getOrElse(iceberg.table.currentSnapshot().snapshotId())
+        val manageSnapshot = iceberg.table.manageSnapshots()
+          .createBranch(createBranch.branch, snapshotId)
+
+        if (createBranch.numSnapshots.nonEmpty) {
+          manageSnapshot.setMinSnapshotsToKeep(createBranch.branch, createBranch.numSnapshots.get.toInt)
+        }
+
+        if (createBranch.snapshotRetain.nonEmpty) {
+          manageSnapshot.setMaxSnapshotAgeMs(createBranch.branch, createBranch.snapshotRetain.get)
+        }
+
+        if (createBranch.snapshotRefRetain.nonEmpty) {
+          manageSnapshot.setMaxRefAgeMs(createBranch.branch, createBranch.snapshotRefRetain.get)
+        }
+
+        manageSnapshot.commit()
+
+      case table =>
+        throw new UnsupportedOperationException(s"Cannot add branch to non-Iceberg table: $table")
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"Create branch: ${createBranch.branch} operation for table: ${ident.quoted}"
+  }
+}
diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index b5eb3e47b7cc..08c1c1dae61d 100644
--- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
 import org.apache.spark.sql.catalyst.plans.logical.Call
+import org.apache.spark.sql.catalyst.plans.logical.CreateBranch
 import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
 import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField
@@ -61,6 +62,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
     case AddPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform, name) =>
       AddPartitionFieldExec(catalog, ident, transform, name) :: Nil
 
+    case createBranch @ CreateBranch(IcebergCatalogAndIdentifier(catalog, ident), _, _, _, _, _) =>
+      CreateBranchExec(catalog, ident, createBranch) :: Nil
+
     case DropPartitionField(IcebergCatalogAndIdentifier(catalog, ident), transform) =>
       DropPartitionFieldExec(catalog, ident, transform) :: Nil
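
For clarity, the exec node above boils down to the following direct use of Iceberg's ManageSnapshots API; this is a sketch in which table is a hypothetical, already-loaded org.apache.iceberg.Table, and each setter is only invoked when the corresponding clause was present in the statement:

    // Equivalent of CreateBranchExec.run() for a fully-specified statement.
    table
      .manageSnapshots()
      .createBranch("b1", table.currentSnapshot().snapshotId()) // defaults to the current snapshot
      .setMinSnapshotsToKeep("b1", 2)        // WITH SNAPSHOT RETENTION 2 SNAPSHOTS
      .setMaxSnapshotAgeMs("b1", 172800000L) // ... 2 DAYS
      .setMaxRefAgeMs("b1", 864000000L)      // RETAIN 10 DAYS
      .commit()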
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java
new file mode 100644
index 000000000000..0379bcf7a91d
--- /dev/null
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateBranch.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.IllegalFormatConversionException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestCreateBranch extends SparkExtensionsTestBase {
+
+  @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      }
+    };
+  }
+
+  public TestCreateBranch(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testCreateBranch() throws NoSuchTableException {
+    Table table = createDefaultTableAndInsert2Row();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    String branchName = "b1";
+    Integer minSnapshotsToKeep = 2;
+    long maxSnapshotAge = 2L;
+    long maxRefAge = 10L;
+    sql(
+        "ALTER TABLE %s CREATE BRANCH %s AS OF VERSION %d RETAIN %d DAYS WITH SNAPSHOT RETENTION %d SNAPSHOTS %d days",
+        tableName, branchName, snapshotId, maxRefAge, minSnapshotsToKeep, maxSnapshotAge);
+    table.refresh();
+    SnapshotRef ref = table.refs().get(branchName);
+    Assert.assertNotNull(ref);
+    Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep());
+    Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxSnapshotAgeMs().longValue());
+    Assert.assertEquals(TimeUnit.DAYS.toMillis(maxRefAge), ref.maxRefAgeMs().longValue());
+
+    AssertHelpers.assertThrows(
+        "Cannot create an existing branch",
+        IllegalArgumentException.class,
+        "already exists",
+        () -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName));
BRANCH %s", tableName, branchName)); + } + + @Test + public void testCreateBranchUseDefaultConfig() throws NoSuchTableException { + Table table = createDefaultTableAndInsert2Row(); + String branchName = "b1"; + sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName); + table.refresh(); + SnapshotRef ref = table.refs().get(branchName); + Assert.assertNotNull(ref); + Assert.assertNull(ref.minSnapshotsToKeep()); + Assert.assertNull(ref.maxSnapshotAgeMs()); + Assert.assertNull(ref.maxRefAgeMs()); + } + + @Test + public void testCreateBranchUseCustomMinSnapshotsToKeep() throws NoSuchTableException { + Integer minSnapshotsToKeep = 2; + Table table = createDefaultTableAndInsert2Row(); + String branchName = "b1"; + sql( + "ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS", + tableName, branchName, minSnapshotsToKeep); + table.refresh(); + SnapshotRef ref = table.refs().get(branchName); + Assert.assertNotNull(ref); + Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep()); + Assert.assertNull(ref.maxSnapshotAgeMs()); + Assert.assertNull(ref.maxRefAgeMs()); + } + + @Test + public void testCreateBranchUseCustomMaxSnapshotAge() throws NoSuchTableException { + long maxSnapshotAge = 2L; + Table table = createDefaultTableAndInsert2Row(); + String branchName = "b1"; + sql( + "ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d DAYS", + tableName, branchName, maxSnapshotAge); + table.refresh(); + SnapshotRef ref = table.refs().get(branchName); + Assert.assertNotNull(ref); + Assert.assertNull(ref.minSnapshotsToKeep()); + Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxSnapshotAgeMs().longValue()); + Assert.assertNull(ref.maxRefAgeMs()); + } + + @Test + public void testCreateBranchUseCustomMinSnapshotsToKeepAndMaxSnapshotAge() + throws NoSuchTableException { + Integer minSnapshotsToKeep = 2; + long maxSnapshotAge = 2L; + Table table = createDefaultTableAndInsert2Row(); + String branchName = "b1"; + sql( + "ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION %d SNAPSHOTS %d DAYS", + tableName, branchName, minSnapshotsToKeep, maxSnapshotAge); + table.refresh(); + SnapshotRef ref = table.refs().get(branchName); + Assert.assertNotNull(ref); + Assert.assertEquals(minSnapshotsToKeep, ref.minSnapshotsToKeep()); + Assert.assertEquals(TimeUnit.DAYS.toMillis(maxSnapshotAge), ref.maxSnapshotAgeMs().longValue()); + Assert.assertNull(ref.maxRefAgeMs()); + + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "no viable alternative at input 'WITH SNAPSHOT RETENTION'", + () -> + sql("ALTER TABLE %s CREATE BRANCH %s WITH SNAPSHOT RETENTION", tableName, branchName)); + } + + @Test + public void testCreateBranchUseCustomMaxRefAge() throws NoSuchTableException { + long maxRefAge = 10L; + Table table = createDefaultTableAndInsert2Row(); + String branchName = "b1"; + sql("ALTER TABLE %s CREATE BRANCH %s RETAIN %d DAYS", tableName, branchName, maxRefAge); + table.refresh(); + SnapshotRef ref = table.refs().get(branchName); + Assert.assertNotNull(ref); + Assert.assertNull(ref.minSnapshotsToKeep()); + Assert.assertNull(ref.maxSnapshotAgeMs()); + Assert.assertEquals(TimeUnit.DAYS.toMillis(maxRefAge), ref.maxRefAgeMs().longValue()); + + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input", + () -> sql("ALTER TABLE %s CREATE BRANCH %s RETAIN", tableName, branchName)); + + AssertHelpers.assertThrows( + "Illegal statement", + IllegalFormatConversionException.class, + "d != java.lang.String", 
+ () -> sql("ALTER TABLE %s CREATE BRANCH %s RETAIN %d DAYS", tableName, branchName, "abc")); + + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'}", + () -> + sql( + "ALTER TABLE %s CREATE BRANCH %s RETAIN %d SECONDS", + tableName, branchName, maxRefAge)); + } + + private Table createDefaultTableAndInsert2Row() throws NoSuchTableException { + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + return validationCatalog.loadTable(tableIdent); + } +}