diff --git a/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 b/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 index e614ee9ed3e8..7bd556acc5aa 100644 --- a/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 +++ b/spark/v3.2/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4 @@ -74,7 +74,14 @@ statement | ALTER TABLE multipartIdentifier SET IDENTIFIER_KW FIELDS fieldList #setIdentifierFields | ALTER TABLE multipartIdentifier DROP IDENTIFIER_KW FIELDS fieldList #dropIdentifierFields | ALTER TABLE multipartIdentifier createReplaceBranchClause #createOrReplaceBranch + | ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag | ALTER TABLE multipartIdentifier DROP BRANCH (IF EXISTS)? identifier #dropBranch + | ALTER TABLE multipartIdentifier DROP TAG (IF EXISTS)? identifier #dropTag + ; + +createReplaceTagClause + : (CREATE OR)? REPLACE TAG identifier tagOptions + | CREATE TAG (IF NOT EXISTS)? identifier tagOptions ; createReplaceBranchClause @@ -82,6 +89,10 @@ createReplaceBranchClause | CREATE BRANCH (IF NOT EXISTS)? identifier branchOptions ; +tagOptions + : (AS OF VERSION snapshotId)? (refRetain)? + ; + branchOptions : (AS OF VERSION snapshotId)? (refRetain)? (snapshotRetention)? ; @@ -203,8 +214,8 @@ fieldList nonReserved : ADD | ALTER | AS | ASC | BRANCH | BY | CALL | CREATE | DAYS | DESC | DROP | EXISTS | FIELD | FIRST | HOURS | IF | LAST | NOT | NULLS | OF | OR | ORDERED | PARTITION | TABLE | WRITE - | DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | RETENTION | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS - | TRUE | FALSE + | DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS + | TAG | TRUE | FALSE | MAP ; @@ -212,6 +223,10 @@ snapshotId : number ; +numSnapshots + : number + ; + timeUnit : DAYS | HOURS @@ -254,6 +269,7 @@ SET: 'SET'; SNAPSHOT: 'SNAPSHOT'; SNAPSHOTS: 'SNAPSHOTS'; TABLE: 'TABLE'; +TAG: 'TAG'; UNORDERED: 'UNORDERED'; VERSION: 'VERSION'; WITH: 'WITH'; diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 9b71ec8f5c6e..2996ceb366e1 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -212,7 +212,10 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI private def isSnapshotRefDdl(normalized: String): Boolean = { normalized.contains("create branch") || normalized.contains("replace branch") || - normalized.contains("drop branch") + normalized.contains("create tag") || + normalized.contains("replace tag") || + normalized.contains("drop branch") || + normalized.contains("drop tag") } protected def parse[T](command: String)(toResult: IcebergSqlExtensionsParser => T): T = { diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala index 09144514abfa..f758cb08fd3d 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala @@ -41,15 +41,18 @@ import org.apache.spark.sql.catalyst.plans.logical.BranchOptions import org.apache.spark.sql.catalyst.plans.logical.CallArgument import org.apache.spark.sql.catalyst.plans.logical.CallStatement import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch +import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag import org.apache.spark.sql.catalyst.plans.logical.DropBranch import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField +import org.apache.spark.sql.catalyst.plans.logical.DropTag import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.NamedArgument import org.apache.spark.sql.catalyst.plans.logical.PositionalArgument import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering +import org.apache.spark.sql.catalyst.plans.logical.TagOptions import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.connector.expressions @@ -131,6 +134,35 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS ifNotExists) } + /** + * Create an CREATE OR REPLACE TAG logical command. + */ + override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext): CreateOrReplaceTag = withOrigin(ctx) { + val createTagClause = ctx.createReplaceTagClause() + + val tagName = createTagClause.identifier().getText + + val tagOptionsContext = Option(createTagClause.tagOptions()) + val snapshotId = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.snapshotId())) + .map(_.getText.toLong) + val tagRetain = tagOptionsContext.flatMap(tagOptions => Option(tagOptions.refRetain())) + val tagRefAgeMs = tagRetain.map(retain => + TimeUnit.valueOf(retain.timeUnit().getText.toUpperCase(Locale.ENGLISH)).toMillis(retain.number().getText.toLong)) + val tagOptions = TagOptions( + snapshotId, + tagRefAgeMs + ) + + val replace = createTagClause.REPLACE() != null + val ifNotExists = createTagClause.EXISTS() != null + + CreateOrReplaceTag(typedVisit[Seq[String]](ctx.multipartIdentifier), + tagName, + tagOptions, + replace, + ifNotExists) + } + /** * Create an DROP BRANCH logical command. */ @@ -138,6 +170,13 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS DropBranch(typedVisit[Seq[String]](ctx.multipartIdentifier), ctx.identifier().getText, ctx.EXISTS() != null) } + /** + * Create an DROP TAG logical command. + */ + override def visitDropTag(ctx: DropTagContext): DropTag = withOrigin(ctx) { + DropTag(typedVisit[Seq[String]](ctx.multipartIdentifier), ctx.identifier().getText, ctx.EXISTS() != null) + } + /** * Create an REPLACE PARTITION FIELD logical command. */ diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala new file mode 100644 index 000000000000..e48f7d8ed04c --- /dev/null +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/CreateOrReplaceTag.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute + +case class CreateOrReplaceTag( + table: Seq[String], + tag: String, + tagOptions: TagOptions, + replace: Boolean, + ifNotExists: Boolean) extends LeafCommand { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"CreateOrReplaceTag tag: ${tag} for table: ${table.quoted}" + } +} diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropTag.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropTag.scala new file mode 100644 index 000000000000..7e4b38e74d2f --- /dev/null +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DropTag.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute + +case class DropTag(table: Seq[String], tag: String, ifExists: Boolean) extends LeafCommand { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override def simpleString(maxFields: Int): String = { + s"DropTag tag: ${tag} for table: ${table.quoted}" + } +} diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala new file mode 100644 index 000000000000..85e3b95f4aba --- /dev/null +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TagOptions.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +case class TagOptions(snapshotId: Option[Long], snapshotRefRetain: Option[Long]) diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala new file mode 100644 index 000000000000..7ca193d1b156 --- /dev/null +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.TagOptions +import org.apache.spark.sql.connector.catalog._ + +case class CreateOrReplaceTagExec( + catalog: TableCatalog, + ident: Identifier, + tag: String, + tagOptions: TagOptions, + replace: Boolean, + ifNotExists: Boolean) extends LeafV2CommandExec { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable => + val snapshotId: java.lang.Long = tagOptions.snapshotId + .orElse(Option(iceberg.table.currentSnapshot()).map(_.snapshotId())) + .map(java.lang.Long.valueOf) + .orNull + + Preconditions.checkArgument(snapshotId != null, + "Cannot complete create or replace tag operation on %s, main has no snapshot", ident) + + val manageSnapshot = iceberg.table.manageSnapshots() + if (!replace) { + val ref = iceberg.table().refs().get(tag) + if (ref != null && ifNotExists) { + return Nil + } + + manageSnapshot.createTag(tag, snapshotId) + } else { + manageSnapshot.replaceTag(tag, snapshotId) + } + + if (tagOptions.snapshotRefRetain.nonEmpty) { + manageSnapshot.setMaxRefAgeMs(tag, tagOptions.snapshotRefRetain.get) + } + + manageSnapshot.commit() + + case table => + throw new UnsupportedOperationException(s"Cannot create tag to non-Iceberg table: $table") + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"Create tag: $tag for table: ${ident.quoted}" + } +} diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTagExec.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTagExec.scala new file mode 100644 index 000000000000..0a1c17c0b1b2 --- /dev/null +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTagExec.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.source.SparkTable +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.connector.catalog.TableCatalog + +case class DropTagExec( + catalog: TableCatalog, + ident: Identifier, + tag: String, + ifExists: Boolean) extends LeafV2CommandExec { + + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override lazy val output: Seq[Attribute] = Nil + + override protected def run(): Seq[InternalRow] = { + catalog.loadTable(ident) match { + case iceberg: SparkTable => + val ref = iceberg.table().refs().get(tag) + if (ref != null || !ifExists) { + iceberg.table().manageSnapshots().removeTag(tag).commit() + } + + case table => + throw new UnsupportedOperationException(s"Cannot drop tag on non-Iceberg table: $table") + } + + Nil + } + + override def simpleString(maxFields: Int): String = { + s"DropTag tag: ${tag} for table: ${ident.quoted}" + } +} diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 19d5fbedbfe1..326574bf25e4 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -32,10 +32,12 @@ import org.apache.spark.sql.catalyst.expressions.PredicateHelper import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField import org.apache.spark.sql.catalyst.plans.logical.Call import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch +import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable import org.apache.spark.sql.catalyst.plans.logical.DropBranch import org.apache.spark.sql.catalyst.plans.logical.DropIdentifierFields import org.apache.spark.sql.catalyst.plans.logical.DropPartitionField +import org.apache.spark.sql.catalyst.plans.logical.DropTag import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.MergeRows import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode @@ -70,12 +72,18 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi ReplacePartitionFieldExec(catalog, ident, transformFrom, transformTo, name) :: Nil case CreateOrReplaceBranch( - IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, replace, ifNotExists) => + IcebergCatalogAndIdentifier(catalog, ident), branch, branchOptions, replace, ifNotExists) => CreateOrReplaceBranchExec(catalog, ident, branch, branchOptions, replace, ifNotExists) :: Nil + case CreateOrReplaceTag(IcebergCatalogAndIdentifier(catalog, ident), tag, tagOptions, replace, ifNotExists) => + CreateOrReplaceTagExec(catalog, ident, tag, tagOptions, replace, ifNotExists) :: Nil + case DropBranch(IcebergCatalogAndIdentifier(catalog, ident), branch, ifExists) => DropBranchExec(catalog, ident, branch, ifExists) :: Nil + case DropTag(IcebergCatalogAndIdentifier(catalog, ident), tag, ifExists) => + DropTagExec(catalog, ident, tag, ifExists) :: Nil + case SetIdentifierFields(IcebergCatalogAndIdentifier(catalog, ident), fields) => SetIdentifierFieldsExec(catalog, ident, fields) :: Nil diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java new file mode 100644 index 000000000000..ec3148de6cb5 --- /dev/null +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.extensions; + +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runners.Parameterized; + +public class TestTagDDL extends SparkExtensionsTestBase { + private static final String[] TIME_UNITS = {"DAYS", "HOURS", "MINUTES"}; + + @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + public static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.SPARK.catalogName(), + SparkCatalogConfig.SPARK.implementation(), + SparkCatalogConfig.SPARK.properties() + } + }; + } + + public TestTagDDL(String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + } + + @Before + public void before() { + sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName); + } + + @After + public void removeTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testCreateTagWithRetain() throws NoSuchTableException { + Table table = insertRows(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + long maxRefAge = 10L; + + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + + for (String timeUnit : TIME_UNITS) { + String tagName = "t1" + timeUnit; + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d %s", + tableName, tagName, firstSnapshotId, maxRefAge, timeUnit); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", firstSnapshotId, ref.snapshotId()); + Assert.assertEquals( + "The tag needs to have the correct max ref age.", + TimeUnit.valueOf(timeUnit.toUpperCase(Locale.ENGLISH)).toMillis(maxRefAge), + ref.maxRefAgeMs().longValue()); + } + + String tagName = "t1"; + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input", + () -> + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN", + tableName, tagName, firstSnapshotId, maxRefAge)); + + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input", + () -> sql("ALTER TABLE %s CREATE TAG %s RETAIN %s DAYS", tableName, tagName, "abc")); + + AssertHelpers.assertThrows( + "Illegal statement", + IcebergParseException.class, + "mismatched input 'SECONDS' expecting {'DAYS', 'HOURS', 'MINUTES'}", + () -> + sql( + "ALTER TABLE %s CREATE TAG %s AS OF VERSION %d RETAIN %d SECONDS", + tableName, tagName, firstSnapshotId, maxRefAge)); + } + + @Test + public void testCreateTagOnEmptyTable() { + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "abc")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining( + "Cannot complete create or replace tag operation on %s, main has no snapshot", + tableName); + } + + @Test + public void testCreateTagUseDefaultConfig() throws NoSuchTableException { + Table table = insertRows(); + long snapshotId = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + + AssertHelpers.assertThrows( + "unknown snapshot", + ValidationException.class, + "unknown snapshot: -1", + () -> sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, -1)); + + sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", snapshotId, ref.snapshotId()); + Assert.assertNull( + "The tag needs to have the default max ref age, which is null.", ref.maxRefAgeMs()); + + AssertHelpers.assertThrows( + "Cannot create an exist tag", + IllegalArgumentException.class, + "already exists", + () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, tagName)); + + AssertHelpers.assertThrows( + "Non-conforming tag name", + IcebergParseException.class, + "mismatched input '123'", + () -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "123")); + + table.manageSnapshots().removeTag(tagName).commit(); + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + snapshotId = table.currentSnapshot().snapshotId(); + sql("ALTER TABLE %s CREATE TAG %s AS OF VERSION %d", tableName, tagName, snapshotId); + table.refresh(); + ref = table.refs().get(tagName); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", snapshotId, ref.snapshotId()); + Assert.assertNull( + "The tag needs to have the default max ref age, which is null.", ref.maxRefAgeMs()); + } + + @Test + public void testCreateTagIfNotExists() throws NoSuchTableException { + long maxSnapshotAge = 2L; + Table table = insertRows(); + String tagName = "t1"; + sql("ALTER TABLE %s CREATE TAG %s RETAIN %d days", tableName, tagName, maxSnapshotAge); + sql("ALTER TABLE %s CREATE TAG IF NOT EXISTS %s", tableName, tagName); + + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", + table.currentSnapshot().snapshotId(), + ref.snapshotId()); + Assert.assertEquals( + "The tag needs to have the correct max ref age.", + TimeUnit.DAYS.toMillis(maxSnapshotAge), + ref.maxRefAgeMs().longValue()); + } + + @Test + public void testReplaceTagFailsForBranch() throws NoSuchTableException { + String branchName = "branch1"; + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createBranch(branchName, first).commit(); + insertRows(); + long second = table.currentSnapshot().snapshotId(); + + AssertHelpers.assertThrows( + "Cannot perform replace tag on branches", + IllegalArgumentException.class, + "Ref branch1 is a branch not a tag", + () -> sql("ALTER TABLE %s REPLACE Tag %s", tableName, branchName, second)); + } + + @Test + public void testReplaceTag() throws NoSuchTableException { + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + long expectedMaxRefAgeMs = 1000; + table + .manageSnapshots() + .createTag(tagName, first) + .setMaxRefAgeMs(tagName, expectedMaxRefAgeMs) + .commit(); + + insertRows(); + long second = table.currentSnapshot().snapshotId(); + + sql("ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d", tableName, tagName, second); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", second, ref.snapshotId()); + Assert.assertEquals( + "The tag needs to have the correct max ref age.", + expectedMaxRefAgeMs, + ref.maxRefAgeMs().longValue()); + } + + @Test + public void testReplaceTagDoesNotExist() throws NoSuchTableException { + Table table = insertRows(); + + AssertHelpers.assertThrows( + "Cannot perform replace tag on tag which does not exist", + IllegalArgumentException.class, + "Tag does not exist", + () -> + sql( + "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d", + tableName, "someTag", table.currentSnapshot().snapshotId())); + } + + @Test + public void testReplaceTagWithRetain() throws NoSuchTableException { + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + table.manageSnapshots().createTag(tagName, first).commit(); + insertRows(); + long second = table.currentSnapshot().snapshotId(); + + long maxRefAge = 10; + for (String timeUnit : TIME_UNITS) { + sql( + "ALTER TABLE %s REPLACE Tag %s AS OF VERSION %d RETAIN %d %s", + tableName, tagName, second, maxRefAge, timeUnit); + + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", second, ref.snapshotId()); + Assert.assertEquals( + "The tag needs to have the correct max ref age.", + TimeUnit.valueOf(timeUnit).toMillis(maxRefAge), + ref.maxRefAgeMs().longValue()); + } + } + + @Test + public void testCreateOrReplace() throws NoSuchTableException { + Table table = insertRows(); + long first = table.currentSnapshot().snapshotId(); + String tagName = "t1"; + insertRows(); + long second = table.currentSnapshot().snapshotId(); + table.manageSnapshots().createTag(tagName, second).commit(); + + sql("ALTER TABLE %s CREATE OR REPLACE TAG %s AS OF VERSION %d", tableName, tagName, first); + table.refresh(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", first, ref.snapshotId()); + } + + @Test + public void testDropTag() throws NoSuchTableException { + insertRows(); + Table table = validationCatalog.loadTable(tableIdent); + String tagName = "t1"; + table.manageSnapshots().createTag(tagName, table.currentSnapshot().snapshotId()).commit(); + SnapshotRef ref = table.refs().get(tagName); + Assert.assertEquals( + "The tag needs to point to a specific snapshot id.", + table.currentSnapshot().snapshotId(), + ref.snapshotId()); + + sql("ALTER TABLE %s DROP TAG %s", tableName, tagName); + table.refresh(); + ref = table.refs().get(tagName); + Assert.assertNull("The tag needs to be dropped.", ref); + } + + @Test + public void testDropTagNonConformingName() { + AssertHelpers.assertThrows( + "Non-conforming tag name", + IcebergParseException.class, + "mismatched input '123'", + () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "123")); + } + + @Test + public void testDropTagDoesNotExist() { + AssertHelpers.assertThrows( + "Cannot perform drop tag on tag which does not exist", + IllegalArgumentException.class, + "Tag does not exist: nonExistingTag", + () -> sql("ALTER TABLE %s DROP TAG %s", tableName, "nonExistingTag")); + } + + @Test + public void testDropTagFailesForBranch() throws NoSuchTableException { + String branchName = "b1"; + Table table = insertRows(); + table.manageSnapshots().createBranch(branchName, table.currentSnapshot().snapshotId()).commit(); + + AssertHelpers.assertThrows( + "Cannot perform drop tag on branch", + IllegalArgumentException.class, + "Ref b1 is a branch not a tag", + () -> sql("ALTER TABLE %s DROP TAG %s", tableName, branchName)); + } + + @Test + public void testDropTagIfExists() throws NoSuchTableException { + String tagName = "nonExistingTag"; + Table table = insertRows(); + Assert.assertNull("The tag does not exists.", table.refs().get(tagName)); + + sql("ALTER TABLE %s DROP TAG IF EXISTS %s", tableName, tagName); + table.refresh(); + Assert.assertNull("The tag still does not exist.", table.refs().get(tagName)); + + table.manageSnapshots().createTag(tagName, table.currentSnapshot().snapshotId()).commit(); + Assert.assertEquals( + "The tag has been created successfully.", + table.currentSnapshot().snapshotId(), + table.refs().get(tagName).snapshotId()); + + sql("ALTER TABLE %s DROP TAG IF EXISTS %s", tableName, tagName); + table.refresh(); + Assert.assertNull("The tag needs to be dropped.", table.refs().get(tagName)); + } + + private Table insertRows() throws NoSuchTableException { + List records = + ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.writeTo(tableName).append(); + return validationCatalog.loadTable(tableIdent); + } +}